# coding=utf-8 """ @project: MaxKB @Author:虎虎 @file: user.py @date:2025/4/14 19:18 @desc: """ import datetime import os import random import re from collections import defaultdict from django.core.cache import cache from django.core.mail.backends.smtp import EmailBackend from django.db import transaction from django.db.models import Q, QuerySet from rest_framework import serializers import uuid_utils.compat as uuid from common.constants.cache_version import Cache_Version from common.constants.exception_code_constants import ExceptionCodeConstants from common.constants.permission_constants import RoleConstants, Auth from common.database_model_manage.database_model_manage import DatabaseModelManage from common.db.search import page_search from common.exception.app_exception import AppApiException from common.utils.common import valid_license, password_encrypt from maxkb.conf import PROJECT_DIR from system_manage.models import SystemSetting, SettingType from users.models import User from django.utils.translation import gettext_lazy as _, to_locale from django.core import validators from django.core.mail import send_mail from django.utils.translation import get_language PASSWORD_REGEX = re.compile( r"^(?![a-zA-Z]+$)(?![A-Z0-9]+$)(?![A-Z_!@#$%^&*`~.()-+=]+$)(?![a-z0-9]+$)(?![a-z_!@#$%^&*`~()-+=]+$)" r"(?![0-9_!@#$%^&*`~()-+=]+$)[a-zA-Z0-9_!@#$%^&*`~.()-+=]{6,20}$" ) version, get_key = Cache_Version.SYSTEM.value class UserProfileResponse(serializers.ModelSerializer): is_edit_password = serializers.BooleanField(required=True, label=_('Is Edit Password')) permissions = serializers.ListField(required=True, label=_('permissions')) class Meta: model = User fields = ['id', 'username', 'nick_name', 'email', 'role', 'permissions', 'language', 'is_edit_password'] class CreateUserSerializer(serializers.Serializer): username = serializers.CharField(required=True, label=_('Username')) password = serializers.CharField(required=True, label=_('Password')) email = serializers.EmailField(required=True, label=_('Email')) nick_name = serializers.CharField(required=False, label=_('Nick name')) phone = serializers.CharField(required=False, label=_('Phone')) def is_workspace_manage(user_id: str, workspace_id: str): workspace_user_role_mapping_model = DatabaseModelManage.get_model("workspace_user_role_mapping") role_permission_mapping_model = DatabaseModelManage.get_model("role_permission_mapping_model") is_x_pack_ee = workspace_user_role_mapping_model is not None and role_permission_mapping_model is not None if is_x_pack_ee: return QuerySet(workspace_user_role_mapping_model).select_related('role', 'user').filter( workspace_id=workspace_id, user_id=user_id, role__type=RoleConstants.WORKSPACE_MANAGE.value.__str__()).exists() return QuerySet(User).filter(id=user_id, role=RoleConstants.ADMIN.value.__str__()).exists() def get_workspace_list_by_user(user_id): get_workspace_list = DatabaseModelManage.get_model('get_workspace_list_by_user') license_is_valid = DatabaseModelManage.get_model('license_is_valid') or (lambda: False) if get_workspace_list is not None and license_is_valid(): return get_workspace_list(user_id) return [{'id': 'default', 'name': 'default'}] class UserProfileSerializer(serializers.Serializer): @staticmethod def profile(user: User, auth: Auth): """ 获取用户详情 @param user: 用户对象 @param auth: 认证对象 @return: """ workspace_list = get_workspace_list_by_user(user.id) return { 'id': user.id, 'username': user.username, 'nick_name': user.nick_name, 'email': user.email, 'role': auth.role_list, 'permissions': auth.permission_list, 'is_edit_password': user.role == RoleConstants.ADMIN.name and user.password == 'd880e722c47a34d8e9fce789fc62389d', 'language': user.language, 'workspace_list': workspace_list } class UserInstanceSerializer(serializers.ModelSerializer): class Meta: model = User fields = ['id', 'username', 'email', 'phone', 'is_active', 'role', 'nick_name', 'create_time', 'update_time', 'source'] class UserManageSerializer(serializers.Serializer): class UserInstance(serializers.Serializer): email = serializers.EmailField( required=True, label=_("Email"), validators=[validators.EmailValidator( message=ExceptionCodeConstants.EMAIL_FORMAT_ERROR.value.message, code=ExceptionCodeConstants.EMAIL_FORMAT_ERROR.value.code )] ) username = serializers.CharField( required=True, label=_("Username"), max_length=20, min_length=4, validators=[ validators.RegexValidator( regex=re.compile("^.{4,20}$"), message=_('Username must be 4-20 characters long') ) ] ) password = serializers.CharField( required=True, label=_("Password"), max_length=20, min_length=6, validators=[ validators.RegexValidator( regex=PASSWORD_REGEX, message=_( "The password must be 6-20 characters long and must be a combination of letters, numbers, and special characters." ) ) ] ) nick_name = serializers.CharField( required=True, label=_("Nick name"), max_length=20, ) phone = serializers.CharField( required=False, label=_("Phone"), max_length=20, allow_null=True, allow_blank=True ) def is_valid(self, *, raise_exception=True): super().is_valid(raise_exception=True) self._check_unique_username_and_email() def _check_unique_username_and_email(self): username = self.data.get('username') email = self.data.get('email') nick_name = self.data.get('nick_name') user = User.objects.filter(Q(username=username) | Q(email=email) | Q(nick_name=nick_name)).first() if user: if user.email == email: raise ExceptionCodeConstants.EMAIL_IS_EXIST.value.to_app_api_exception() if user.username == username: raise ExceptionCodeConstants.USERNAME_IS_EXIST.value.to_app_api_exception() if user.nick_name == nick_name: raise ExceptionCodeConstants.NICKNAME_IS_EXIST.value.to_app_api_exception() class Query(serializers.Serializer): email_or_username = serializers.CharField(required=False, allow_null=True, label=_('Email or username')) def get_query_set(self): email_or_username = self.data.get('email_or_username') query_set = QuerySet(User) if email_or_username is not None: query_set = query_set.filter( Q(username__contains=email_or_username) | Q(email__contains=email_or_username) | Q( nick_name__contains=email_or_username)) query_set = query_set.order_by("-create_time") return query_set def list(self, with_valid=True): if with_valid: self.is_valid(raise_exception=True) return [{'id': user_model.id, 'username': user_model.username, 'email': user_model.email} for user_model in self.get_query_set()] def page(self, current_page: int, page_size: int, with_valid=True): if with_valid: self.is_valid(raise_exception=True) result = page_search(current_page, page_size, self.get_query_set(), post_records_handler=lambda u: UserInstanceSerializer(u).data) role_model = DatabaseModelManage.get_model("role_model") user_role_relation_model = DatabaseModelManage.get_model("workspace_user_role_mapping") def _get_user_roles(user_ids): if not (role_model and user_role_relation_model): return {} # 获取所有相关角色关系,并预加载角色信息 # 获取所有相关角色关系,并预加载角色信息 user_role_relations = ( user_role_relation_model.objects .filter(user_id__in=user_ids) .select_related('role') .distinct('user_id', 'role_id', 'workspace_id') # 确保组合唯一性 ) # 构建用户ID到角色名称列表的映射 user_role_mapping = defaultdict(list) # 构建用户ID到角色ID与工作空间ID映射 user_role_setting_mapping = defaultdict(lambda: defaultdict(list)) for relation in user_role_relations: user_id = str(relation.user_id) role_id = relation.role_id workspace_id = relation.workspace_id user_role_mapping[user_id].append(relation.role.role_name) user_role_setting_mapping[user_id][role_id].append(workspace_id) # 转换为所需的结构 result_user_role_setting_mapping = { user_id: [{"role_id": role_id, "workspace_ids": workspace_ids} for role_id, workspace_ids in roles.items()] for user_id, roles in user_role_setting_mapping.items() } return user_role_mapping, result_user_role_setting_mapping if role_model and user_role_relation_model: user_ids = [user['id'] for user in result['records']] user_role_mapping, user_role_setting_mapping = _get_user_roles(user_ids) # 将角色信息添加回用户数据中 for user in result['records']: user_id = str(user['id']) user['role'] = user_role_mapping.get(user_id, []) user['role_setting'] = user_role_setting_mapping.get(user_id, []) return result @valid_license(model=User, count=2, message=_( 'The community version supports up to 2 users. If you need more users, please contact us (https://fit2cloud.com/).')) @transaction.atomic def save(self, instance, with_valid=True): if with_valid: self.UserInstance(data=instance).is_valid(raise_exception=True) user = User( id=uuid.uuid7(), email=instance.get('email'), phone=instance.get('phone', ''), nick_name=instance.get('nick_name', ''), username=instance.get('username'), password=password_encrypt(instance.get('password')), role=RoleConstants.USER.name, source="LOCAL", is_active=True ) update_user_role(instance, user) user.save() return UserInstanceSerializer(user).data class UserEditInstance(serializers.Serializer): email = serializers.EmailField( required=False, label=_("Email"), validators=[validators.EmailValidator( message=ExceptionCodeConstants.EMAIL_FORMAT_ERROR.value.message, code=ExceptionCodeConstants.EMAIL_FORMAT_ERROR.value.code )] ) nick_name = serializers.CharField( required=False, label=_("Name"), max_length=20, ) phone = serializers.CharField( required=False, label=_("Phone"), max_length=20, allow_null=True, allow_blank=True ) is_active = serializers.BooleanField( required=False, label=_("Is Active") ) def is_valid(self, *, user_id=None, raise_exception=False): super().is_valid(raise_exception=True) self._check_unique_email(user_id) self._check_unique_nick_name(user_id) def _check_unique_nick_name(self, user_id): nick_name = self.data.get('nick_name') if nick_name and User.objects.filter(nick_name=nick_name).exclude(id=user_id).exists(): raise AppApiException(1008, _('Nickname is already in use')) def _check_unique_email(self, user_id): email = self.data.get('email') if email and User.objects.filter(email=email).exclude(id=user_id).exists(): raise AppApiException(1004, _('Email is already in use')) class RePasswordInstance(serializers.Serializer): password = serializers.CharField( required=True, label=_("Password"), max_length=20, min_length=6, validators=[ validators.RegexValidator( regex=PASSWORD_REGEX, message=_( "The password must be 6-20 characters long and must be a combination of letters, numbers, and special characters." ) ) ] ) re_password = serializers.CharField( required=True, label=_("Re Password"), validators=[ validators.RegexValidator( regex=PASSWORD_REGEX, message=_( "The confirmation password must be 6-20 characters long and must be a combination of letters, numbers, and special characters." ) ) ] ) def is_valid(self, *, raise_exception=False): super().is_valid(raise_exception=True) self._check_passwords_match() def _check_passwords_match(self): if self.data.get('password') != self.data.get('re_password'): raise ExceptionCodeConstants.PASSWORD_NOT_EQ_RE_PASSWORD.value.to_app_api_exception() class Operate(serializers.Serializer): id = serializers.UUIDField(required=True, label=_('User ID')) def is_valid(self, *, raise_exception=False): super().is_valid(raise_exception=True) self._check_user_exists() def _check_user_exists(self): if not User.objects.filter(id=self.data.get('id')).exists(): raise AppApiException(1004, _('User does not exist')) @transaction.atomic def delete(self, with_valid=True): if with_valid: self.is_valid(raise_exception=True) self._check_not_admin() user_id = self.data.get('id') # TODO 需要删除授权关系 User.objects.filter(id=user_id).delete() return True def _check_not_admin(self): user = User.objects.filter(id=self.data.get('id')).first() if user.role == RoleConstants.ADMIN.name: raise AppApiException(1004, _('Unable to delete administrator')) def edit(self, instance, with_valid=True): if with_valid: self.is_valid(raise_exception=True) UserManageSerializer.UserEditInstance(data=instance).is_valid(user_id=self.data.get('id'), raise_exception=True) user = User.objects.filter(id=self.data.get('id')).first() self._check_admin_modification(user, instance) self._update_user_fields(user, instance) update_user_role(instance, user) user.save() return UserInstanceSerializer(user).data @staticmethod def _check_admin_modification(user, instance): if user.role == RoleConstants.ADMIN.name and 'is_active' in instance and instance.get( 'is_active') is not None: raise AppApiException(1004, _('Cannot modify administrator status')) @staticmethod def _update_user_fields(user, instance): update_keys = ['email', 'nick_name', 'phone', 'is_active'] for key in update_keys: if key in instance and instance.get(key) is not None: setattr(user, key, instance.get(key)) def one(self, with_valid=True): if with_valid: self.is_valid(raise_exception=True) user = User.objects.filter(id=self.data.get('id')).first() workspace_user_role_mapping_model = DatabaseModelManage.get_model("workspace_user_role_mapping") if workspace_user_role_mapping_model: role_setting = {} workspace_user_role_mapping_list = QuerySet(workspace_user_role_mapping_model).filter( user_id=user.id) for workspace_user_role_mapping in workspace_user_role_mapping_list: role_id = workspace_user_role_mapping.role_id workspace_id = workspace_user_role_mapping.workspace_id if role_id not in role_setting: role_setting[role_id] = [] role_setting[role_id].append(workspace_id) return { 'id': user.id, 'username': user.username, 'email': user.email, 'phone': user.phone, 'nick_name': user.nick_name, 'is_active': user.is_active, 'role_setting': role_setting } return UserInstanceSerializer(user).data def re_password(self, instance, with_valid=True): if with_valid: self.is_valid(raise_exception=True) UserManageSerializer.RePasswordInstance(data=instance).is_valid(raise_exception=True) user = User.objects.filter(id=self.data.get('id')).first() user.password = password_encrypt(instance.get('password')) user.save() return True def get_user_list(self, workspace_id): """ 获取用户列表 :param workspace_id: 工作空间ID :return: 用户列表 """ workspace_user_role_mapping_model = DatabaseModelManage.get_model("workspace_user_role_mapping") if workspace_user_role_mapping_model: user_ids = ( workspace_user_role_mapping_model.objects .filter(workspace_id=workspace_id) .values_list('user_id', flat=True) .distinct() ) else: user_ids = User.objects.values_list('id', flat=True) users = User.objects.filter(id__in=user_ids).values('id', 'nick_name') return list(users) def get_user_members(self, workspace_id): """ 获取工作空间成员列表 :param workspace_id: 工作空间ID :return: 成员列表 """ role_model = DatabaseModelManage.get_model("role_model") user_role_relation_model = DatabaseModelManage.get_model("workspace_user_role_mapping") if user_role_relation_model and role_model: user_role_relations = ( user_role_relation_model.objects .filter(workspace_id=workspace_id, role__type='USER') .exclude(user__in=user_role_relation_model.objects .filter(workspace_id=workspace_id) .exclude(role__type='USER') .values_list('user', flat=True)) .select_related('role', 'user') ) user_dict = {} for relation in user_role_relations: user_id = relation.user.id if user_id not in user_dict: user_dict[user_id] = { 'id': user_id, 'nick_name': relation.user.nick_name, 'email': relation.user.email, 'roles': [relation.role.role_name] } else: user_dict[user_id]['roles'].append(relation.role.role_name) # 将字典值转换为列表形式 return list(user_dict.values()) user_list = User.objects.exclude(role=RoleConstants.ADMIN.name) return [ { 'id': user.id, 'nick_name': user.nick_name, 'email': user.email, 'roles': [RoleConstants.USER.name] } for user in user_list ] class BatchDelete(serializers.Serializer): ids = serializers.ListField(required=True, label=_('User IDs')) def batch_delete(self, with_valid=True): if with_valid: self.is_valid(raise_exception=True) ids = self.data.get('ids') if not ids: raise AppApiException(1004, _('User IDs cannot be empty')) User.objects.filter(id__in=ids).delete() return True def get_all_user_list(self): users = User.objects.all().values('id', 'nick_name', 'username') return list(users) def update_user_role(instance, user): workspace_user_role_mapping_model = DatabaseModelManage.get_model("workspace_user_role_mapping") if workspace_user_role_mapping_model: role_setting = instance.get('role_setting') if not role_setting: return workspace_user_role_mapping_model.objects.filter(user_id=user.id).delete() relations = set() for item in role_setting: role_id = item['role_id'] workspace_ids = item['workspace_ids'] if item['workspace_ids'] else ['None'] for workspace_id in workspace_ids: relations.add((role_id, workspace_id)) for role_id, workspace_id in relations: workspace_user_role_mapping_model.objects.create( id=uuid.uuid7(), role_id=role_id, workspace_id=workspace_id, user_id=user.id ) permission_version, permission_get_key = Cache_Version.PERMISSION_LIST.value cache.delete(permission_get_key(str(user.id)), version=permission_version) class RePasswordSerializer(serializers.Serializer): password = serializers.CharField(required=True, label=_("Password"), validators=[validators.RegexValidator(regex=re.compile( "^(?![a-zA-Z]+$)(?![A-Z0-9]+$)(?![A-Z_!@#$%^&*`~.()-+=]+$)(?![a-z0-9]+$)(?![a-z_!@#$%^&*`~()-+=]+$)" "(?![0-9_!@#$%^&*`~()-+=]+$)[a-zA-Z0-9_!@#$%^&*`~.()-+=]{6,20}$") , message=_( "The confirmation password must be 6-20 characters long and must be a combination of letters, numbers, and special characters."))]) re_password = serializers.CharField(required=True, label=_("Confirm Password"), validators=[validators.RegexValidator(regex=re.compile( "^(?![a-zA-Z]+$)(?![A-Z0-9]+$)(?![A-Z_!@#$%^&*`~.()-+=]+$)(?![a-z0-9]+$)(?![a-z_!@#$%^&*`~()-+=]+$)" "(?![0-9_!@#$%^&*`~()-+=]+$)[a-zA-Z0-9_!@#$%^&*`~.()-+=]{6,20}$") , message=_( "The confirmation password must be 6-20 characters long and must be a combination of letters, numbers, and special characters."))] ) class Meta: model = User fields = '__all__' def is_valid(self, *, raise_exception=False): super().is_valid(raise_exception=True) if self.data.get('password') != self.data.get('re_password'): raise AppApiException(ExceptionCodeConstants.PASSWORD_NOT_EQ_RE_PASSWORD.value.code, ExceptionCodeConstants.PASSWORD_NOT_EQ_RE_PASSWORD.value.message) return True def reset_password(self, user_id: str): """ 修改密码 :return: 是否成功 """ if self.is_valid(): QuerySet(User).filter(id=user_id).update( password=password_encrypt(self.data.get('password'))) return True class SendEmailSerializer(serializers.Serializer): email = serializers.EmailField( required=True , label=_("Email"), validators=[validators.EmailValidator(message=ExceptionCodeConstants.EMAIL_FORMAT_ERROR.value.message, code=ExceptionCodeConstants.EMAIL_FORMAT_ERROR.value.code)]) type = serializers.CharField(required=True, label=_("Type"), validators=[ validators.RegexValidator(regex=re.compile("^register|reset_password$"), message=_("The type only supports register|reset_password"), code=500) ]) class Meta: model = User fields = '__all__' def is_valid(self, *, raise_exception=False): super().is_valid(raise_exception=raise_exception) user_exists = QuerySet(User).filter(email=self.data.get('email')).exists() if not user_exists and self.data.get('type') == 'reset_password': raise ExceptionCodeConstants.EMAIL_IS_NOT_EXIST.value.to_app_api_exception() elif user_exists and self.data.get('type') == 'register': raise ExceptionCodeConstants.EMAIL_IS_EXIST.value.to_app_api_exception() code_cache_key = self.data.get('email') + ":" + self.data.get("type") code_cache_key_lock = code_cache_key + "_lock" ttl = cache.ttl(code_cache_key_lock) if ttl is not None: raise AppApiException(500, _("Do not send emails again within {seconds} seconds").format( seconds=int(ttl.total_seconds()))) return True def send(self): """ 发送邮件 :return: 是否发送成功 :exception 发送失败异常 """ email = self.data.get("email") state = self.data.get("type") # 生成随机验证码 code = "".join(list(map(lambda i: random.choice(['1', '2', '3', '4', '5', '6', '7', '8', '9', '0' ]), range(6)))) # 获取邮件模板 language = get_language() file = open( os.path.join(PROJECT_DIR, "apps", "common", 'template', f'email_template_{to_locale(language)}.html'), "r", encoding='utf-8') content = file.read() file.close() code_cache_key = email + ":" + state code_cache_key_lock = code_cache_key + "_lock" # 设置缓存 cache.set(get_key(code_cache_key_lock), code, timeout=datetime.timedelta(minutes=1), version=version) system_setting = QuerySet(SystemSetting).filter(type=SettingType.EMAIL.value).first() if system_setting is None: cache.delete(get_key(code_cache_key_lock), version=version) raise AppApiException(1004, _("The email service has not been set up. Please contact the administrator to set up the email service in [Email Settings].")) try: connection = EmailBackend(system_setting.meta.get("email_host"), system_setting.meta.get('email_port'), system_setting.meta.get('email_host_user'), system_setting.meta.get('email_host_password'), system_setting.meta.get('email_use_tls'), False, system_setting.meta.get('email_use_ssl') ) # 发送邮件 send_mail(_('【Intelligent knowledge base question and answer system-{action}】').format( action=_('User registration') if state == 'register' else _('Change password')), '', html_message=f'{content.replace("${code}", code)}', from_email=system_setting.meta.get('from_email'), recipient_list=[email], fail_silently=False, connection=connection) except Exception as e: cache.delete(get_key(code_cache_key_lock)) raise AppApiException(500, f"{str(e)}" + _("Email sending failed")) cache.set(get_key(code_cache_key), code, timeout=datetime.timedelta(minutes=30), version=version) return True class CheckCodeSerializer(serializers.Serializer): """ 校验验证码 """ email = serializers.EmailField( required=True, label=_("Email"), validators=[validators.EmailValidator(message=ExceptionCodeConstants.EMAIL_FORMAT_ERROR.value.message, code=ExceptionCodeConstants.EMAIL_FORMAT_ERROR.value.code)]) code = serializers.CharField(required=True, label=_("Verification code")) type = serializers.CharField(required=True, label=_("Type"), validators=[ validators.RegexValidator(regex=re.compile("^register|reset_password$"), message=_( "The type only supports register|reset_password"), code=500) ]) def is_valid(self, *, raise_exception=False): super().is_valid() value = cache.get(get_key(self.data.get("email") + ":" + self.data.get("type")), version=version) if value is None or value != self.data.get("code"): raise ExceptionCodeConstants.CODE_ERROR.value.to_app_api_exception() return True class SwitchLanguageSerializer(serializers.Serializer): user_id = serializers.UUIDField(required=True, label=_('user id')) language = serializers.CharField(required=True, label=_('language')) def switch(self): self.is_valid(raise_exception=True) language = self.data.get('language') support_language_list = ['zh-CN', 'zh-Hant', 'en-US'] if not support_language_list.__contains__(language): raise AppApiException(500, _('language only support:') + ','.join(support_language_list)) QuerySet(User).filter(id=self.data.get('user_id')).update(language=language)