# MaxKB/apps/folders/serializers/folder.py
# 2025-12-25 11:09:02 +08:00
#
# 360 lines
# 16 KiB
# Python

# -*- coding: utf-8 -*-
import uuid_utils.compat as uuid
from django.db import transaction
from django.db.models import QuerySet, Q, Func, F, TextField
from django.db.models.functions import Cast
from django.utils.translation import gettext_lazy as _
from rest_framework import serializers
from application.models.application import Application, ApplicationFolder
from application.serializers.application import ApplicationOperateSerializer
from application.serializers.application_folder import ApplicationFolderTreeSerializer
from common.constants.permission_constants import Group, ResourcePermission, ResourcePermissionRole, RoleConstants
from common.database_model_manage.database_model_manage import DatabaseModelManage
from common.exception.app_exception import AppApiException
from folders.api.folder import FolderCreateRequest
from knowledge.models import KnowledgeFolder, Knowledge
from knowledge.serializers.knowledge import KnowledgeSerializer
from knowledge.serializers.knowledge_folder import KnowledgeFolderTreeSerializer
from system_manage.models import WorkspaceUserResourcePermission
from system_manage.serializers.user_resource_permission import UserResourcePermissionSerializer
from tools.models import ToolFolder, Tool
from tools.serializers.tool import ToolSerializer
from tools.serializers.tool_folder import ToolFolderTreeSerializer
from users.serializers.user import is_workspace_manage
def get_source_type(source):
    """Return the resource model class that belongs to a source group name.

    :param source: group name, compared against ``Group.<X>.name``.
    :return: ``Tool``, ``Application`` or ``Knowledge``; ``None`` when the
        name matches none of the known groups.
    """
    candidates = (
        (Group.TOOL, Tool),
        (Group.APPLICATION, Application),
        (Group.KNOWLEDGE, Knowledge),
    )
    for group, model in candidates:
        if source == group.name:
            return model
    return None
def get_folder_type(source):
    """Return the folder model class that belongs to a source group name.

    :param source: group name, compared against ``Group.<X>.name``.
    :return: ``ToolFolder``, ``ApplicationFolder`` or ``KnowledgeFolder``;
        ``None`` when the name matches none of the known groups.
    """
    candidates = (
        (Group.TOOL, ToolFolder),
        (Group.APPLICATION, ApplicationFolder),
        (Group.KNOWLEDGE, KnowledgeFolder),
    )
    for group, folder_model in candidates:
        if source == group.name:
            return folder_model
    return None
def get_folder_tree_serializer(source):
    """Return the tree serializer class that belongs to a source group name.

    :param source: group name, compared against ``Group.<X>.name``.
    :return: the matching ``*FolderTreeSerializer`` class; ``None`` when the
        name matches none of the known groups.
    """
    candidates = (
        (Group.TOOL, ToolFolderTreeSerializer),
        (Group.APPLICATION, ApplicationFolderTreeSerializer),
        (Group.KNOWLEDGE, KnowledgeFolderTreeSerializer),
    )
    for group, tree_serializer in candidates:
        if source == group.name:
            return tree_serializer
    return None
# Maximum number of nested folder levels accepted by check_depth().
FOLDER_DEPTH = 10000


def check_depth(source, parent_id, workspace_id, current_depth=0):
    """Reject a folder placement that would exceed ``FOLDER_DEPTH`` levels.

    The parent chain is followed upwards from ``parent_id`` until the id
    equals ``workspace_id`` (the root folder uses the workspace id as its own
    id) or a parent row cannot be found. The node being placed counts as one
    level and every step of the walk adds one more.

    :param source: group name used to resolve the folder model.
    :param parent_id: id of the folder the node is being placed under.
    :param workspace_id: workspace id, which doubles as the root folder id.
    :param current_depth: depth of the subtree carried along with the node
        (0 when a new, empty folder is created).
    :raises serializers.ValidationError: when the resulting depth exceeds
        ``FOLDER_DEPTH``.
    """
    # Placing directly under the root can never exceed the limit.
    if parent_id == workspace_id:
        return

    Folder = get_folder_type(source)  # noqa

    depth = 1  # the node being created/moved counts as one level
    current_parent_id = parent_id
    # Walk up through the ancestors.
    while current_parent_id != workspace_id:
        depth += 1
        # Check inside the loop: the result is the same for a finite chain,
        # but the walk now stops as soon as the limit is crossed instead of
        # spinning forever on a corrupted (cyclic) parent chain.
        if depth + current_depth > FOLDER_DEPTH:
            raise serializers.ValidationError(_('Folder depth cannot exceed 10000 levels'))
        parent_node = QuerySet(Folder).filter(id=current_parent_id).first()
        if parent_node is None:
            break
        current_parent_id = parent_node.parent_id
def get_max_depth(current_node):
    """Return how many levels the subtree below ``current_node`` spans.

    :param current_node: tree node exposing ``get_descendants()`` and
        ``level``; a falsy value is treated as "no node".
    :return: difference between the deepest descendant's ``level`` and the
        node's own ``level``; 0 when there is no node or no descendants.
    """
    if not current_node:
        return 0

    # All nodes below the current one.
    subtree = current_node.get_descendants()
    if not subtree.exists():
        return 0

    # The deepest descendant decides the depth of the subtree.
    deepest = subtree.order_by('-level').first()
    return deepest.level - current_node.level
def has_target_permission(workspace_id, source, user_id, target):
    """Tell whether the user holds a ``MANAGE`` permission on ``target``.

    :param workspace_id: workspace the permission row must belong to.
    :param source: value matched against ``auth_target_type``.
    :param user_id: user the permission row must belong to.
    :param target: id of the resource/folder being checked.
    :return: ``True`` when at least one matching permission row exists.
    """
    manage_grants = QuerySet(WorkspaceUserResourcePermission).filter(
        workspace_id=workspace_id,
        user_id=user_id,
        auth_target_type=source,
        target=target,
        permission_list__contains=['MANAGE'],
    )
    return manage_grants.exists()
class FolderSerializer(serializers.Serializer):
    """Serialized form of a folder, plus nested create/operate actions.

    The concrete folder and resource models are resolved at runtime from the
    ``source`` value via ``get_folder_type`` / ``get_source_type``.
    """
    id = serializers.CharField(required=True, label=_('folder id'))
    name = serializers.CharField(required=True, label=_('folder name'))
    desc = serializers.CharField(required=False, allow_null=True, allow_blank=True, label=_('folder description'))
    user_id = serializers.CharField(required=True, label=_('folder user id'))
    workspace_id = serializers.CharField(required=False, label=_('workspace id'))
    parent_id = serializers.CharField(required=False, label=_('parent id'))

    class Create(serializers.Serializer):
        """Creates a folder for a workspace / user / source combination."""
        workspace_id = serializers.CharField(required=True, label=_('workspace id'))
        user_id = serializers.UUIDField(required=True, label=_('user id'))
        source = serializers.CharField(required=True, label=_('source'))

        def insert(self, instance, with_valid=True):
            """Create a folder and register a resource permission for it.

            :param instance: mapping with ``name`` and optional ``desc`` /
                ``parent_id``; a missing ``parent_id`` means "directly under
                the workspace root".
            :param with_valid: validate this serializer and the request
                payload before inserting.
            :return: serialized data of the new folder.
            :raises serializers.ValidationError: on invalid input, a duplicate
                name under the same parent, or an exceeded depth limit.
            """
            if with_valid:
                self.is_valid(raise_exception=True)
                FolderCreateRequest(data=instance).is_valid(raise_exception=True)

            source = self.data.get('source')
            workspace_id = self.data.get('workspace_id') or 'default'
            # The root folder's id is the workspace id, so it is the default parent.
            parent_id = instance.get('parent_id') or workspace_id
            name = instance.get('name')

            Folder = get_folder_type(source)  # noqa
            if QuerySet(Folder).filter(name=name, workspace_id=workspace_id, parent_id=parent_id).exists():
                raise serializers.ValidationError(_('Folder name already exists'))

            # Enforce the maximum nesting depth (FOLDER_DEPTH).
            check_depth(source, parent_id, workspace_id)

            folder = Folder(
                id=uuid.uuid7(),
                name=name,
                desc=instance.get('desc'),
                user_id=self.data.get('user_id'),
                workspace_id=workspace_id,
                parent_id=parent_id
            )
            folder.save()

            # Use the resolved workspace id so the permission entry always refers
            # to the same workspace the folder was stored under.
            # NOTE(review): auth_resource presumably grants the creator access
            # to the new folder - confirm against UserResourcePermissionSerializer.
            UserResourcePermissionSerializer(data={
                'workspace_id': workspace_id,
                'user_id': self.data.get('user_id'),
                'auth_target_type': source
            }).auth_resource(str(folder.id), is_folder=True)

            return FolderSerializer(folder).data

    class Operate(serializers.Serializer):
        """Edit, fetch and delete operations on one existing folder."""
        id = serializers.CharField(required=True, label=_('folder id'))
        workspace_id = serializers.CharField(required=True, allow_null=True, allow_blank=True, label=_('workspace id'))
        source = serializers.CharField(required=True, label=_('source'))
        user_id = serializers.UUIDField(required=True, label=_('user id'))

        @transaction.atomic
        def edit(self, instance):
            """Rename / re-describe the folder and optionally move it.

            :param instance: mapping that may contain ``name``, ``desc`` and
                ``parent_id``; ``None`` values leave the field untouched.
            :return: serialized data of the folder after the change.
            :raises serializers.ValidationError: when the folder or the target
                parent does not exist, the name collides with a sibling, or
                the move would exceed the depth limit.
            :raises AppApiException: 403 when the user may not manage the
                target folder and is not a workspace manager.
            """
            self.is_valid(raise_exception=True)
            source = self.data.get('source')
            Folder = get_folder_type(source)  # noqa
            current_id = self.data.get('id')

            # ``.get()`` raises DoesNotExist instead of returning None, which
            # made the check below unreachable; ``.first()`` yields None.
            current_node = Folder.objects.filter(id=current_id).first()
            if current_node is None:
                raise serializers.ValidationError(_('Folder does not exist'))

            # Target parent for a move; defaults to the current parent.
            parent_id = instance.get('parent_id')
            if parent_id is None:
                parent_id = current_node.parent_id

            # When renaming, make sure no sibling under the (target) parent
            # already carries the new name.
            new_name = instance.get('name')
            if new_name is not None and new_name != current_node.name:
                if QuerySet(Folder).filter(
                        name=new_name,
                        parent_id=parent_id,
                        workspace_id=current_node.workspace_id
                ).exclude(id=current_id).exists():
                    raise serializers.ValidationError(_('Folder name already exists'))

            edit_field_list = ['name', 'desc']
            edit_dict = {field: instance.get(field) for field in edit_field_list if (
                    field in instance and instance.get(field) is not None)}

            QuerySet(Folder).filter(id=current_id).update(**edit_dict)
            current_node.refresh_from_db()

            # The root folder (id == workspace id) is never moved.
            is_move = (parent_id is not None
                       and current_id != current_node.workspace_id
                       and current_node.parent_id != parent_id)
            if is_move:
                user_id = self.data.get('user_id')
                workspace_id = current_node.workspace_id
                allowed = (has_target_permission(workspace_id, source, user_id, parent_id)
                           or is_workspace_manage(user_id, workspace_id))
                if not allowed:
                    raise AppApiException(403, _('No permission for the target folder'))

                # The subtree moves along, so its depth counts towards the limit.
                check_depth(source, parent_id, workspace_id, get_max_depth(current_node))

                # Report a missing target as a validation error rather than
                # letting DoesNotExist escape from ``.get()``.
                parent = Folder.objects.filter(id=parent_id).first()
                if parent is None:
                    raise serializers.ValidationError(_('Folder does not exist'))

                if QuerySet(Folder).filter(name=current_node.name, parent_id=parent_id,
                                           workspace_id=workspace_id).exists():
                    raise serializers.ValidationError(_('Folder name already exists'))

                current_node.parent = parent
                current_node.save()
                current_node.refresh_from_db()

            return self.one()

        def one(self):
            """Return the serialized data of the folder identified by ``id``."""
            self.is_valid(raise_exception=True)
            Folder = get_folder_type(self.data.get('source'))  # noqa
            folder = QuerySet(Folder).filter(id=self.data.get('id')).first()
            return FolderSerializer(folder).data

        @transaction.atomic
        def delete(self):
            """Delete the folder, its descendant folders and their resources.

            Workspace managers may delete unconditionally. Other users may
            only delete when every resource in the subtree is covered by one
            of their permission entries.

            :raises serializers.ValidationError: when the folder does not
                exist or is the workspace root folder.
            :raises AppApiException: 500 when the subtree holds a resource
                the user has no permission for.
            """
            self.is_valid(raise_exception=True)
            Folder = get_folder_type(self.data.get('source'))  # noqa
            Source = get_source_type(self.data.get('source'))  # noqa

            folder = Folder.objects.filter(id=self.data.get('id')).first()
            if not folder:
                raise serializers.ValidationError(_('Folder does not exist'))
            if folder.id == folder.workspace_id:
                raise serializers.ValidationError(_('Cannot delete root folder'))

            nodes = list(Folder.objects.filter(id=self.data.get('id')).get_descendants(include_self=True))

            # Workspace managers skip the per-resource permission check.
            if not is_workspace_manage(self.data.get('user_id'), self.data.get('workspace_id')):
                # Check the whole subtree before deleting anything, so a
                # refusal never happens halfway through the deletion work.
                for node in nodes:
                    self._ensure_sources_authorized(Source, node)

            for node in nodes:
                # Remove the resources stored in the folder, then the folder.
                self.delete_source(node)
                node.delete()

        def _ensure_sources_authorized(self, source_model, node):
            """Raise unless the user has permission on every resource in ``node``."""
            source_ids = (source_model.objects.filter(folder_id=node.id)
                          .annotate(id_str=Cast('id', TextField()))
                          .values_list('id_str', flat=True))
            authorized_count = QuerySet(WorkspaceUserResourcePermission).filter(
                workspace_id=self.data.get('workspace_id'),
                user_id=self.data.get('user_id'),
                auth_target_type=self.data.get('source'),
                target__in=source_ids,
                permission_list__overlap=[ResourcePermission.MANAGE, ResourcePermissionRole.ROLE],
            ).count()
            # Fewer permission rows than resources means at least one resource
            # is not authorized for this user.
            if authorized_count != len(source_ids):
                raise AppApiException(500, _('This folder contains resources that you dont have permission'))

        def delete_source(self, node):
            """Delete every resource stored directly in folder ``node``.

            Each resource is removed through the operate serializer of its
            own source type.
            """
            Source = get_source_type(self.data.get('source'))  # noqa
            source_ids = Source.objects.filter(folder_id=node.id).values_list('id', flat=True)
            source = self.data.get('source')
            workspace_id = self.data.get('workspace_id')
            user_id = self.data.get('user_id')

            for source_id in source_ids:
                if source == Group.TOOL.name:
                    ToolSerializer.Operate(data={
                        'workspace_id': workspace_id,
                        'id': source_id,
                    }).delete()
                elif source == Group.APPLICATION.name:
                    ApplicationOperateSerializer(data={
                        'workspace_id': workspace_id,
                        'application_id': source_id,
                        'user_id': user_id,
                    }).delete()
                elif source == Group.KNOWLEDGE.name:
                    KnowledgeSerializer.Operate(data={
                        'workspace_id': workspace_id,
                        'knowledge_id': source_id,
                        'user_id': user_id,
                    }).delete()
class FolderTreeSerializer(serializers.Serializer):
    """Builds the folder tree of one workspace for a given source type."""
    workspace_id = serializers.CharField(required=True, allow_null=True, allow_blank=True, label=_('workspace id'))
    source = serializers.CharField(required=True, label=_('source'))

    @staticmethod
    def _check_tree_integrity(queryset):
        """Return ``True`` when the tree data needs to be rebuilt.

        A folder is considered inconsistent when its ``lft`` is not smaller
        than its ``rght``, or when it reports being a leaf although it still
        has children.
        """
        return any(
            folder.lft >= folder.rght
            or (folder.is_leaf_node() and folder.get_children().exists())
            for folder in queryset
        )

    @staticmethod
    def _having_read_permission_by_role(user_id: str, workspace_id: str, source: str):
        """Tell whether a USER-type role grants ``<source>_FOLDER:READ``.

        Returns ``False`` when either role model is not registered with
        ``DatabaseModelManage``.
        """
        workspace_user_role_mapping_model = DatabaseModelManage.get_model("workspace_user_role_mapping")
        role_permission_mapping_model = DatabaseModelManage.get_model("role_permission_mapping_model")
        if workspace_user_role_mapping_model is None or role_permission_mapping_model is None:
            return False

        role_mappings = QuerySet(workspace_user_role_mapping_model).select_related('role', 'user').filter(
            workspace_id=workspace_id, user_id=user_id,
            role__type=RoleConstants.USER.value.__str__(),
            role__rolepermission__permission_id=f"{source}_FOLDER:READ"
        )
        return role_mappings.exists()

    def get_folder_tree(self,
                        current_user, name=None):
        """Return the serialized folder tree visible to ``current_user``.

        :param current_user: user object; only its ``id`` is read.
        :param name: optional substring filter on the folder name.
        :return: without ``name`` only entries whose ``id`` equals their
            ``workspace_id`` (the root entries) are returned; with ``name``
            the complete serializer output is returned.
        """
        self.is_valid(raise_exception=True)
        user_id = current_user.id
        workspace_id = self.data.get('workspace_id')
        source = self.data.get('source')
        Folder = get_folder_type(source)  # noqa

        # Check the integrity of this workspace's tree; on inconsistency the
        # whole table is rebuilt (an MPTT limitation).
        if self._check_tree_integrity(Folder.objects.filter(workspace_id=workspace_id)):
            Folder.objects.rebuild()

        workspace_manage = is_workspace_manage(user_id, workspace_id)

        base_q = Q(workspace_id=workspace_id)
        if name is not None:
            base_q &= Q(name__contains=name)

        if not workspace_manage:
            # Non-managers only see folders they were granted, plus the root.
            if self._having_read_permission_by_role(user_id, workspace_id, source):
                permission_condition = ['VIEW', 'ROLE']
            else:
                permission_condition = ['VIEW']
            granted_folder_ids = WorkspaceUserResourcePermission.objects.filter(
                user_id=user_id,
                auth_target_type=source,
                workspace_id=workspace_id,
                permission_list__overlap=permission_condition,
            ).values_list('target', flat=True)
            base_q &= (Q(id__in=granted_folder_ids) | Q(id=workspace_id))

        nodes = Folder.objects.filter(base_q).get_cached_trees()
        TreeSerializer = get_folder_tree_serializer(source)  # noqa
        serializer = TreeSerializer(nodes, many=True)

        if name is not None:
            return serializer.data  # plain serializable dictionaries
        return [entry for entry in serializer.data if entry.get('id') == entry.get('workspace_id')]