From 0969f701049acbd2caf41e9090a17968a585d7f9 Mon Sep 17 00:00:00 2001 From: wxg0103 <727495428@qq.com> Date: Tue, 16 Sep 2025 18:25:14 +0800 Subject: [PATCH] feat: add authentication settings API endpoints and enhance login logic with access token validation --- apps/chat/views/chat.py | 3 +- apps/users/serializers/login.py | 77 ++++++++++++++++++++++++++++----- apps/users/views/login.py | 3 +- 3 files changed, 69 insertions(+), 14 deletions(-) diff --git a/apps/chat/views/chat.py b/apps/chat/views/chat.py index 82c8bd8d0..8a59e0060 100644 --- a/apps/chat/views/chat.py +++ b/apps/chat/views/chat.py @@ -190,7 +190,8 @@ class CaptchaView(APIView): tags=[_("Chat")], # type: ignore responses=CaptchaAPI.get_response()) def get(self, request: Request): - return result.success(CaptchaSerializer().generate()) + username = request.query_params.get('username', None) + return result.success(CaptchaSerializer().generate(username, 'chat')) class SpeechToText(APIView): diff --git a/apps/users/serializers/login.py b/apps/users/serializers/login.py index deef0045c..9381f289c 100644 --- a/apps/users/serializers/login.py +++ b/apps/users/serializers/login.py @@ -8,6 +8,7 @@ """ import base64 import datetime +import json from captcha.image import ImageCaptcha from django.core import signing @@ -18,6 +19,7 @@ from rest_framework import serializers from common.constants.authentication_type import AuthenticationType from common.constants.cache_version import Cache_Version +from common.database_model_manage.database_model_manage import DatabaseModelManage from common.exception.app_exception import AppApiException from common.utils.common import password_encrypt, get_random_chars from maxkb.const import CONFIG @@ -27,7 +29,11 @@ from users.models import User class LoginRequest(serializers.Serializer): username = serializers.CharField(required=True, max_length=64, help_text=_("Username"), label=_("Username")) password = serializers.CharField(required=True, max_length=128, label=_("Password")) - captcha = serializers.CharField(required=True, max_length=64, label=_('captcha')) + captcha = serializers.CharField(required=False, max_length=64, label=_('captcha'), allow_null=True, + allow_blank=True) + + +system_version, system_get_key = Cache_Version.SYSTEM.value class LoginResponse(serializers.Serializer): @@ -37,23 +43,70 @@ class LoginResponse(serializers.Serializer): token = serializers.CharField(required=True, label=_("token")) +def record_login_fail(username: str, expire: int = 3600): + """记录登录失败次数""" + if not username: + return + fail_key = system_get_key(f'system_{username}') + fail_count = cache.get(fail_key, version=system_version) + if fail_count is None: + cache.set(fail_key, 1, timeout=expire, version=system_version) + else: + cache.incr(fail_key, 1, version=system_version) + + class LoginSerializer(serializers.Serializer): @staticmethod def login(instance): - LoginRequest(data=instance).is_valid(raise_exception=True) - username = instance.get('username') - password = instance.get('password') - captcha = instance.get('captcha') - captcha_cache = cache.get(Cache_Version.CAPTCHA.get_key(captcha=captcha.lower()), - version=Cache_Version.CAPTCHA.get_version()) - if captcha_cache is None: - raise AppApiException(1005, _("Captcha code error or expiration")) + username = instance.get("username", "") + try: + LoginRequest(data=instance).is_valid(raise_exception=True) + except Exception as e: + record_login_fail(username) + raise e + auth_setting_model = DatabaseModelManage.get_model('auth_setting') + # 默认配置 + auth_setting = {} + if auth_setting_model: + setting_obj = auth_setting_model.objects.filter(param_key='auth_setting').first() + if setting_obj: + try: + auth_setting = json.loads(setting_obj.param_value) or {} + except Exception: + auth_setting = {} + + max_attempts = auth_setting.get("max_attempts", 0) + password = instance.get("password") + captcha = instance.get("captcha", "") + + # 判断是否需要验证码 + need_captcha = True + if max_attempts == -1: + need_captcha = False + elif max_attempts > 0: + fail_count = cache.get(system_get_key(f'system_{username}'), version=system_version) or 0 + need_captcha = fail_count >= max_attempts + + if need_captcha: + if not captcha: + raise AppApiException(1005, _("Captcha is required")) + + captcha_cache = cache.get( + Cache_Version.CAPTCHA.get_key(captcha=f"system_{username}"), + version=Cache_Version.CAPTCHA.get_version() + ) + if captcha_cache is None or captcha.lower() != captcha_cache: + raise AppApiException(1005, _("Captcha code error or expiration")) + user = QuerySet(User).filter(username=username, password=password_encrypt(password)).first() if user is None: + record_login_fail(username) raise AppApiException(500, _('The username or password is incorrect')) if not user.is_active: + record_login_fail(username) raise AppApiException(1005, _("The user has been disabled, please contact the administrator!")) + cache.delete(system_get_key(f'system_{username}'), version=system_version) token = signing.dumps({'username': user.username, 'id': str(user.id), 'email': user.email, @@ -73,11 +126,11 @@ class CaptchaResponse(serializers.Serializer): class CaptchaSerializer(serializers.Serializer): @staticmethod - def generate(): + def generate(username: str, type: str = 'system'): chars = get_random_chars() image = ImageCaptcha() data = image.generate(chars) captcha = base64.b64encode(data.getbuffer()) - cache.set(Cache_Version.CAPTCHA.get_key(captcha=chars.lower()), chars, - timeout=60, version=Cache_Version.CAPTCHA.get_version()) + cache.set(Cache_Version.CAPTCHA.get_key(captcha=f'{type}_{username}'), chars.lower(), + timeout=300, version=Cache_Version.CAPTCHA.get_version()) return {'captcha': 'data:image/png;base64,' + captcha.decode()} diff --git a/apps/users/views/login.py b/apps/users/views/login.py index 4bdbba1f0..03bf8a0a4 100644 --- a/apps/users/views/login.py +++ b/apps/users/views/login.py @@ -73,4 +73,5 @@ class CaptchaView(APIView): tags=[_("User Management")], # type: ignore responses=CaptchaAPI.get_response()) def get(self, request: Request): - return result.success(CaptchaSerializer().generate()) + username = request.query_params.get('username', None) + return result.success(CaptchaSerializer().generate(username))