chore: add default permission handling and update user permissions logic

This commit is contained in:
wxg0103 2025-12-23 18:09:41 +08:00
parent ed45fabcce
commit 6d5664c674
2 changed files with 261 additions and 3 deletions

View File

@ -21,13 +21,15 @@ import uuid_utils.compat as uuid
from common.constants.cache_version import Cache_Version
from common.constants.exception_code_constants import ExceptionCodeConstants
from common.constants.permission_constants import RoleConstants, Auth
from common.constants.permission_constants import RoleConstants, Auth, ResourceAuthType, ResourcePermissionRole, \
ResourcePermission
from common.database_model_manage.database_model_manage import DatabaseModelManage
from common.db.search import page_search
from common.exception.app_exception import AppApiException
from common.utils.common import valid_license, password_encrypt
from maxkb import settings
from maxkb.conf import PROJECT_DIR
from system_manage.models import SystemSetting, SettingType
from system_manage.models import SystemSetting, SettingType, AuthTargetType, WorkspaceUserResourcePermission
from users.models import User
from django.utils.translation import gettext_lazy as _, to_locale
from django.core import validators
@ -62,6 +64,7 @@ class CreateUserSerializer(serializers.Serializer):
nick_name = serializers.CharField(required=False, label=_('Nick name'))
phone = serializers.CharField(required=False, label=_('Phone'))
source = serializers.CharField(required=False, label=_('Source'), default='LOCAL')
defaultPermission = serializers.CharField(required=False, label=_('defaultPermission'))
def is_workspace_manage(user_id: str, workspace_id: str):
@ -181,7 +184,6 @@ class UserManageSerializer(serializers.Serializer):
default="LOCAL"
)
def is_valid(self, *, raise_exception=True):
super().is_valid(raise_exception=True)
self._check_unique_username_and_email()
@ -345,6 +347,7 @@ class UserManageSerializer(serializers.Serializer):
is_active=True
)
update_user_role(instance, user, user_id)
set_default_permission(user.id, instance)
user.save()
return UserInstanceSerializer(user).data
@ -639,6 +642,7 @@ def update_user_role(instance, user, user_id=None):
else:
workspace_user_role_mapping_model.objects.filter(user_id=user.id).exclude(
role__type=RoleConstants.ADMIN.name).delete()
relations = set()
for item in role_setting:
role_id = item['role_id']
@ -656,6 +660,258 @@ def update_user_role(instance, user, user_id=None):
cache.delete(permission_get_key(str(user.id)), version=permission_version)
def set_default_permission(user_id, instance):
"""
为用户设置默认权限
"""
default_permission = instance.get('defaultPermission', 'NOT_AUTH')
# 获取工作空间ID列表
workspace_ids = _get_workspace_ids(instance, default_permission)
if not workspace_ids:
return
# 根据权限类型确定认证类型
auth_type = (ResourceAuthType.ROLE
if default_permission == ResourceAuthType.ROLE
else ResourceAuthType.RESOURCE_PERMISSION_GROUP)
# 设置根目录权限
_set_root_permissions(user_id, workspace_ids)
# 如果是无权限设置,直接返回
if default_permission == 'NOT_AUTH':
return
# 设置具体资源权限
_set_resource_permissions(user_id, workspace_ids, default_permission, auth_type)
def _get_workspace_ids(instance, default_permission):
"""
获取工作空间ID列表
"""
role_setting_model = DatabaseModelManage.get_model("role_model")
if not role_setting_model:
return ['default']
# 检查许可证有效性
license_is_valid = DatabaseModelManage.get_model('license_is_valid') or (lambda: False)
if default_permission == ResourceAuthType.ROLE and not license_is_valid():
return []
role_setting = instance.get('role_setting')
if not role_setting:
return ['default']
# 获取用户角色的工作空间ID
all_role_ids = [item['role_id'] for item in role_setting]
user_role_ids = set(role_setting_model.objects.filter(
id__in=all_role_ids,
type=RoleConstants.USER.name
).values_list('id', flat=True))
workspace_ids = set()
for item in role_setting:
role_id = item['role_id']
if role_id in user_role_ids:
workspace_ids.update(item.get('workspace_ids', []))
return list(workspace_ids) if workspace_ids else []
def _set_root_permissions(user_id, workspace_ids):
"""
设置根目录权限默认为查看权限
"""
root_permissions = []
for ws in workspace_ids:
root_permissions.extend([
WorkspaceUserResourcePermission(
target=ws,
auth_target_type=auth_target_type,
permission_list=[ResourcePermission.VIEW],
workspace_id=ws,
user_id=user_id,
auth_type=ResourceAuthType.RESOURCE_PERMISSION_GROUP
)
for auth_target_type in [
AuthTargetType.APPLICATION.value,
AuthTargetType.KNOWLEDGE.value,
AuthTargetType.TOOL.value
]
])
_batch_create_permissions(root_permissions)
def _set_resource_permissions(user_id, workspace_ids, default_permission, auth_type):
"""
设置具体资源权限
"""
# 批量查询资源并按工作空间分组
resource_maps = _get_resource_maps(workspace_ids)
# 构造权限实例
instances = []
for ws in workspace_ids:
instances.extend(_create_resource_permission_instances(
ws, resource_maps, user_id, default_permission, auth_type))
# 批量创建权限
_batch_create_permissions(instances)
def _get_resource_maps(workspace_ids):
"""
获取各类型资源按工作空间的映射
"""
from application.models import Application, ApplicationFolder
from knowledge.models import Knowledge, KnowledgeFolder
from tools.models import Tool, ToolFolder
from models_provider.models import Model
from collections import defaultdict
resource_maps = {
'apps': defaultdict(list),
'app_folders': defaultdict(list),
'knowledge': defaultdict(list),
'knowledge_folders': defaultdict(list),
'tools': defaultdict(list),
'tool_folders': defaultdict(list),
'models': defaultdict(list)
}
# 查询应用资源
for ws, rid in Application.objects.filter(workspace_id__in=workspace_ids).values_list('workspace_id', 'id'):
resource_maps['apps'][ws].append(rid)
for ws, fid in ApplicationFolder.objects.filter(workspace_id__in=workspace_ids).exclude(
id__in=workspace_ids).values_list('workspace_id', 'id'):
resource_maps['app_folders'][ws].append(fid)
# 查询知识库资源
for ws, kid in Knowledge.objects.filter(workspace_id__in=workspace_ids).values_list('workspace_id', 'id'):
resource_maps['knowledge'][ws].append(kid)
for ws, kfid in KnowledgeFolder.objects.filter(workspace_id__in=workspace_ids).exclude(
id__in=workspace_ids).values_list('workspace_id', 'id'):
resource_maps['knowledge_folders'][ws].append(kfid)
# 查询工具资源
for ws, tid in Tool.objects.filter(workspace_id__in=workspace_ids).values_list('workspace_id', 'id'):
resource_maps['tools'][ws].append(tid)
for ws, tfid in ToolFolder.objects.filter(workspace_id__in=workspace_ids).exclude(
id__in=workspace_ids).values_list('workspace_id', 'id'):
resource_maps['tool_folders'][ws].append(tfid)
# 查询模型资源
for ws, mid in Model.objects.filter(workspace_id__in=workspace_ids).values_list('workspace_id', 'id'):
resource_maps['models'][ws].append(mid)
return resource_maps
def _create_resource_permission_instances(workspace_id, resource_maps, user_id, permission, auth_type):
"""
创建资源权限实例列表
"""
instances = []
# 应用权限
for rid in resource_maps['apps'].get(workspace_id, []):
instances.append(WorkspaceUserResourcePermission(
target=rid,
auth_target_type=AuthTargetType.APPLICATION.value,
permission_list=[permission],
workspace_id=workspace_id,
user_id=user_id,
auth_type=auth_type
))
# 应用文件夹权限
for fid in resource_maps['app_folders'].get(workspace_id, []):
instances.append(WorkspaceUserResourcePermission(
target=fid,
auth_target_type=AuthTargetType.APPLICATION.value,
permission_list=[permission],
workspace_id=workspace_id,
user_id=user_id,
auth_type=auth_type
))
# 知识库权限
for kid in resource_maps['knowledge'].get(workspace_id, []):
instances.append(WorkspaceUserResourcePermission(
target=kid,
auth_target_type=AuthTargetType.KNOWLEDGE.value,
permission_list=[permission],
workspace_id=workspace_id,
user_id=user_id,
auth_type=auth_type
))
# 知识库文件夹权限
for kf in resource_maps['knowledge_folders'].get(workspace_id, []):
instances.append(WorkspaceUserResourcePermission(
target=kf,
auth_target_type=AuthTargetType.KNOWLEDGE.value,
permission_list=[permission],
workspace_id=workspace_id,
user_id=user_id,
auth_type=auth_type
))
# 工具权限
for tid in resource_maps['tools'].get(workspace_id, []):
instances.append(WorkspaceUserResourcePermission(
target=tid,
auth_target_type=AuthTargetType.TOOL.value,
permission_list=[permission],
workspace_id=workspace_id,
user_id=user_id,
auth_type=auth_type
))
# 工具文件夹权限
for tf in resource_maps['tool_folders'].get(workspace_id, []):
instances.append(WorkspaceUserResourcePermission(
target=tf,
auth_target_type=AuthTargetType.TOOL.value,
permission_list=[permission],
workspace_id=workspace_id,
user_id=user_id,
auth_type=auth_type
))
# 模型权限
for mid in resource_maps['models'].get(workspace_id, []):
instances.append(WorkspaceUserResourcePermission(
target=mid,
auth_target_type=AuthTargetType.MODEL.value,
permission_list=[permission],
workspace_id=workspace_id,
user_id=user_id,
auth_type=auth_type
))
return instances
def _batch_create_permissions(instances, batch_size=500):
"""
批量创建权限实例
"""
if not instances:
return
objs = WorkspaceUserResourcePermission.objects
for i in range(0, len(instances), batch_size):
objs.bulk_create(instances[i:i + batch_size])
class RePasswordSerializer(serializers.Serializer):
email = serializers.EmailField(
required=True,

View File

@ -125,6 +125,8 @@ class UserList(APIView):
operation_id=_("Get all user"), # type: ignore
tags=[_("User Management")], # type: ignore
responses=UserListApi.get_response())
@has_permissions(RoleConstants.WORKSPACE_MANAGE, RoleConstants.ADMIN, RoleConstants.EXTENDS_ADMIN,
RoleConstants.EXTENDS_WORKSPACE_MANAGE)
def get(self, request: Request):
return result.success(UserManageSerializer().get_all_user_list())