diff --git a/apps/chat/views/chat.py b/apps/chat/views/chat.py index 8a59e0060..5e5d29f9d 100644 --- a/apps/chat/views/chat.py +++ b/apps/chat/views/chat.py @@ -191,7 +191,8 @@ class CaptchaView(APIView): responses=CaptchaAPI.get_response()) def get(self, request: Request): username = request.query_params.get('username', None) - return result.success(CaptchaSerializer().generate(username, 'chat')) + accessToken = request.query_params.get('accessToken', None) + return result.success(CaptchaSerializer().chat_generate(username, 'chat', accessToken)) class SpeechToText(APIView): diff --git a/apps/users/serializers/login.py b/apps/users/serializers/login.py index ab5a46ab9..0ab9ae89c 100644 --- a/apps/users/serializers/login.py +++ b/apps/users/serializers/login.py @@ -17,6 +17,7 @@ from django.db.models import QuerySet from django.utils.translation import gettext_lazy as _ from rest_framework import serializers +from application.models import ApplicationAccessToken from common.constants.authentication_type import AuthenticationType from common.constants.cache_version import Cache_Version from common.database_model_manage.database_model_manage import DatabaseModelManage @@ -33,8 +34,7 @@ class LoginRequest(serializers.Serializer): captcha = serializers.CharField(required=False, max_length=64, label=_('captcha'), allow_null=True, allow_blank=True) encryptedData = serializers.CharField(required=False, label=_('encryptedData'), allow_null=True, - allow_blank=True) - + allow_blank=True) system_version, system_get_key = Cache_Version.SYSTEM.value @@ -61,6 +61,20 @@ def record_login_fail(username: str, expire: int = 600): class LoginSerializer(serializers.Serializer): + @staticmethod + def get_auth_setting(): + """获取认证设置""" + auth_setting_model = DatabaseModelManage.get_model('auth_setting') + auth_setting = {} + if auth_setting_model: + setting_obj = auth_setting_model.objects.filter(param_key='auth_setting').first() + if setting_obj: + try: + auth_setting = json.loads(setting_obj.param_value) or {} + except Exception: + auth_setting = {} + return auth_setting + @staticmethod def login(instance): username = instance.get("username", "") @@ -73,16 +87,7 @@ class LoginSerializer(serializers.Serializer): except Exception as e: record_login_fail(username) raise e - auth_setting_model = DatabaseModelManage.get_model('auth_setting') - # 默认配置 - auth_setting = {} - if auth_setting_model: - setting_obj = auth_setting_model.objects.filter(param_key='auth_setting').first() - if setting_obj: - try: - auth_setting = json.loads(setting_obj.param_value) or {} - except Exception: - auth_setting = {} + auth_setting = LoginSerializer.get_auth_setting() max_attempts = auth_setting.get("max_attempts", 1) password = instance.get("password") @@ -135,10 +140,53 @@ class CaptchaResponse(serializers.Serializer): class CaptchaSerializer(serializers.Serializer): @staticmethod def generate(username: str, type: str = 'system'): - chars = get_random_chars() - image = ImageCaptcha() - data = image.generate(chars) - captcha = base64.b64encode(data.getbuffer()) - cache.set(Cache_Version.CAPTCHA.get_key(captcha=f'{type}_{username}'), chars.lower(), - timeout=300, version=Cache_Version.CAPTCHA.get_version()) - return {'captcha': 'data:image/png;base64,' + captcha.decode()} + auth_setting = LoginSerializer.get_auth_setting() + max_attempts = auth_setting.get("max_attempts", 1) + need_captcha = False + if max_attempts == -1: + need_captcha = False + elif max_attempts > 0: + fail_count = cache.get(system_get_key(f'{type}_{username}'), version=system_version) or 0 + need_captcha = fail_count >= max_attempts + + if need_captcha: + chars = get_random_chars() + image = ImageCaptcha() + data = image.generate(chars) + captcha = base64.b64encode(data.getbuffer()) + cache.set(Cache_Version.CAPTCHA.get_key(captcha=f'{type}_{username}'), chars.lower(), + timeout=300, version=Cache_Version.CAPTCHA.get_version()) + return {'captcha': 'data:image/png;base64,' + captcha.decode()} + return {'captcha': ''} + + @staticmethod + def chat_generate(username: str, type: str = 'chat', access_token: str = ''): + auth_setting = {} + application_access_token = ApplicationAccessToken.objects.filter( + access_token=access_token + ).first() + + if not application_access_token: + raise AppApiException(1005, _('Invalid access token')) + if application_access_token: + auth_setting = application_access_token.authentication_value + max_attempts = auth_setting.get("max_attempts", 1) + need_captcha = False + if max_attempts == -1: + need_captcha = False + elif max_attempts > 0: + fail_count = cache.get(system_get_key(f'{type}_{username}'), version=system_version) or 0 + need_captcha = fail_count >= max_attempts + + if need_captcha: + chars = get_random_chars() + image = ImageCaptcha() + data = image.generate(chars) + captcha = base64.b64encode(data.getbuffer()) + cache.set(Cache_Version.CAPTCHA.get_key(captcha=f'{type}_{username}'), chars.lower(), + timeout=300, version=Cache_Version.CAPTCHA.get_version()) + return {'captcha': 'data:image/png;base64,' + captcha.decode()} + return {'captcha': ''} + + +