feat: enhance captcha generation with access token validation and configurable max attempts

This commit is contained in:
wxg0103 2025-09-24 18:55:09 +08:00
parent db0ed2662b
commit 0406fbcaf7
2 changed files with 69 additions and 20 deletions

View File

@ -191,7 +191,8 @@ class CaptchaView(APIView):
responses=CaptchaAPI.get_response())
def get(self, request: Request):
username = request.query_params.get('username', None)
return result.success(CaptchaSerializer().generate(username, 'chat'))
accessToken = request.query_params.get('accessToken', None)
return result.success(CaptchaSerializer().chat_generate(username, 'chat', accessToken))
class SpeechToText(APIView):

View File

@ -17,6 +17,7 @@ from django.db.models import QuerySet
from django.utils.translation import gettext_lazy as _
from rest_framework import serializers
from application.models import ApplicationAccessToken
from common.constants.authentication_type import AuthenticationType
from common.constants.cache_version import Cache_Version
from common.database_model_manage.database_model_manage import DatabaseModelManage
@ -33,8 +34,7 @@ class LoginRequest(serializers.Serializer):
captcha = serializers.CharField(required=False, max_length=64, label=_('captcha'), allow_null=True,
allow_blank=True)
encryptedData = serializers.CharField(required=False, label=_('encryptedData'), allow_null=True,
allow_blank=True)
allow_blank=True)
system_version, system_get_key = Cache_Version.SYSTEM.value
@ -61,6 +61,20 @@ def record_login_fail(username: str, expire: int = 600):
class LoginSerializer(serializers.Serializer):
@staticmethod
def get_auth_setting():
"""获取认证设置"""
auth_setting_model = DatabaseModelManage.get_model('auth_setting')
auth_setting = {}
if auth_setting_model:
setting_obj = auth_setting_model.objects.filter(param_key='auth_setting').first()
if setting_obj:
try:
auth_setting = json.loads(setting_obj.param_value) or {}
except Exception:
auth_setting = {}
return auth_setting
@staticmethod
def login(instance):
username = instance.get("username", "")
@ -73,16 +87,7 @@ class LoginSerializer(serializers.Serializer):
except Exception as e:
record_login_fail(username)
raise e
auth_setting_model = DatabaseModelManage.get_model('auth_setting')
# 默认配置
auth_setting = {}
if auth_setting_model:
setting_obj = auth_setting_model.objects.filter(param_key='auth_setting').first()
if setting_obj:
try:
auth_setting = json.loads(setting_obj.param_value) or {}
except Exception:
auth_setting = {}
auth_setting = LoginSerializer.get_auth_setting()
max_attempts = auth_setting.get("max_attempts", 1)
password = instance.get("password")
@ -135,10 +140,53 @@ class CaptchaResponse(serializers.Serializer):
class CaptchaSerializer(serializers.Serializer):
@staticmethod
def generate(username: str, type: str = 'system'):
chars = get_random_chars()
image = ImageCaptcha()
data = image.generate(chars)
captcha = base64.b64encode(data.getbuffer())
cache.set(Cache_Version.CAPTCHA.get_key(captcha=f'{type}_{username}'), chars.lower(),
timeout=300, version=Cache_Version.CAPTCHA.get_version())
return {'captcha': 'data:image/png;base64,' + captcha.decode()}
auth_setting = LoginSerializer.get_auth_setting()
max_attempts = auth_setting.get("max_attempts", 1)
need_captcha = False
if max_attempts == -1:
need_captcha = False
elif max_attempts > 0:
fail_count = cache.get(system_get_key(f'{type}_{username}'), version=system_version) or 0
need_captcha = fail_count >= max_attempts
if need_captcha:
chars = get_random_chars()
image = ImageCaptcha()
data = image.generate(chars)
captcha = base64.b64encode(data.getbuffer())
cache.set(Cache_Version.CAPTCHA.get_key(captcha=f'{type}_{username}'), chars.lower(),
timeout=300, version=Cache_Version.CAPTCHA.get_version())
return {'captcha': 'data:image/png;base64,' + captcha.decode()}
return {'captcha': ''}
@staticmethod
def chat_generate(username: str, type: str = 'chat', access_token: str = ''):
auth_setting = {}
application_access_token = ApplicationAccessToken.objects.filter(
access_token=access_token
).first()
if not application_access_token:
raise AppApiException(1005, _('Invalid access token'))
if application_access_token:
auth_setting = application_access_token.authentication_value
max_attempts = auth_setting.get("max_attempts", 1)
need_captcha = False
if max_attempts == -1:
need_captcha = False
elif max_attempts > 0:
fail_count = cache.get(system_get_key(f'{type}_{username}'), version=system_version) or 0
need_captcha = fail_count >= max_attempts
if need_captcha:
chars = get_random_chars()
image = ImageCaptcha()
data = image.generate(chars)
captcha = base64.b64encode(data.getbuffer())
cache.set(Cache_Version.CAPTCHA.get_key(captcha=f'{type}_{username}'), chars.lower(),
timeout=300, version=Cache_Version.CAPTCHA.get_version())
return {'captcha': 'data:image/png;base64,' + captcha.decode()}
return {'captcha': ''}