MaxKB/apps/folders/serializers/folder.py
2025-10-27 16:37:33 +08:00

315 lines
14 KiB
Python

# -*- coding: utf-8 -*-
import uuid_utils.compat as uuid
from django.db import transaction
from django.db.models import QuerySet, Q, Func, F, TextField
from django.db.models.functions import Cast
from django.utils.translation import gettext_lazy as _
from rest_framework import serializers
from application.models.application import Application, ApplicationFolder
from application.serializers.application import ApplicationOperateSerializer
from application.serializers.application_folder import ApplicationFolderTreeSerializer
from common.constants.permission_constants import Group, ResourcePermission, ResourcePermissionRole
from common.exception.app_exception import AppApiException
from folders.api.folder import FolderCreateRequest
from knowledge.models import KnowledgeFolder, Knowledge
from knowledge.serializers.knowledge import KnowledgeSerializer
from knowledge.serializers.knowledge_folder import KnowledgeFolderTreeSerializer
from system_manage.models import WorkspaceUserResourcePermission
from system_manage.serializers.user_resource_permission import UserResourcePermissionSerializer
from tools.models import ToolFolder, Tool
from tools.serializers.tool import ToolSerializer
from tools.serializers.tool_folder import ToolFolderTreeSerializer
from users.serializers.user import is_workspace_manage
def get_source_type(source):
if source == Group.TOOL.name:
return Tool
elif source == Group.APPLICATION.name:
return Application
elif source == Group.KNOWLEDGE.name:
return Knowledge
else:
return None
def get_folder_type(source):
if source == Group.TOOL.name:
return ToolFolder
elif source == Group.APPLICATION.name:
return ApplicationFolder
elif source == Group.KNOWLEDGE.name:
return KnowledgeFolder
else:
return None
def get_folder_tree_serializer(source):
if source == Group.TOOL.name:
return ToolFolderTreeSerializer
elif source == Group.APPLICATION.name:
return ApplicationFolderTreeSerializer
elif source == Group.KNOWLEDGE.name:
return KnowledgeFolderTreeSerializer
else:
return None
FOLDER_DEPTH = 2 # Folder 不能超过3层
def check_depth(source, parent_id, workspace_id, current_depth=0):
# Folder 不能超过3层
Folder = get_folder_type(source) # noqa
if parent_id != workspace_id:
# 计算当前层级
depth = 1 # 当前要创建的节点算一层
current_parent_id = parent_id
# 向上追溯父节点
while current_parent_id != workspace_id:
depth += 1
parent_node = QuerySet(Folder).filter(id=current_parent_id).first()
if parent_node is None:
break
current_parent_id = parent_node.parent_id
# 验证层级深度
if depth + current_depth > FOLDER_DEPTH:
raise serializers.ValidationError(_('Folder depth cannot exceed 3 levels'))
def get_max_depth(current_node):
if not current_node:
return 0
# 获取所有后代节点
descendants = current_node.get_descendants()
if not descendants.exists():
return 0
# 获取最大深度
max_level = descendants.order_by('-level').first().level
current_level = current_node.level
max_depth = max_level - current_level
return max_depth
class FolderSerializer(serializers.Serializer):
id = serializers.CharField(required=True, label=_('folder id'))
name = serializers.CharField(required=True, label=_('folder name'))
desc = serializers.CharField(required=False, allow_null=True, allow_blank=True, label=_('folder description'))
user_id = serializers.CharField(required=True, label=_('folder user id'))
workspace_id = serializers.CharField(required=False, label=_('workspace id'))
parent_id = serializers.CharField(required=False, label=_('parent id'))
class Create(serializers.Serializer):
workspace_id = serializers.CharField(required=True, label=_('workspace id'))
user_id = serializers.UUIDField(required=True, label=_('user id'))
source = serializers.CharField(required=True, label=_('source'))
def insert(self, instance, with_valid=True):
if with_valid:
self.is_valid(raise_exception=True)
FolderCreateRequest(data=instance).is_valid(raise_exception=True)
workspace_id = self.data.get('workspace_id')
if not workspace_id:
workspace_id = 'default'
parent_id = instance.get('parent_id')
if not parent_id:
parent_id = workspace_id
name = instance.get('name')
Folder = get_folder_type(self.data.get('source')) # noqa
if QuerySet(Folder).filter(name=name, workspace_id=workspace_id, parent_id=parent_id).exists():
raise serializers.ValidationError(_('Folder name already exists'))
# Folder 不能超过3层
check_depth(self.data.get('source'), parent_id, workspace_id)
folder = Folder(
id=uuid.uuid7(),
name=instance.get('name'),
desc=instance.get('desc'),
user_id=self.data.get('user_id'),
workspace_id=workspace_id,
parent_id=parent_id
)
folder.save()
UserResourcePermissionSerializer(data={
'workspace_id': self.data.get('workspace_id'),
'user_id': self.data.get('user_id'),
'auth_target_type': self.data.get('source')
}).auth_resource(str(folder.id), is_folder=True)
return FolderSerializer(folder).data
class Operate(serializers.Serializer):
id = serializers.CharField(required=True, label=_('folder id'))
workspace_id = serializers.CharField(required=True, allow_null=True, allow_blank=True, label=_('workspace id'))
source = serializers.CharField(required=True, label=_('source'))
user_id = serializers.UUIDField(required=True, label=_('user id'))
@transaction.atomic
def edit(self, instance):
self.is_valid(raise_exception=True)
Folder = get_folder_type(self.data.get('source')) # noqa
current_id = self.data.get('id')
current_node = Folder.objects.get(id=current_id)
if current_node is None:
raise serializers.ValidationError(_('Folder does not exist'))
# 模块间的移动
parent_id = instance.get('parent_id')
if parent_id is None:
parent_id = current_node.parent_id
# 如果要修改文件夹名称,检查同级目录下是否存在同名文件夹
new_name = instance.get('name')
if new_name is not None and new_name != current_node.name:
if QuerySet(Folder).filter(
name=new_name,
parent_id=parent_id,
workspace_id=current_node.workspace_id
).exclude(id=current_id).exists():
raise serializers.ValidationError(_('Folder name already exists'))
edit_field_list = ['name', 'desc']
edit_dict = {field: instance.get(field) for field in edit_field_list if (
field in instance and instance.get(field) is not None)}
QuerySet(Folder).filter(id=current_id).update(**edit_dict)
if parent_id is not None and current_id != current_node.workspace_id and current_node.parent_id != parent_id:
# Folder 不能超过3层
current_depth = get_max_depth(current_node)
check_depth(self.data.get('source'), parent_id, current_node.workspace_id, current_depth)
parent = Folder.objects.get(id=parent_id)
current_node.move_to(parent)
return self.one()
def one(self):
self.is_valid(raise_exception=True)
Folder = get_folder_type(self.data.get('source')) # noqa
folder = QuerySet(Folder).filter(id=self.data.get('id')).first()
return FolderSerializer(folder).data
@transaction.atomic
def delete(self):
self.is_valid(raise_exception=True)
Folder = get_folder_type(self.data.get('source')) # noqa
Source = get_source_type(self.data.get('source')) # noqa
folder = Folder.objects.filter(id=self.data.get('id')).first()
if not folder:
raise serializers.ValidationError(_('Folder does not exist'))
if folder.id == folder.workspace_id:
raise serializers.ValidationError(_('Cannot delete root folder'))
# 工作空间管理员可以删除
workspace_manage = is_workspace_manage(self.data.get('user_id'), self.data.get('workspace_id'))
if workspace_manage:
nodes = Folder.objects.filter(id=self.data.get('id')).get_descendants(include_self=True)
for node in nodes:
# print(node)
# 删除相关的资源
self.delete_source(node)
# 删除节点
node.delete()
# 普通用户删除的文件夹内全部都得是自己有权限的资源
else:
nodes = Folder.objects.filter(id=self.data.get('id')).get_descendants(include_self=True)
for node in nodes:
# 删除相关的资源
source_ids = (Source.objects.filter(folder_id=node.id)
.annotate(id_str=Cast('id', TextField()))
.values_list('id_str', flat=True))
# 检查文件夹是否存在未授权当前用户的资源
auth_list = QuerySet(WorkspaceUserResourcePermission).filter(
Q(workspace_id=self.data.get('workspace_id')) &
Q(user_id=self.data.get('user_id')) &
Q(auth_target_type=self.data.get('source')) &
Q(target__in=source_ids) &
Q(permission_list__overlap=[ResourcePermission.MANAGE, ResourcePermissionRole.ROLE])
).count()
if auth_list != len(source_ids):
raise AppApiException(500, _('This folder contains resources that you dont have permission'))
self.delete_source(node)
node.delete()
def delete_source(self, node):
Source = get_source_type(self.data.get('source')) # noqa
source_ids = Source.objects.filter(folder_id=node.id).values_list('id', flat=True)
source = self.data.get('source')
for source_id in source_ids:
if source == Group.TOOL.name:
ToolSerializer.Operate(data={
'workspace_id': self.data.get('workspace_id'),
'id': source_id,
}).delete()
elif source == Group.APPLICATION.name:
ApplicationOperateSerializer(data={
'workspace_id': self.data.get('workspace_id'),
'application_id': source_id,
'user_id': self.data.get('user_id'),
}).delete()
elif source == Group.KNOWLEDGE.name:
KnowledgeSerializer.Operate(data={
'workspace_id': self.data.get('workspace_id'),
'knowledge_id': source_id,
'user_id': self.data.get('user_id'),
}).delete()
class FolderTreeSerializer(serializers.Serializer):
workspace_id = serializers.CharField(required=True, allow_null=True, allow_blank=True, label=_('workspace id'))
source = serializers.CharField(required=True, label=_('source'))
@staticmethod
def _check_tree_integrity(queryset):
"""检查树结构完整性"""
for folder in queryset:
if folder.lft >= folder.rght:
return True # 需要重建
if folder.is_leaf_node() and folder.get_children().exists():
return True # 需要重建
return False
def get_folder_tree(self,
current_user, name=None):
self.is_valid(raise_exception=True)
Folder = get_folder_type(self.data.get('source')) # noqa
# 检查特定工作空间的树结构完整性
workspace_folders = Folder.objects.filter(workspace_id=self.data.get('workspace_id'))
# 如果发现数据不一致,重建整个表(这是 MPTT 的限制)
if self._check_tree_integrity(workspace_folders):
Folder.objects.rebuild()
workspace_manage = is_workspace_manage(current_user.id, self.data.get('workspace_id'))
base_q = Q(workspace_id=self.data.get('workspace_id'))
if name is not None:
base_q &= Q(name__contains=name)
if not workspace_manage:
base_q &= (Q(id__in=WorkspaceUserResourcePermission.objects.filter(user_id=current_user.id,
auth_target_type=self.data.get('source'),
workspace_id=self.data.get(
'workspace_id'),
permission_list__contains=['VIEW'])
.values_list(
'target', flat=True)) | Q(id=self.data.get('workspace_id')))
nodes = Folder.objects.filter(base_q).get_cached_trees()
TreeSerializer = get_folder_tree_serializer(self.data.get('source')) # noqa
serializer = TreeSerializer(nodes, many=True)
return [d for d in serializer.data if d.get('id') == d.get('workspace_id')] # 这是可序列化的字典