# coding=utf-8
"""
    @project: qabot
    @Author:虎
    @file: team_serializers.py
    @date:2023/9/5 16:32
    @desc: Serializers for captcha, login, registration, verification codes,
           password reset and user management.
"""
import base64
import datetime
import json
import os
import random
import re
import secrets
import uuid

from captcha.image import ImageCaptcha
from django.conf import settings
from django.core import validators, signing, cache
from django.core.mail import send_mail
from django.core.mail.backends.smtp import EmailBackend
from django.db import transaction
from django.db.models import Q, QuerySet, Prefetch
from django.utils.translation import get_language
from django.utils.translation import gettext_lazy as _, to_locale
from drf_yasg import openapi
from rest_framework import serializers

from application.models import Application
from common.constants.authentication_type import AuthenticationType
from common.constants.exception_code_constants import ExceptionCodeConstants
from common.constants.permission_constants import RoleConstants, get_permission_list_by_role
from common.db.search import page_search
from common.exception.app_exception import AppApiException
from common.mixins.api_mixin import ApiMixin
from common.models.db_model_manage import DBModelManage
from common.response.result import get_api_response
from common.util.common import valid_license, get_random_chars
from common.util.field_message import ErrMessage
from common.util.lock import lock
from common.util.rsa_util import decrypt, get_key_pair_by_sql
from dataset.models import DataSet, Document, Paragraph, Problem, ProblemParagraphMapping
from embedding.task import delete_embedding_by_dataset_id_list
from function_lib.models.function import FunctionLib
from setting.models import Team, SystemSetting, SettingType, Model, TeamMember, TeamMemberPermission
from smartdoc.conf import PROJECT_DIR
from users.models.user import User, password_encrypt, get_user_dynamics_permission

user_cache = cache.caches['user_cache']
captcha_cache = cache.caches['captcha_cache']

# Password policy shared by every password field in this module: 6-20 characters
# from the listed alphabet; each negative lookahead rejects passwords made up
# solely of the character set it names.
_PASSWORD_REGEX = re.compile(
    "^(?![a-zA-Z]+$)(?![A-Z0-9]+$)(?![A-Z_!@#$%^&*`~.()-+=]+$)(?![a-z0-9]+$)(?![a-z_!@#$%^&*`~()-+=]+$)"
    "(?![0-9_!@#$%^&*`~()-+=]+$)[a-zA-Z0-9_!@#$%^&*`~.()-+=]{6,20}$")

# The alternation is grouped so that BOTH anchors apply to BOTH alternatives;
# without the group "^register|reset_password$" also accepts e.g. "registerX".
_CODE_TYPE_REGEX = re.compile("^(register|reset_password)$")

# Alphabet and length of the e-mailed verification code.
_VERIFICATION_CODE_DIGITS = "0123456789"
_VERIFICATION_CODE_LENGTH = 6


class CaptchaSerializer(ApiMixin, serializers.Serializer):
    """Generates login captcha images."""

    @staticmethod
    def get_response_body_api():
        return get_api_response(openapi.Schema(
            type=openapi.TYPE_STRING,
            title="captcha",
            default="xxxx",
            description="captcha"
        ))

    @staticmethod
    def generate():
        """
        Create a captcha, remember it for five minutes and return the image.

        :return: the PNG image as a ``data:`` URL (base64 encoded)
        """
        chars = get_random_chars()
        image_data = ImageCaptcha().generate(chars)
        encoded_image = base64.b64encode(image_data.getbuffer())
        # The cache key is the lower-cased captcha text itself; login() looks it up the same way.
        captcha_cache.set(f"LOGIN:{chars.lower()}", chars, timeout=5 * 60)
        return 'data:image/png;base64,' + encoded_image.decode()


class SystemSerializer(ApiMixin, serializers.Serializer):
    """Exposes system level profile information."""

    @staticmethod
    def get_profile():
        """
        :return: dict with the version (``MAXKB_VERSION`` environment variable), the xpack
                 flags and the ``key`` entry of the key pair returned by ``get_key_pair_by_sql``
        """
        version = os.environ.get('MAXKB_VERSION')
        xpack_cache = DBModelManage.get_model('xpack_cache')
        license_is_valid = False if xpack_cache is None else xpack_cache.get('XPACK_LICENSE_IS_VALID', False)
        return {'version': version,
                'IS_XPACK': hasattr(settings, 'IS_XPACK'),
                'XPACK_LICENSE_IS_VALID': license_is_valid,
                'ras': get_key_pair_by_sql().get('key')}

    @staticmethod
    def get_response_body_api():
        return openapi.Schema(
            type=openapi.TYPE_OBJECT,
            required=[],
            properties={
                'version': openapi.Schema(type=openapi.TYPE_STRING, title=_("System version number"),
                                          description=_("System version number")),
            }
        )


class LoginSerializer(ApiMixin, serializers.Serializer):
    """Login request object."""
    username = serializers.CharField(required=True,
                                     error_messages=ErrMessage.char(_("Username")))

    password = serializers.CharField(required=True, error_messages=ErrMessage.char(_("Password")))

    captcha = serializers.CharField(required=True, error_messages=ErrMessage.char(_("captcha")))

    encryptedData = serializers.CharField(required=False, label=_('encryptedData'), allow_null=True,
                                          allow_blank=True)

    def get_user_token(self, user):
        """
        :param user: the authenticated user
        :return: User Token (authentication information)
        """
        token = signing.dumps({'username': user.username, 'id': str(user.id), 'email': user.email,
                               'type': AuthenticationType.USER.value})
        return token

    class Meta:
        model = User
        fields = '__all__'

    def get_request_body_api(self):
        return openapi.Schema(
            type=openapi.TYPE_OBJECT,
            required=['username', 'encryptedData'],
            properties={
                'username': openapi.Schema(type=openapi.TYPE_STRING, title=_("Username"), description=_("Username")),
                'password': openapi.Schema(type=openapi.TYPE_STRING, title=_("Password"), description=_("Password")),
                'captcha': openapi.Schema(type=openapi.TYPE_STRING, title=_("captcha"), description=_("captcha")),
                'encryptedData': openapi.Schema(type=openapi.TYPE_STRING, title=_("encryptedData"),
                                                description=_("encryptedData"))
            }
        )

    def get_response_body_api(self):
        return get_api_response(openapi.Schema(
            type=openapi.TYPE_STRING,
            title="token",
            default="xxxx",
            description="认证token"
        ))

    @staticmethod
    def login(instance):
        """
        Authenticate a user by username (or e-mail), password and captcha.

        :param instance: request payload; when ``encryptedData`` is present it is decrypted,
                         parsed as JSON and merged into ``instance`` (the dict is mutated)
        :return: the matching, active ``User``
        :raises AppApiException: captcha unknown/expired, wrong credentials or disabled user
        """
        encrypted_data = instance.get("encryptedData", "")
        if encrypted_data:
            instance.update(json.loads(decrypt(encrypted_data)))
        LoginSerializer(data=instance).is_valid(raise_exception=True)
        # Read the credentials only AFTER the decrypted payload has been merged, so the values
        # used for the lookup are exactly the ones that were just validated.
        username = instance.get("username", "")
        password = instance.get("password")
        captcha = instance.get("captcha", "")
        captcha_value = captcha_cache.get(f"LOGIN:{captcha.lower()}")
        if captcha_value is None:
            raise AppApiException(1005, _("Captcha code error or expiration"))
        encrypted_password = password_encrypt(password)
        user = QuerySet(User).filter(Q(username=username,
                                       password=encrypted_password) | Q(email=username,
                                                                        password=encrypted_password)).first()
        if user is None:
            raise ExceptionCodeConstants.INCORRECT_USERNAME_AND_PASSWORD.value.to_app_api_exception()
        if not user.is_active:
            raise AppApiException(1005, _("The user has been disabled, please contact the administrator!"))
        return user


class RegisterSerializer(ApiMixin, serializers.Serializer):
    """
    Register request object
    """
    email = serializers.EmailField(
        required=True,
        error_messages=ErrMessage.char(_("Email")),
        validators=[validators.EmailValidator(message=ExceptionCodeConstants.EMAIL_FORMAT_ERROR.value.message,
                                              code=ExceptionCodeConstants.EMAIL_FORMAT_ERROR.value.code)])

    username = serializers.CharField(required=True,
                                     error_messages=ErrMessage.char(_("Username")),
                                     max_length=20,
                                     min_length=6,
                                     validators=[
                                         validators.RegexValidator(regex=re.compile("^.{6,20}$"),
                                                                   message=_("Username must be 6-20 characters long"))
                                     ])
    password = serializers.CharField(required=True, error_messages=ErrMessage.char(_("Password")),
                                     validators=[validators.RegexValidator(regex=_PASSWORD_REGEX, message=_(
                                         "The password must be 6-20 characters long and must be a combination of letters, numbers, and special characters."))])

    re_password = serializers.CharField(required=True,
                                        error_messages=ErrMessage.char(_("Confirm Password")),
                                        validators=[validators.RegexValidator(regex=_PASSWORD_REGEX, message=_(
                                            "The password must be 6-20 characters long and must be a combination of letters, numbers, and special characters."))])

    code = serializers.CharField(required=True, error_messages=ErrMessage.char(_("Verification code")))

    class Meta:
        model = User
        fields = '__all__'

    # The key is formatted instead of concatenated so that a request without an e-mail reaches
    # field validation (and gets a proper error) rather than failing with a TypeError here.
    @lock(lock_key=lambda this, raise_exception: (
            f'{this.initial_data.get("email")}:register'
    ))
    def is_valid(self, *, raise_exception=False):
        """
        Validate the fields, the password confirmation, the e-mailed code and uniqueness.

        Always raises on failure, regardless of ``raise_exception``.

        :return: True when the registration request is acceptable
        """
        super().is_valid(raise_exception=True)
        if self.data.get('password') != self.data.get('re_password'):
            raise ExceptionCodeConstants.PASSWORD_NOT_EQ_RE_PASSWORD.value.to_app_api_exception()
        username = self.data.get("username")
        email = self.data.get("email")
        code = self.data.get("code")
        code_cache_key = email + ":register"
        cache_code = user_cache.get(code_cache_key)
        if code != cache_code:
            raise ExceptionCodeConstants.CODE_ERROR.value.to_app_api_exception()
        u = QuerySet(User).filter(Q(username=username) | Q(email=email)).first()
        if u is not None:
            if u.email == email:
                raise ExceptionCodeConstants.EMAIL_IS_EXIST.value.to_app_api_exception()
            if u.username == username:
                raise ExceptionCodeConstants.USERNAME_IS_EXIST.value.to_app_api_exception()

        return True

    @valid_license(model=User, count=2,
                   message=_(
                       "The community version supports up to 2 users. If you need more users, please contact us (https://fit2cloud.com/)."))
    @transaction.atomic
    def save(self, **kwargs):
        """Create the user and its team, then discard the used verification code."""
        m = User(
            **{'id': uuid.uuid1(), 'email': self.data.get("email"), 'username': self.data.get("username"),
               'role': RoleConstants.USER.name})
        m.set_password(self.data.get("password"))
        m.save()
        # Initialise the user's team.
        Team(**{'user': m, 'name': m.username + _("team")}).save()
        email = self.data.get("email")
        code_cache_key = email + ":register"
        # Remove the verification code from the cache.
        user_cache.delete(code_cache_key)

    @staticmethod
    def get_request_body_api():
        return openapi.Schema(
            type=openapi.TYPE_OBJECT,
            required=['username', 'email', 'password', 're_password', 'code'],
            properties={
                'username': openapi.Schema(type=openapi.TYPE_STRING, title=_("Username"), description=_("Username")),
                'email': openapi.Schema(type=openapi.TYPE_STRING, title=_("Email"), description=_("Email")),
                'password': openapi.Schema(type=openapi.TYPE_STRING, title=_("Password"), description=_("Password")),
                're_password': openapi.Schema(type=openapi.TYPE_STRING, title=_("Confirm Password"),
                                              description=_("Confirm Password")),
                'code': openapi.Schema(type=openapi.TYPE_STRING, title=_("Verification code"),
                                       description=_("Verification code"))
            }
        )


class CheckCodeSerializer(ApiMixin, serializers.Serializer):
    """
    Check a verification code
    """
    email = serializers.EmailField(
        required=True,
        error_messages=ErrMessage.char(_("Email")),
        validators=[validators.EmailValidator(message=ExceptionCodeConstants.EMAIL_FORMAT_ERROR.value.message,
                                              code=ExceptionCodeConstants.EMAIL_FORMAT_ERROR.value.code)])
    code = serializers.CharField(required=True, error_messages=ErrMessage.char(_("Verification code")))

    type = serializers.CharField(required=True,
                                 error_messages=ErrMessage.char(_("Type")),
                                 validators=[
                                     validators.RegexValidator(regex=_CODE_TYPE_REGEX,
                                                               message=_(
                                                                   "The type only supports register|reset_password"),
                                                               code=500)
                                 ])

    def is_valid(self, *, raise_exception=False):
        """
        Validate the fields and compare the code with the cached one.

        Always raises on failure, regardless of ``raise_exception``.

        :return: True when the code matches
        """
        # Field errors must stop here: building the cache key from missing fields would
        # otherwise fail with a TypeError instead of a validation error.
        super().is_valid(raise_exception=True)
        value = user_cache.get(self.data.get("email") + ":" + self.data.get("type"))
        if value is None or value != self.data.get("code"):
            raise ExceptionCodeConstants.CODE_ERROR.value.to_app_api_exception()
        return True

    class Meta:
        model = User
        fields = '__all__'

    def get_request_body_api(self):
        return openapi.Schema(
            type=openapi.TYPE_OBJECT,
            required=['email', 'code', 'type'],
            properties={
                'email': openapi.Schema(type=openapi.TYPE_STRING, title=_("Email"), description=_("Email")),
                'code': openapi.Schema(type=openapi.TYPE_STRING, title=_("Verification code"),
                                       description=_("Verification code")),
                'type': openapi.Schema(type=openapi.TYPE_STRING, title=_("Type"), description="register|reset_password")
            }
        )

    def get_response_body_api(self):
        return get_api_response(openapi.Schema(
            type=openapi.TYPE_BOOLEAN,
            title=_('Is it successful'),
            default=True,
            description=_('Error message')))


class SwitchLanguageSerializer(serializers.Serializer):
    """Changes the interface language stored on a user."""
    user_id = serializers.UUIDField(required=True, error_messages=ErrMessage.char(_('user id')), )
    language = serializers.CharField(required=True, error_messages=ErrMessage.char(_('language')))

    def switch(self):
        """
        Persist the requested language on the user.

        :raises AppApiException: the language is not one of the supported ones
        """
        self.is_valid(raise_exception=True)
        language = self.data.get('language')
        support_language_list = ['zh-CN', 'zh-Hant', 'en-US']
        if language not in support_language_list:
            raise AppApiException(500, _('language only support:') + ','.join(support_language_list))
        QuerySet(User).filter(id=self.data.get('user_id')).update(language=language)


class RePasswordSerializer(ApiMixin, serializers.Serializer):
    """Password reset request, confirmed by an e-mailed verification code."""
    email = serializers.EmailField(
        required=True,
        error_messages=ErrMessage.char(_("Email")),
        validators=[validators.EmailValidator(message=ExceptionCodeConstants.EMAIL_FORMAT_ERROR.value.message,
                                              code=ExceptionCodeConstants.EMAIL_FORMAT_ERROR.value.code)])

    code = serializers.CharField(required=True, error_messages=ErrMessage.char(_("Verification code")))

    password = serializers.CharField(required=True, error_messages=ErrMessage.char(_("Password")),
                                     validators=[validators.RegexValidator(regex=_PASSWORD_REGEX, message=_(
                                         "The confirmation password must be 6-20 characters long and must be a combination of letters, numbers, and special characters."))])

    re_password = serializers.CharField(required=True, error_messages=ErrMessage.char(_("Confirm Password")),
                                        validators=[validators.RegexValidator(regex=_PASSWORD_REGEX, message=_(
                                            "The confirmation password must be 6-20 characters long and must be a combination of letters, numbers, and special characters."))]
                                        )

    class Meta:
        model = User
        fields = '__all__'

    def is_valid(self, *, raise_exception=False):
        """
        Validate the fields, the password confirmation and the e-mailed code.

        Always raises on failure, regardless of ``raise_exception``.

        :return: True when the reset request is acceptable
        """
        super().is_valid(raise_exception=True)
        email = self.data.get("email")
        cache_code = user_cache.get(email + ':reset_password')
        if self.data.get('password') != self.data.get('re_password'):
            raise AppApiException(ExceptionCodeConstants.PASSWORD_NOT_EQ_RE_PASSWORD.value.code,
                                  ExceptionCodeConstants.PASSWORD_NOT_EQ_RE_PASSWORD.value.message)
        if cache_code != self.data.get('code'):
            raise AppApiException(ExceptionCodeConstants.CODE_ERROR.value.code,
                                  ExceptionCodeConstants.CODE_ERROR.value.message)
        return True

    def reset_password(self):
        """
        Change the password
        :return: whether it succeeded
        """
        if self.is_valid():
            email = self.data.get("email")
            QuerySet(User).filter(email=email).update(
                password=password_encrypt(self.data.get('password')))
            code_cache_key = email + ":reset_password"
            # Remove the verification code from the cache.
            user_cache.delete(code_cache_key)
            return True

    def get_request_body_api(self):
        return openapi.Schema(
            type=openapi.TYPE_OBJECT,
            required=['email', 'code', "password", 're_password'],
            properties={
                'email': openapi.Schema(type=openapi.TYPE_STRING, title=_("Email"), description=_("Email")),
                'code': openapi.Schema(type=openapi.TYPE_STRING, title=_("Verification code"),
                                       description=_("Verification code")),
                'password': openapi.Schema(type=openapi.TYPE_STRING, title=_("Password"), description=_("Password")),
                're_password': openapi.Schema(type=openapi.TYPE_STRING, title=_("Confirm Password"),
                                              description=_("Confirm Password"))
            }
        )


class SendEmailSerializer(ApiMixin, serializers.Serializer):
    """Sends a verification code by e-mail for registration or password reset."""
    email = serializers.EmailField(
        required=True
        , error_messages=ErrMessage.char(_("Email")),
        validators=[validators.EmailValidator(message=ExceptionCodeConstants.EMAIL_FORMAT_ERROR.value.message,
                                              code=ExceptionCodeConstants.EMAIL_FORMAT_ERROR.value.code)])

    type = serializers.CharField(required=True, error_messages=ErrMessage.char(_("Type")), validators=[
        validators.RegexValidator(regex=_CODE_TYPE_REGEX,
                                  message=_("The type only supports register|reset_password"), code=500)
    ])

    class Meta:
        model = User
        fields = '__all__'

    def is_valid(self, *, raise_exception=False):
        """
        Validate the fields, the existence of the e-mail for the given type and the resend lock.

        :param raise_exception: raise on field errors instead of returning False
        :return: True when a mail may be sent, False on field errors (``raise_exception=False``)
        :raises AppApiException: e-mail (not) registered, or a mail was sent too recently
        """
        if not super().is_valid(raise_exception=raise_exception):
            # Nothing below can work with incomplete data (the cache key needs both fields).
            return False
        user_exists = QuerySet(User).filter(email=self.data.get('email')).exists()
        if not user_exists and self.data.get('type') == 'reset_password':
            raise ExceptionCodeConstants.EMAIL_IS_NOT_EXIST.value.to_app_api_exception()
        elif user_exists and self.data.get('type') == 'register':
            raise ExceptionCodeConstants.EMAIL_IS_EXIST.value.to_app_api_exception()
        code_cache_key = self.data.get('email') + ":" + self.data.get("type")
        code_cache_key_lock = code_cache_key + "_lock"
        ttl = user_cache.ttl(code_cache_key_lock)
        if ttl is not None:
            raise AppApiException(500, _("Do not send emails again within {seconds} seconds").format(
                seconds=int(ttl.total_seconds())))
        return True

    def send(self):
        """
        Send the e-mail
        :return: whether it was sent successfully
        :exception AppApiException: mail service not configured, or sending failed
        """
        email = self.data.get("email")
        state = self.data.get("type")
        # Generate the random verification code. It also authorises password resets,
        # so it comes from the CSPRNG-backed ``secrets`` module rather than ``random``.
        code = "".join(secrets.choice(_VERIFICATION_CODE_DIGITS) for _index in range(_VERIFICATION_CODE_LENGTH))
        # Load the mail template for the active language.
        language = get_language()
        template_path = os.path.join(PROJECT_DIR, "apps", "common", 'template',
                                     f'email_template_{to_locale(language)}.html')
        with open(template_path, "r", encoding='utf-8') as file:
            content = file.read()
        code_cache_key = email + ":" + state
        code_cache_key_lock = code_cache_key + "_lock"
        # Set the resend lock (one minute); it is removed again if sending fails.
        user_cache.set(code_cache_key_lock, code, timeout=datetime.timedelta(minutes=1))
        system_setting = QuerySet(SystemSetting).filter(type=SettingType.EMAIL.value).first()
        if system_setting is None:
            user_cache.delete(code_cache_key_lock)
            raise AppApiException(1004,
                                  _("The email service has not been set up. Please contact the administrator to set up the email service in [Email Settings]."))
        try:
            connection = EmailBackend(system_setting.meta.get("email_host"),
                                      system_setting.meta.get('email_port'),
                                      system_setting.meta.get('email_host_user'),
                                      system_setting.meta.get('email_host_password'),
                                      system_setting.meta.get('email_use_tls'),
                                      False,
                                      system_setting.meta.get('email_use_ssl')
                                      )
            # Send the e-mail.
            send_mail(_('【Intelligent knowledge base question and answer system-{action}】').format(
                action=_('User registration') if state == 'register' else _('Change password')),
                '',
                html_message=content.replace("${code}", code),
                from_email=system_setting.meta.get('from_email'),
                recipient_list=[email], fail_silently=False, connection=connection)
        except Exception as e:
            user_cache.delete(code_cache_key_lock)
            raise AppApiException(500, str(e) + _("Email sending failed")) from e
        # The code itself stays valid for 30 minutes.
        user_cache.set(code_cache_key, code, timeout=datetime.timedelta(minutes=30))
        return True

    def get_request_body_api(self):
        return openapi.Schema(
            type=openapi.TYPE_OBJECT,
            required=['email', 'type'],
            properties={
                'email': openapi.Schema(type=openapi.TYPE_STRING, title=_("Email"), description=_('Email')),
                'type': openapi.Schema(type=openapi.TYPE_STRING, title=_('Type'), description="register|reset_password")
            }
        )

    def get_response_body_api(self):
        return get_api_response(openapi.Schema(type=openapi.TYPE_STRING, default=True))


class UserProfile(ApiMixin):
    """Builds the profile of the current user."""

    @staticmethod
    def get_user_profile(user: User):
        """
        Get the user details
        :param user: user object
        :return: dict with id, username, email, role, permissions, is_edit_password and language
        """
        # Build a fresh list instead of extending the helper's return value in place.
        permission_list = [*get_user_dynamics_permission(str(user.id)),
                           *[p.value for p in get_permission_list_by_role(RoleConstants[user.role])]]
        # NOTE(review): the literal is presumably the hash of the initial admin password,
        # i.e. the flag asks the admin to change it — confirm.
        is_edit_password = (user.password == 'd880e722c47a34d8e9fce789fc62389d') if user.role == 'ADMIN' else False
        return {'id': user.id, 'username': user.username, 'email': user.email, 'role': user.role,
                'permissions': [str(p) for p in permission_list],
                'is_edit_password': is_edit_password,
                'language': user.language}

    @staticmethod
    def get_response_body_api():
        return openapi.Schema(
            type=openapi.TYPE_OBJECT,
            required=['id', 'username', 'email', 'role', 'is_active'],
            properties={
                'id': openapi.Schema(type=openapi.TYPE_STRING, title="ID", description="ID"),
                'username': openapi.Schema(type=openapi.TYPE_STRING, title=_("Username"), description=_("Username")),
                'email': openapi.Schema(type=openapi.TYPE_STRING, title=_("Email"), description=_("Email")),
                'role': openapi.Schema(type=openapi.TYPE_STRING, title=_("Role"), description=_("Role")),
                'is_active': openapi.Schema(type=openapi.TYPE_STRING, title=_("Is active"), description=_("Is active")),
                "permissions": openapi.Schema(type=openapi.TYPE_ARRAY, title=_("Permissions"),
                                              description=_("Permissions"),
                                              items=openapi.Schema(type=openapi.TYPE_STRING))
            }
        )


class UserSerializer(ApiMixin, serializers.ModelSerializer):
    """Minimal user representation plus user lookup helpers."""

    class Meta:
        model = User
        fields = ["email", "id",
                  "username", ]

    def get_response_body_api(self):
        return openapi.Schema(
            type=openapi.TYPE_OBJECT,
            required=['id', 'username', 'email', 'role', 'is_active'],
            properties={
                'id': openapi.Schema(type=openapi.TYPE_STRING, title="ID", description="ID"),
                'username': openapi.Schema(type=openapi.TYPE_STRING, title=_("Username"), description=_("Username")),
                'email': openapi.Schema(type=openapi.TYPE_STRING, title=_("Email"), description=_("Email")),
                'role': openapi.Schema(type=openapi.TYPE_STRING, title=_("Role"), description=_("Role")),
                'is_active': openapi.Schema(type=openapi.TYPE_STRING, title=_("Is active"), description=_("Is active"))
            }
        )

    class Query(ApiMixin, serializers.Serializer):
        """Exact-match user search by e-mail or username."""
        email_or_username = serializers.CharField(required=True)

        @staticmethod
        def get_request_params_api():
            return [openapi.Parameter(name='email_or_username',
                                      in_=openapi.IN_QUERY,
                                      type=openapi.TYPE_STRING,
                                      required=True,
                                      description=_("Email or username"))]

        @staticmethod
        def get_response_body_api():
            return openapi.Schema(
                type=openapi.TYPE_OBJECT,
                required=['username', 'email', 'id'],
                properties={
                    'id': openapi.Schema(type=openapi.TYPE_STRING, title='ID', description="ID"),
                    'username': openapi.Schema(type=openapi.TYPE_STRING, title=_("Username"),
                                               description=_("Username")),
                    'email': openapi.Schema(type=openapi.TYPE_STRING, title=_("Email"), description=_("Email"))
                }
            )

        def list(self, with_valid=True):
            """
            :param with_valid: validate the query first
            :return: id/username/email dicts of users whose username or e-mail equals the query
            """
            if with_valid:
                self.is_valid(raise_exception=True)
            email_or_username = self.data.get('email_or_username')
            return [{'id': user_model.id, 'username': user_model.username, 'email': user_model.email} for user_model in
                    QuerySet(User).filter(Q(username=email_or_username) | Q(email=email_or_username))]

    def listByType(self, type, user_id):
        """
        List the owners of the resources of one kind that ``user_id`` can see.

        :param type: 'DATASET', 'APPLICATION' or 'FUNCTION'; any other value yields no owners
        :param user_id: id of the requesting user
        :return: the 'all' and 'Me' entries followed by one id/username dict per other owner
        """
        member_ids = TeamMember.objects.filter(user_id=user_id).values_list('id', flat=True)
        targets = TeamMemberPermission.objects.filter(
            member_id__in=member_ids,
            auth_target_type=type,
            operate__contains=['USE']
        ).values_list('target', flat=True)
        prefetch_users = Prefetch('user', queryset=User.objects.only('id', 'username'))

        user_list = []
        if type == 'DATASET':
            user_list = DataSet.objects.filter(
                Q(id__in=targets) | Q(user_id=user_id)
            ).prefetch_related(prefetch_users).distinct('user_id')
        elif type == 'APPLICATION':
            user_list = Application.objects.filter(
                Q(id__in=targets) | Q(user_id=user_id)
            ).prefetch_related(prefetch_users).distinct('user_id')
        elif type == 'FUNCTION':
            user_list = FunctionLib.objects.filter(
                Q(permission_type='PUBLIC') | Q(user_id=user_id)
            ).prefetch_related(prefetch_users).distinct('user_id')

        other_users = [
            {'id': app.user.id, 'username': app.user.username}
            for app in user_list if app.user.id != user_id
        ]
        users = [
            {'id': 'all', 'username': _('All')},
            {'id': user_id, 'username': _('Me')}
        ]
        users.extend(other_users)
        return users


# NOTE(review): listByType does not use any serializer state; it is exposed on the nested
# Query serializer as well so that both call styles work — confirm which one callers use.
UserSerializer.Query.listByType = UserSerializer.listByType


class UserInstanceSerializer(ApiMixin, serializers.ModelSerializer):
    """Full user representation used by the user management API."""

    class Meta:
        model = User
        fields = ['id', 'username', 'email', 'phone', 'is_active', 'role', 'nick_name', 'create_time', 'update_time',
                  'source']

    @staticmethod
    def get_response_body_api():
        return openapi.Schema(
            type=openapi.TYPE_OBJECT,
            required=['id', 'username', 'email', 'phone', 'is_active', 'role', 'nick_name', 'create_time',
                      'update_time'],
            properties={
                'id': openapi.Schema(type=openapi.TYPE_STRING, title="ID", description="ID"),
                'username': openapi.Schema(type=openapi.TYPE_STRING, title=_("Username"), description=_("Username")),
                'email': openapi.Schema(type=openapi.TYPE_STRING, title=_("Email"), description=_("Email")),
                'phone': openapi.Schema(type=openapi.TYPE_STRING, title=_("Phone"), description=_("Phone")),
                'is_active': openapi.Schema(type=openapi.TYPE_BOOLEAN, title=_("Is active"),
                                            description=_("Is active")),
                'role': openapi.Schema(type=openapi.TYPE_STRING, title=_("Role"), description=_("Role")),
                'source': openapi.Schema(type=openapi.TYPE_STRING, title=_("Source"), description=_("Source")),
                'nick_name': openapi.Schema(type=openapi.TYPE_STRING, title=_("Name"), description=_("Name")),
                'create_time': openapi.Schema(type=openapi.TYPE_STRING, title=_("Create time"),
                                              description=_("Create time")),
                'update_time': openapi.Schema(type=openapi.TYPE_STRING, title=_("Update time"),
                                              description=_("Update time"))
            }
        )

    @staticmethod
    def get_request_params_api():
        return [openapi.Parameter(name='user_id',
                                  in_=openapi.IN_PATH,
                                  type=openapi.TYPE_STRING,
                                  required=True,
                                  description='ID')
                ]


class UserManageSerializer(serializers.Serializer):
    """Administrative user management: query, create, edit, delete and password change."""

    class Query(ApiMixin, serializers.Serializer):
        """Substring user search by e-mail or username."""
        email_or_username = serializers.CharField(required=False, allow_null=True,
                                                  error_messages=ErrMessage.char(_('Email or username')))

        @staticmethod
        def get_request_params_api():
            return [openapi.Parameter(name='email_or_username',
                                      in_=openapi.IN_QUERY,
                                      type=openapi.TYPE_STRING,
                                      required=False,
                                      description=_("Email or username"))]

        @staticmethod
        def get_response_body_api():
            return openapi.Schema(
                type=openapi.TYPE_OBJECT,
                required=['username', 'email', 'id'],
                properties={
                    'id': openapi.Schema(type=openapi.TYPE_STRING, title='ID', description="ID"),
                    'username': openapi.Schema(type=openapi.TYPE_STRING, title=_("Username"),
                                               description=_("Username")),
                    'email': openapi.Schema(type=openapi.TYPE_STRING, title=_("Email"), description=_("Email"))
                }
            )

        def get_query_set(self):
            """
            :return: users ordered by creation time (newest first), filtered by the query when given
            """
            email_or_username = self.data.get('email_or_username')
            query_set = QuerySet(User)
            if email_or_username is not None:
                query_set = query_set.filter(
                    Q(username__contains=email_or_username) | Q(email__contains=email_or_username))
            query_set = query_set.order_by("-create_time")
            return query_set

        def list(self, with_valid=True):
            """
            :param with_valid: validate the query first
            :return: id/username/email dicts of the matching users
            """
            if with_valid:
                self.is_valid(raise_exception=True)
            return [{'id': user_model.id, 'username': user_model.username, 'email': user_model.email} for user_model in
                    self.get_query_set()]

        def page(self, current_page: int, page_size: int, with_valid=True):
            """
            :param current_page: page to return
            :param page_size: records per page
            :param with_valid: validate the query first
            :return: the result of ``page_search`` with every record serialized
            """
            if with_valid:
                self.is_valid(raise_exception=True)
            return page_search(current_page, page_size,
                               self.get_query_set(),
                               post_records_handler=lambda u: UserInstanceSerializer(u).data)

    class UserInstance(ApiMixin, serializers.Serializer):
        """Payload for creating a user."""
        email = serializers.EmailField(
            required=True,
            error_messages=ErrMessage.char(_("Email")),
            validators=[validators.EmailValidator(message=ExceptionCodeConstants.EMAIL_FORMAT_ERROR.value.message,
                                                  code=ExceptionCodeConstants.EMAIL_FORMAT_ERROR.value.code)])

        username = serializers.CharField(required=True,
                                         error_messages=ErrMessage.char(_("Username")),
                                         max_length=20,
                                         min_length=6,
                                         validators=[
                                             validators.RegexValidator(regex=re.compile("^.{6,20}$"),
                                                                       message=_(
                                                                           'Username must be 6-20 characters long'))
                                         ])
        password = serializers.CharField(required=True, error_messages=ErrMessage.char(_("Password")),
                                         validators=[validators.RegexValidator(regex=_PASSWORD_REGEX, message=_(
                                             "The password must be 6-20 characters long and must be a combination of letters, numbers, and special characters."))])

        nick_name = serializers.CharField(required=False, error_messages=ErrMessage.char(_("Name")), max_length=64,
                                          allow_null=True, allow_blank=True)
        phone = serializers.CharField(required=False, error_messages=ErrMessage.char(_("Phone")), max_length=20,
                                      allow_null=True, allow_blank=True)

        def is_valid(self, *, raise_exception=True):
            """
            Validate the fields and make sure e-mail and username are unused.

            Always raises on failure, regardless of ``raise_exception``.

            :return: True when the payload is acceptable
            """
            super().is_valid(raise_exception=True)
            username = self.data.get('username')
            email = self.data.get('email')
            u = QuerySet(User).filter(Q(username=username) | Q(email=email)).first()
            if u is not None:
                if u.email == email:
                    raise ExceptionCodeConstants.EMAIL_IS_EXIST.value.to_app_api_exception()
                if u.username == username:
                    raise ExceptionCodeConstants.USERNAME_IS_EXIST.value.to_app_api_exception()
            return True

        @staticmethod
        def get_request_body_api():
            return openapi.Schema(
                type=openapi.TYPE_OBJECT,
                required=['username', 'email', 'password'],
                properties={
                    'username': openapi.Schema(type=openapi.TYPE_STRING, title=_("Username"),
                                               description=_("Username")),
                    'email': openapi.Schema(type=openapi.TYPE_STRING, title=_("Email"), description=_("Email")),
                    'password': openapi.Schema(type=openapi.TYPE_STRING, title=_("Password"),
                                               description=_("Password")),
                    'phone': openapi.Schema(type=openapi.TYPE_STRING, title=_("Phone"), description=_("Phone")),
                    'nick_name': openapi.Schema(type=openapi.TYPE_STRING, title=_("Name"), description=_("Name"))
                }
            )

    class UserEditInstance(ApiMixin, serializers.Serializer):
        """Payload for editing a user."""
        email = serializers.EmailField(
            required=False,
            error_messages=ErrMessage.char(_("Email")),
            validators=[validators.EmailValidator(message=ExceptionCodeConstants.EMAIL_FORMAT_ERROR.value.message,
                                                  code=ExceptionCodeConstants.EMAIL_FORMAT_ERROR.value.code)])

        nick_name = serializers.CharField(required=False, error_messages=ErrMessage.char(_("Name")), max_length=64,
                                          allow_null=True, allow_blank=True)
        phone = serializers.CharField(required=False, error_messages=ErrMessage.char(_("Phone")), max_length=20,
                                      allow_null=True, allow_blank=True)
        is_active = serializers.BooleanField(required=False, error_messages=ErrMessage.char(_("Is active")))

        def is_valid(self, *, user_id=None, raise_exception=False):
            """
            Validate the fields and make sure the e-mail is not used by another user.

            Always raises on failure, regardless of ``raise_exception``.

            :param user_id: id of the user being edited (excluded from the e-mail check)
            :return: True when the payload is acceptable
            """
            super().is_valid(raise_exception=True)
            if self.data.get('email') is not None and QuerySet(User).filter(email=self.data.get('email')).exclude(
                    id=user_id).exists():
                raise AppApiException(1004, _('Email is already in use'))
            return True

        @staticmethod
        def get_request_body_api():
            return openapi.Schema(
                type=openapi.TYPE_OBJECT,
                properties={
                    'email': openapi.Schema(type=openapi.TYPE_STRING, title=_("Email"), description=_("Email")),
                    'nick_name': openapi.Schema(type=openapi.TYPE_STRING, title=_("Name"), description=_("Name")),
                    'phone': openapi.Schema(type=openapi.TYPE_STRING, title=_("Phone"), description=_("Phone")),
                    'is_active': openapi.Schema(type=openapi.TYPE_BOOLEAN, title=_("Is active"),
                                                description=_("Is active")),
                }
            )

    class RePasswordInstance(ApiMixin, serializers.Serializer):
        """Payload for changing a user's password."""
        password = serializers.CharField(required=True, error_messages=ErrMessage.char(_("Password")),
                                         validators=[validators.RegexValidator(regex=_PASSWORD_REGEX, message=_(
                                             "The password must be 6-20 characters long and must be a combination of letters, numbers, and special characters."))])
        re_password = serializers.CharField(required=True, error_messages=ErrMessage.char(_("Confirm Password")),
                                            validators=[validators.RegexValidator(regex=_PASSWORD_REGEX, message=_(
                                                "The confirmation password must be 6-20 characters long and must be a combination of letters, numbers, and special characters."))]
                                            )

        @staticmethod
        def get_request_body_api():
            return openapi.Schema(
                type=openapi.TYPE_OBJECT,
                required=['password', 're_password'],
                properties={
                    'password': openapi.Schema(type=openapi.TYPE_STRING, title=_("Password"),
                                               description=_("Password")),
                    're_password': openapi.Schema(type=openapi.TYPE_STRING, title=_("Confirm Password"),
                                                  description=_("Confirm Password")),
                }
            )

        def is_valid(self, *, raise_exception=False):
            """
            Validate the fields and the password confirmation.

            Always raises on failure, regardless of ``raise_exception``.

            :return: True when the payload is acceptable
            """
            super().is_valid(raise_exception=True)
            if self.data.get('password') != self.data.get('re_password'):
                raise ExceptionCodeConstants.PASSWORD_NOT_EQ_RE_PASSWORD.value.to_app_api_exception()
            return True

    @valid_license(model=User, count=2,
                   message=_(
                       'The community version supports up to 2 users. If you need more users, please contact us (https://fit2cloud.com/).'))
    @transaction.atomic
    def save(self, instance, with_valid=True):
        """
        Create a local, active user with the USER role together with its team.

        :param instance: creation payload (see ``UserInstance``)
        :param with_valid: validate the payload first
        :return: the serialized new user
        """
        if with_valid:
            UserManageSerializer.UserInstance(data=instance).is_valid(raise_exception=True)

        user = User(id=uuid.uuid1(), email=instance.get('email'),
                    phone="" if instance.get('phone') is None else instance.get('phone'),
                    nick_name="" if instance.get('nick_name') is None else instance.get('nick_name'),
                    username=instance.get('username'), password=password_encrypt(instance.get('password')),
                    role=RoleConstants.USER.name, source="LOCAL",
                    is_active=True)
        user.save()
        # Initialise the user's team.
        Team(**{'user': user, 'name': user.username + _('team')}).save()
        return UserInstanceSerializer(user).data

    class Operate(serializers.Serializer):
        """Operations on one existing user, addressed by ``id``."""
        id = serializers.UUIDField(required=True, error_messages=ErrMessage.char("ID"))

        def is_valid(self, *, raise_exception=False):
            """
            Validate ``id`` and make sure the user exists.

            Always raises on failure, regardless of ``raise_exception``.

            :return: True when the user exists
            """
            super().is_valid(raise_exception=True)
            if not QuerySet(User).filter(id=self.data.get('id')).exists():
                raise AppApiException(1004, _('User does not exist'))
            return True

        def _get_user(self):
            """Load the addressed user; raise instead of returning None if it is gone."""
            user = QuerySet(User).filter(id=self.data.get('id')).first()
            if user is None:
                raise AppApiException(1004, _('User does not exist'))
            return user

        @transaction.atomic
        def delete(self, with_valid=True):
            """
            Delete the user and the data owned by it.

            :param with_valid: validate first
            :return: True
            :raises AppApiException: the user does not exist or is an administrator
            """
            if with_valid:
                self.is_valid(raise_exception=True)
            user = self._get_user()
            if user.role == RoleConstants.ADMIN.name:
                raise AppApiException(1004, _('Unable to delete administrator'))
            user_id = self.data.get('id')

            team_member_list = QuerySet(TeamMember).filter(Q(user_id=user_id) | Q(team_id=user_id))
            # Delete the team member permissions.
            QuerySet(TeamMemberPermission).filter(
                member_id__in=[team_member.id for team_member in team_member_list]).delete()
            # Delete the team members.
            team_member_list.delete()
            # Delete the applications; everything related to an application is removed by
            # cascade, so nothing else has to be deleted manually.
            QuerySet(Application).filter(user_id=self.data.get('id')).delete()
            # Delete the datasets and everything that belongs to them.
            dataset_list = QuerySet(DataSet).filter(user_id=self.data.get('id'))
            dataset_id_list = [str(dataset.id) for dataset in dataset_list]
            QuerySet(Document).filter(dataset_id__in=dataset_id_list).delete()
            QuerySet(Paragraph).filter(dataset_id__in=dataset_id_list).delete()
            QuerySet(ProblemParagraphMapping).filter(dataset_id__in=dataset_id_list).delete()
            QuerySet(Problem).filter(dataset_id__in=dataset_id_list).delete()
            delete_embedding_by_dataset_id_list(dataset_id_list)
            dataset_list.delete()
            # Delete the team.
            QuerySet(Team).filter(user_id=self.data.get('id')).delete()
            # Delete the models.
            QuerySet(Model).filter(user_id=self.data.get('id')).delete()
            # Delete the user.
            QuerySet(User).filter(id=self.data.get('id')).delete()
            return True

        def edit(self, instance, with_valid=True):
            """
            Update e-mail, name, phone and active flag; keys that are missing or None are skipped.

            :param instance: edit payload (see ``UserEditInstance``)
            :param with_valid: validate the id and the payload first
            :return: the serialized updated user
            :raises AppApiException: the user does not exist, or the active flag of an
                                     administrator would be changed
            """
            if with_valid:
                self.is_valid(raise_exception=True)
                UserManageSerializer.UserEditInstance(data=instance).is_valid(user_id=self.data.get('id'),
                                                                              raise_exception=True)

            user = self._get_user()
            if user.role == RoleConstants.ADMIN.name and 'is_active' in instance and instance.get(
                    'is_active') is not None:
                raise AppApiException(1004, _('Cannot modify administrator status'))
            update_keys = ['email', 'nick_name', 'phone', 'is_active']
            for update_key in update_keys:
                if update_key in instance and instance.get(update_key) is not None:
                    setattr(user, update_key, instance.get(update_key))
            user.save()
            return UserInstanceSerializer(user).data

        def one(self, with_valid=True):
            """
            :param with_valid: validate first
            :return: the serialized user
            :raises AppApiException: the user does not exist
            """
            if with_valid:
                self.is_valid(raise_exception=True)
            user = self._get_user()
            return UserInstanceSerializer(user).data

        def re_password(self, instance, with_valid=True):
            """
            Set a new password for the user.

            :param instance: payload (see ``RePasswordInstance``)
            :param with_valid: validate the id and the payload first
            :return: True
            :raises AppApiException: the user does not exist
            """
            if with_valid:
                self.is_valid(raise_exception=True)
                UserManageSerializer.RePasswordInstance(data=instance).is_valid(raise_exception=True)
            user = self._get_user()
            user.password = password_encrypt(instance.get('password'))
            user.save()
            return True