MaxKB/apps/users/serializers/user.py
2025-05-08 10:51:01 +08:00

375 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# coding=utf-8
"""
@project: MaxKB
@Author虎虎
@file user.py
@date2025/4/14 19:18
@desc:
"""
import re
from django.db import transaction
from django.db.models import Q, QuerySet
from rest_framework import serializers
import uuid_utils.compat as uuid
from common.constants.exception_code_constants import ExceptionCodeConstants
from common.constants.permission_constants import RoleConstants, Auth
from common.database_model_manage.database_model_manage import DatabaseModelManage
from common.db.search import page_search
from common.exception.app_exception import AppApiException
from common.utils.common import valid_license, password_encrypt
from users.models import User
from django.utils.translation import gettext_lazy as _
from django.core import validators
PASSWORD_REGEX = re.compile(
r"^(?![a-zA-Z]+$)(?![A-Z0-9]+$)(?![A-Z_!@#$%^&*`~.()-+=]+$)(?![a-z0-9]+$)(?![a-z_!@#$%^&*`~()-+=]+$)"
r"(?![0-9_!@#$%^&*`~()-+=]+$)[a-zA-Z0-9_!@#$%^&*`~.()-+=]{6,20}$"
)
class UserProfileResponse(serializers.ModelSerializer):
is_edit_password = serializers.BooleanField(required=True, label=_('Is Edit Password'))
permissions = serializers.ListField(required=True, label=_('permissions'))
class Meta:
model = User
fields = ['id', 'username', 'nick_name', 'email', 'role', 'permissions', 'language', 'is_edit_password']
class CreateUserSerializer(serializers.Serializer):
username = serializers.CharField(required=True, label=_('Username'))
password = serializers.CharField(required=True, label=_('Password'))
email = serializers.EmailField(required=True, label=_('Email'))
nick_name = serializers.CharField(required=False, label=_('Nick name'))
phone = serializers.CharField(required=False, label=_('Phone'))
class UserProfileSerializer(serializers.Serializer):
@staticmethod
def profile(user: User, auth: Auth):
"""
获取用户详情
@param user: 用户对象
@param auth: 认证对象
@return:
"""
return {
'id': user.id,
'username': user.username,
'nick_name': user.nick_name,
'email': user.email,
'role': auth.role_list,
'permissions': auth.permission_list,
'is_edit_password': user.role == RoleConstants.ADMIN.name and user.password == 'd880e722c47a34d8e9fce789fc62389d',
'language': user.language,
}
class UserInstanceSerializer(serializers.ModelSerializer):
class Meta:
model = User
fields = ['id', 'username', 'email', 'phone', 'is_active', 'role', 'nick_name', 'create_time', 'update_time',
'source']
class UserManageSerializer(serializers.Serializer):
class UserInstance(serializers.Serializer):
email = serializers.EmailField(
required=True,
label=_("Email"),
validators=[validators.EmailValidator(
message=ExceptionCodeConstants.EMAIL_FORMAT_ERROR.value.message,
code=ExceptionCodeConstants.EMAIL_FORMAT_ERROR.value.code
)]
)
username = serializers.CharField(
required=True,
label=_("Username"),
max_length=20,
min_length=6,
validators=[
validators.RegexValidator(
regex=re.compile("^.{6,20}$"),
message=_('Username must be 6-20 characters long')
)
]
)
password = serializers.CharField(
required=True,
label=_("Password"),
max_length=20,
min_length=6,
validators=[
validators.RegexValidator(
regex=PASSWORD_REGEX,
message=_(
"The password must be 6-20 characters long and must be a combination of letters, numbers, and special characters."
)
)
]
)
nick_name = serializers.CharField(
required=False,
label=_("Nick name"),
max_length=64,
allow_null=True,
allow_blank=True
)
phone = serializers.CharField(
required=False,
label=_("Phone"),
max_length=20,
allow_null=True,
allow_blank=True
)
def is_valid(self, *, raise_exception=True):
super().is_valid(raise_exception=True)
self._check_unique_username_and_email()
def _check_unique_username_and_email(self):
username = self.data.get('username')
email = self.data.get('email')
user = User.objects.filter(Q(username=username) | Q(email=email)).first()
if user:
if user.email == email:
raise ExceptionCodeConstants.EMAIL_IS_EXIST.value.to_app_api_exception()
if user.username == username:
raise ExceptionCodeConstants.USERNAME_IS_EXIST.value.to_app_api_exception()
class Query(serializers.Serializer):
email_or_username = serializers.CharField(required=False, allow_null=True,
label=_('Email or username'))
def get_query_set(self):
email_or_username = self.data.get('email_or_username')
query_set = QuerySet(User)
if email_or_username is not None:
query_set = query_set.filter(
Q(username__contains=email_or_username) | Q(email__contains=email_or_username))
query_set = query_set.order_by("-create_time")
return query_set
def list(self, with_valid=True):
if with_valid:
self.is_valid(raise_exception=True)
return [{'id': user_model.id, 'username': user_model.username, 'email': user_model.email} for user_model in
self.get_query_set()]
def page(self, current_page: int, page_size: int, with_valid=True):
if with_valid:
self.is_valid(raise_exception=True)
return page_search(current_page, page_size,
self.get_query_set(),
post_records_handler=lambda u: UserInstanceSerializer(u).data)
@valid_license(model=User, count=2,
message=_(
'The community version supports up to 2 users. If you need more users, please contact us (https://fit2cloud.com/).'))
@transaction.atomic
def save(self, instance, with_valid=True):
if with_valid:
self.UserInstance(data=instance).is_valid(raise_exception=True)
user = User(
id=uuid.uuid7(),
email=instance.get('email'),
phone=instance.get('phone', ''),
nick_name=instance.get('nick_name', ''),
username=instance.get('username'),
password=password_encrypt(instance.get('password')),
role=RoleConstants.USER.name,
source="LOCAL",
is_active=True
)
# 企业版本 增加用户的角色设置 参数是 role_id [workspace_id]
workspace_user_role_mapping_model = DatabaseModelManage.get_model("workspace_user_role_mapping")
if workspace_user_role_mapping_model:
# 获取当前用户的角色设置
role_setting = instance.get('role_setting')
# role_setting 的数据类型是{"role_id": [workspace_id1, workspace_id2]}
# 如果是系统层级的 workspace_id 是 SYSTEM 或者是NONE
for role_id, workspace_ids in role_setting.items():
for workspace_id in workspace_ids:
# 创建用户角色映射关系
workspace_user_role_mapping_model.objects.create(
user_id=user.id,
role_id=role_id,
workspace_id=workspace_id
)
user.save()
return UserInstanceSerializer(user).data
class UserEditInstance(serializers.Serializer):
email = serializers.EmailField(
required=False,
label=_("Email"),
validators=[validators.EmailValidator(
message=ExceptionCodeConstants.EMAIL_FORMAT_ERROR.value.message,
code=ExceptionCodeConstants.EMAIL_FORMAT_ERROR.value.code
)]
)
nick_name = serializers.CharField(
required=False,
label=_("Name"),
max_length=64,
allow_null=True,
allow_blank=True
)
phone = serializers.CharField(
required=False,
label=_("Phone"),
max_length=20,
allow_null=True,
allow_blank=True
)
is_active = serializers.BooleanField(
required=False,
label=_("Is Active")
)
def is_valid(self, *, user_id=None, raise_exception=False):
super().is_valid(raise_exception=True)
self._check_unique_email(user_id)
def _check_unique_email(self, user_id):
email = self.data.get('email')
if email and User.objects.filter(email=email).exclude(id=user_id).exists():
raise AppApiException(1004, _('Email is already in use'))
class RePasswordInstance(serializers.Serializer):
password = serializers.CharField(
required=True,
label=_("Password"),
max_length=20,
min_length=6,
validators=[
validators.RegexValidator(
regex=PASSWORD_REGEX,
message=_(
"The password must be 6-20 characters long and must be a combination of letters, numbers, and special characters."
)
)
]
)
re_password = serializers.CharField(
required=True,
label=_("Re Password"),
validators=[
validators.RegexValidator(
regex=PASSWORD_REGEX,
message=_(
"The confirmation password must be 6-20 characters long and must be a combination of letters, numbers, and special characters."
)
)
]
)
def is_valid(self, *, raise_exception=False):
super().is_valid(raise_exception=True)
self._check_passwords_match()
def _check_passwords_match(self):
if self.data.get('password') != self.data.get('re_password'):
raise ExceptionCodeConstants.PASSWORD_NOT_EQ_RE_PASSWORD.value.to_app_api_exception()
class Operate(serializers.Serializer):
id = serializers.UUIDField(required=True, label=_('User ID'))
def is_valid(self, *, raise_exception=False):
super().is_valid(raise_exception=True)
self._check_user_exists()
def _check_user_exists(self):
if not User.objects.filter(id=self.data.get('id')).exists():
raise AppApiException(1004, _('User does not exist'))
@transaction.atomic
def delete(self, with_valid=True):
if with_valid:
self.is_valid(raise_exception=True)
self._check_not_admin()
user_id = self.data.get('id')
# TODO 需要删除授权关系
User.objects.filter(id=user_id).delete()
return True
def _check_not_admin(self):
user = User.objects.filter(id=self.data.get('id')).first()
if user.role == RoleConstants.ADMIN.name:
raise AppApiException(1004, _('Unable to delete administrator'))
def edit(self, instance, with_valid=True):
if with_valid:
self.is_valid(raise_exception=True)
UserManageSerializer.UserEditInstance(data=instance).is_valid(user_id=self.data.get('id'),
raise_exception=True)
user = User.objects.filter(id=self.data.get('id')).first()
self._check_admin_modification(user, instance)
self._update_user_fields(user, instance)
workspace_user_role_mapping_model = DatabaseModelManage.get_model("workspace_user_role_mapping")
if workspace_user_role_mapping_model:
# 获取当前用户的角色设置
role_setting = instance.get('role_setting')
# 全部删除重新添加
workspace_user_role_mapping_model.objects.filter(user_id=user.id).delete()
for role_id, workspace_ids in role_setting.items():
for workspace_id in workspace_ids:
# 创建用户角色映射关系
workspace_user_role_mapping_model.objects.create(
user_id=user.id,
role_id=role_id,
workspace_id=workspace_id
)
user.save()
return UserInstanceSerializer(user).data
@staticmethod
def _check_admin_modification(user, instance):
if user.role == RoleConstants.ADMIN.name and 'is_active' in instance and instance.get(
'is_active') is not None:
raise AppApiException(1004, _('Cannot modify administrator status'))
@staticmethod
def _update_user_fields(user, instance):
update_keys = ['email', 'nick_name', 'phone', 'is_active']
for key in update_keys:
if key in instance and instance.get(key) is not None:
setattr(user, key, instance.get(key))
def one(self, with_valid=True):
if with_valid:
self.is_valid(raise_exception=True)
user = User.objects.filter(id=self.data.get('id')).first()
workspace_user_role_mapping_model = DatabaseModelManage.get_model("workspace_user_role_mapping")
if workspace_user_role_mapping_model:
role_setting = {}
workspace_user_role_mapping_list = QuerySet(workspace_user_role_mapping_model).filter(
user_id=user.id)
for workspace_user_role_mapping in workspace_user_role_mapping_list:
role_id = workspace_user_role_mapping.role_id
workspace_id = workspace_user_role_mapping.workspace_id
if role_id not in role_setting:
role_setting[role_id] = []
role_setting[role_id].append(workspace_id)
return {
'id': user.id,
'username': user.username,
'email': user.email,
'phone': user.phone,
'nick_name': user.nick_name,
'is_active': user.is_active,
'role_setting': role_setting
}
return UserInstanceSerializer(user).data
def re_password(self, instance, with_valid=True):
if with_valid:
self.is_valid(raise_exception=True)
UserManageSerializer.RePasswordInstance(data=instance).is_valid(raise_exception=True)
user = User.objects.filter(id=self.data.get('id')).first()
user.password = password_encrypt(instance.get('password'))
user.save()
return True