feat: add authentication settings API endpoints and enhance login logic with access token validation
Some checks are pending
sync2gitee / repo-sync (push) Waiting to run
Typos Check / Spell Check with Typos (push) Waiting to run

This commit is contained in:
wxg0103 2025-09-16 18:25:14 +08:00
parent ceb601d74a
commit 0969f70104
3 changed files with 69 additions and 14 deletions

View File

@ -190,7 +190,8 @@ class CaptchaView(APIView):
tags=[_("Chat")], # type: ignore
responses=CaptchaAPI.get_response())
def get(self, request: Request):
return result.success(CaptchaSerializer().generate())
username = request.query_params.get('username', None)
return result.success(CaptchaSerializer().generate(username, 'chat'))
class SpeechToText(APIView):

View File

@ -8,6 +8,7 @@
"""
import base64
import datetime
import json
from captcha.image import ImageCaptcha
from django.core import signing
@ -18,6 +19,7 @@ from rest_framework import serializers
from common.constants.authentication_type import AuthenticationType
from common.constants.cache_version import Cache_Version
from common.database_model_manage.database_model_manage import DatabaseModelManage
from common.exception.app_exception import AppApiException
from common.utils.common import password_encrypt, get_random_chars
from maxkb.const import CONFIG
@ -27,7 +29,11 @@ from users.models import User
class LoginRequest(serializers.Serializer):
username = serializers.CharField(required=True, max_length=64, help_text=_("Username"), label=_("Username"))
password = serializers.CharField(required=True, max_length=128, label=_("Password"))
captcha = serializers.CharField(required=True, max_length=64, label=_('captcha'))
captcha = serializers.CharField(required=False, max_length=64, label=_('captcha'), allow_null=True,
allow_blank=True)
system_version, system_get_key = Cache_Version.SYSTEM.value
class LoginResponse(serializers.Serializer):
@ -37,23 +43,70 @@ class LoginResponse(serializers.Serializer):
token = serializers.CharField(required=True, label=_("token"))
def record_login_fail(username: str, expire: int = 3600):
"""记录登录失败次数"""
if not username:
return
fail_key = system_get_key(f'system_{username}')
fail_count = cache.get(fail_key, version=system_version)
if fail_count is None:
cache.set(fail_key, 1, timeout=expire, version=system_version)
else:
cache.incr(fail_key, 1, version=system_version)
class LoginSerializer(serializers.Serializer):
@staticmethod
def login(instance):
LoginRequest(data=instance).is_valid(raise_exception=True)
username = instance.get('username')
password = instance.get('password')
captcha = instance.get('captcha')
captcha_cache = cache.get(Cache_Version.CAPTCHA.get_key(captcha=captcha.lower()),
version=Cache_Version.CAPTCHA.get_version())
if captcha_cache is None:
raise AppApiException(1005, _("Captcha code error or expiration"))
username = instance.get("username", "")
try:
LoginRequest(data=instance).is_valid(raise_exception=True)
except Exception as e:
record_login_fail(username)
raise e
auth_setting_model = DatabaseModelManage.get_model('auth_setting')
# 默认配置
auth_setting = {}
if auth_setting_model:
setting_obj = auth_setting_model.objects.filter(param_key='auth_setting').first()
if setting_obj:
try:
auth_setting = json.loads(setting_obj.param_value) or {}
except Exception:
auth_setting = {}
max_attempts = auth_setting.get("max_attempts", 0)
password = instance.get("password")
captcha = instance.get("captcha", "")
# 判断是否需要验证码
need_captcha = True
if max_attempts == -1:
need_captcha = False
elif max_attempts > 0:
fail_count = cache.get(system_get_key(f'system_{username}'), version=system_version) or 0
need_captcha = fail_count >= max_attempts
if need_captcha:
if not captcha:
raise AppApiException(1005, _("Captcha is required"))
captcha_cache = cache.get(
Cache_Version.CAPTCHA.get_key(captcha=f"system_{username}"),
version=Cache_Version.CAPTCHA.get_version()
)
if captcha_cache is None or captcha.lower() != captcha_cache:
raise AppApiException(1005, _("Captcha code error or expiration"))
user = QuerySet(User).filter(username=username, password=password_encrypt(password)).first()
if user is None:
record_login_fail(username)
raise AppApiException(500, _('The username or password is incorrect'))
if not user.is_active:
record_login_fail(username)
raise AppApiException(1005, _("The user has been disabled, please contact the administrator!"))
cache.delete(system_get_key(f'system_{username}'), version=system_version)
token = signing.dumps({'username': user.username,
'id': str(user.id),
'email': user.email,
@ -73,11 +126,11 @@ class CaptchaResponse(serializers.Serializer):
class CaptchaSerializer(serializers.Serializer):
@staticmethod
def generate():
def generate(username: str, type: str = 'system'):
chars = get_random_chars()
image = ImageCaptcha()
data = image.generate(chars)
captcha = base64.b64encode(data.getbuffer())
cache.set(Cache_Version.CAPTCHA.get_key(captcha=chars.lower()), chars,
timeout=60, version=Cache_Version.CAPTCHA.get_version())
cache.set(Cache_Version.CAPTCHA.get_key(captcha=f'{type}_{username}'), chars.lower(),
timeout=300, version=Cache_Version.CAPTCHA.get_version())
return {'captcha': 'data:image/png;base64,' + captcha.decode()}

View File

@ -73,4 +73,5 @@ class CaptchaView(APIView):
tags=[_("User Management")], # type: ignore
responses=CaptchaAPI.get_response())
def get(self, request: Request):
return result.success(CaptchaSerializer().generate())
username = request.query_params.get('username', None)
return result.success(CaptchaSerializer().generate(username))