feat: enhance user role management by passing user_id to relevant methods

This commit is contained in:
wxg0103 2025-07-14 10:49:47 +08:00
parent e618b37309
commit f0357e5944
2 changed files with 25 additions and 14 deletions

View File

@ -226,7 +226,7 @@ class UserManageSerializer(serializers.Serializer):
return [{'id': user_model.id, 'username': user_model.username, 'email': user_model.email} for user_model in
self.get_query_set()]
def page(self, current_page: int, page_size: int, with_valid=True):
def page(self, current_page: int, page_size: int, user_id: str, with_valid=True):
if with_valid:
self.is_valid(raise_exception=True)
result = page_search(current_page, page_size,
@ -235,7 +235,7 @@ class UserManageSerializer(serializers.Serializer):
role_model = DatabaseModelManage.get_model("role_model")
user_role_relation_model = DatabaseModelManage.get_model("workspace_user_role_mapping")
def _get_user_roles(user_ids):
def _get_user_roles(user_ids, is_admin=True):
workspace_model = DatabaseModelManage.get_model("workspace_model")
if not (role_model and user_role_relation_model and workspace_model):
return {}
@ -261,7 +261,8 @@ class UserManageSerializer(serializers.Serializer):
user_id = str(relation.user_id)
role_id = relation.role_id
workspace_id = relation.workspace_id
if not is_admin and relation.role.type == RoleConstants.ADMIN.name:
continue
user_role_mapping[user_id].add(relation.role.role_name)
user_role_setting_mapping[user_id][role_id].append(workspace_id)
user_role_workspace_mapping[user_id][relation.role.role_name].append(
@ -285,8 +286,12 @@ class UserManageSerializer(serializers.Serializer):
return user_role_mapping, result_user_role_setting_mapping, result_user_role_workspace_mapping
if role_model and user_role_relation_model:
# 获取当前用户的所有角色 判断是不是内置的系统管理员
is_admin = user_role_relation_model.objects.filter(user_id=user_id,
role_id=RoleConstants.ADMIN.name).exists()
user_ids = [user['id'] for user in result['records']]
user_role_mapping, user_role_setting_mapping, user_role_workspace_mapping = _get_user_roles(user_ids)
user_role_mapping, user_role_setting_mapping, user_role_workspace_mapping = _get_user_roles(user_ids,
is_admin)
# 将角色信息添加回用户数据中
for user in result['records']:
@ -297,7 +302,7 @@ class UserManageSerializer(serializers.Serializer):
return result
@transaction.atomic
def save(self, instance, with_valid=True):
def save(self, instance, user_id, with_valid=True):
if with_valid:
self.UserInstance(data=instance).is_valid(raise_exception=True)
@ -312,7 +317,7 @@ class UserManageSerializer(serializers.Serializer):
source="LOCAL",
is_active=True
)
update_user_role(instance, user)
update_user_role(instance, user, user_id)
user.save()
return UserInstanceSerializer(user).data
@ -419,7 +424,7 @@ class UserManageSerializer(serializers.Serializer):
if user.role == RoleConstants.ADMIN.name or str(user.id) == 'f0dd8f71-e4ee-11ee-8c84-a8a1595801ab':
raise AppApiException(1004, _('Unable to delete administrator'))
def edit(self, instance, with_valid=True):
def edit(self, instance, user_id, with_valid=True):
if with_valid:
self.is_valid(raise_exception=True)
UserManageSerializer.UserEditInstance(data=instance).is_valid(user_id=self.data.get('id'),
@ -427,7 +432,7 @@ class UserManageSerializer(serializers.Serializer):
user = User.objects.filter(id=self.data.get('id')).first()
self._check_admin_modification(user, instance)
self._update_user_fields(user, instance)
update_user_role(instance, user)
update_user_role(instance, user, user_id)
user.save()
return UserInstanceSerializer(user).data
@ -556,9 +561,11 @@ class UserManageSerializer(serializers.Serializer):
return list(users)
def update_user_role(instance, user):
def update_user_role(instance, user, user_id=None):
workspace_user_role_mapping_model = DatabaseModelManage.get_model("workspace_user_role_mapping")
if workspace_user_role_mapping_model:
is_admin = workspace_user_role_mapping_model.objects.filter(user_id=user_id,
role_id=RoleConstants.ADMIN.name).exists()
role_setting = instance.get('role_setting')
if not role_setting:
return
@ -587,8 +594,11 @@ def update_user_role(instance, user):
if role_id == str(workspace_manage_role_id) or role_id == str(RoleConstants.USER.value):
if default_workspace_id not in workspace_ids:
raise AppApiException(1004, _("Cannot delete built-in role"))
workspace_user_role_mapping_model.objects.filter(user_id=user.id).delete()
if is_admin:
workspace_user_role_mapping_model.objects.filter(user_id=user.id).delete()
else:
workspace_user_role_mapping_model.objects.filter(user_id=user.id).exclude(
role_id=RoleConstants.ADMIN.name).delete()
relations = set()
for item in role_setting:
role_id = item['role_id']

View File

@ -171,7 +171,7 @@ class UserManage(APIView):
@log(menu='User management', operate='Add user',
get_operation_object=lambda r, k: {'name': r.data.get('username', None)})
def post(self, request: Request):
return result.success(UserManageSerializer().save(request.data))
return result.success(UserManageSerializer().save(request.data, str(request.user.id)))
class Password(APIView):
authentication_classes = [TokenAuth]
@ -228,7 +228,8 @@ class UserManage(APIView):
get_operation_object=lambda r, k: get_user_operation_object(k.get('user_id')))
def put(self, request: Request, user_id):
return result.success(
UserManageSerializer.Operate(data={'id': user_id}).edit(request.data, with_valid=True))
UserManageSerializer.Operate(data={'id': user_id}).edit(request.data, str(request.user.id),
with_valid=True))
class BatchDelete(APIView):
authentication_classes = [TokenAuth]
@ -279,7 +280,7 @@ class UserManage(APIView):
def get(self, request: Request, current_page, page_size):
d = UserManageSerializer.Query(
data={**query_params_to_single_dict(request.query_params)})
return result.success(d.page(current_page, page_size))
return result.success(d.page(current_page, page_size, str(request.user.id)))
class RePasswordView(APIView):