MaxKB/apps/knowledge/serializers/knowledge_workflow.py
shaohuzhang1 bfae088df6
Some checks are pending
sync2gitee / repo-sync (push) Waiting to run
Typos Check / Spell Check with Typos (push) Waiting to run
feat: knowledge workflow (#4399)
* feat: init knowledge workflow

* feat: add knowledge workflow and version models, serializers, and API views

* feat: knowledge workflow

* feat: knowledge workflow

* feat: add KnowledgeWorkflowModelSerializer and Operate class for workflow management

* fix: route

* feat: knowledge workflow

* feat: Knowledge workflow permission

* feat: knowledge workflow

* feat: knowledge workflow

* feat: knowledge workflow

* feat: knowledge workflow

* feat: Data source web node

* fix: Back route

* feat: knowledge workflow

* feat: knowledge workflow

* feat: Knowledge write node

* feat: add Data Source tool functionality and localization

* feat: add Data Source tool functionality and localization

* feat: knowledge workflow

* feat: knowledge workflow

* fix: simplify export tool permission check in ToolListContainer.vue

* fix: simplify export condition in ToolResourceIndex.vue

* fix: simplify condition for copying tool in ToolListContainer

* feat: knowledge workflow

* fix: Upload local files and add output fields

* feat: Knowledge write

* feat: add Document Split Node functionality and localization

* feat: add Document Split Node functionality and localization

* feat: Knowledge write

* feat: enhance Document Split Node with result processing and problem list generation

* fix: Allow problem be blank

* feat: enhance Document Split Node with result processing and problem list generation

* feat: tool datasource

* fix: Optimization of knowledge base workflow execution logic

* refactor: streamline image handling by updating application and knowledge ID management

* refactor: streamline image handling by updating application and knowledge ID management

* feat: extend support modes in variable aggregation node to include knowledge workflows

* feat: Chunks stored

* refactor: simplify file handling in document extraction by removing unnecessary byte conversion and enhancing file saving logic

* refactor: update file ID assignment in document extraction to use provided metadata

* feat: Workflow menu that distinguishes between applications and knowledge bases

* refactor: update file ID assignment in document extraction to use provided metadata

* fix: Add workspace ID as workflow execution parameter

* feat: add code template for Data Source tool form functionality

* refactor: remove unused sys import and improve module handling

* feat: Execution details support loading status

* refactor: update tool type handling and improve category merging logic

* feat: Alter fork depth

* fix: ensure filterList is properly initialized and updated in getList function

* refactor: simplify ToolStoreDialog by removing unused toolType logic

* perf: Optimize the style

* style: adjust div width for improved layout in Tree component

* refactor: improve polling mechanism for knowledge workflow action

* fix: Get workspace_id from workflow params

* fix: filter out 'file_bytes' from result in get_details method

* feat: add recursive filtering for file_bytes in context data

* fix: append results to paragraph_list instead of replacing it

* perf: Optimize translation files

* fix: include document name in bytes_to_uploaded_file call for better file handling

* refactor: optimize buffer retrieval in document processing

* refactor: remove redundant parameter from bytes_to_uploaded_file call

* fix: Page style optimization

* feat: add slider for setting limit in document rules form

* feat: add workflow knowledge management endpoints and related functionality

* fix: swap file size and file count limits in form inputs

* refactor: update tool_config args to use list format for improved readability

* feat: Node supports knowledge base workflow

* feat: Node supports knowledge base workflow

* fix: Basic node data cannot be obtained in the workflow

* style: Knowledge base workflow debugging page style adjustment

* fix: Loop nodes cannot be used in the knowledge base workflow

* fix: Knowledge base workflow variable assignment node

* feat: add chunk size slider to form for custom split strategy

* fix: Workflow style optimization

---------

Co-authored-by: CaptainB <bin@fit2cloud.com>
Co-authored-by: zhangzhanwei <zhanwei.zhang@fit2cloud.com>
Co-authored-by: wangdan-fit2cloud <dan.wang@fit2cloud.com>
2025-11-28 15:38:20 +08:00

187 lines
8.8 KiB
Python

# coding=utf-8
import asyncio
import json
from typing import Dict
import uuid_utils.compat as uuid
from django.db import transaction
from django.db.models import QuerySet
from django.utils.translation import gettext_lazy as _
from rest_framework import serializers
from application.flow.common import Workflow, WorkflowMode
from application.flow.i_step_node import KnowledgeWorkflowPostHandler
from application.flow.knowledge_workflow_manage import KnowledgeWorkflowManage
from application.flow.step_node import get_node
from application.serializers.application import get_mcp_tools
from common.exception.app_exception import AppApiException
from common.utils.rsa_util import rsa_long_decrypt
from common.utils.tool_code import ToolExecutor
from knowledge.models import KnowledgeScope, Knowledge, KnowledgeType, KnowledgeWorkflow
from knowledge.models.knowledge_action import KnowledgeAction, State
from knowledge.serializers.knowledge import KnowledgeModelSerializer
from maxkb.const import CONFIG
from system_manage.models import AuthTargetType
from system_manage.serializers.user_resource_permission import UserResourcePermissionSerializer
from tools.models import Tool
# Module-level executor shared by this module to run tool code; it is
# configured from the 'SANDBOX' entry of the application CONFIG.
tool_executor = ToolExecutor(CONFIG.get('SANDBOX'))
class KnowledgeWorkflowModelSerializer(serializers.ModelSerializer):
    """Model serializer that exposes every field of ``KnowledgeWorkflow``."""

    class Meta:
        # Serialize all model fields rather than an explicit subset.
        fields = '__all__'
        model = KnowledgeWorkflow
class KnowledgeWorkflowActionSerializer(serializers.Serializer):
    """Start a knowledge workflow run and query the action record it creates."""
    workspace_id = serializers.CharField(required=True, label=_('workspace id'))
    knowledge_id = serializers.UUIDField(required=True, label=_('knowledge id'))

    def action(self, instance: Dict, with_valid=True):
        """Create a ``KnowledgeAction`` record and run the knowledge workflow.

        :param instance: extra workflow parameters supplied by the caller; they
            are merged into the parameters handed to the workflow manager.
        :param with_valid: validate the serializer fields before running.
        :return: dict with the new action ``id``, the ``knowledge_id``, the
            initial ``state`` (``State.STARTED``) and empty ``details``.
        :raises AppApiException: if the knowledge base has no workflow.
        """
        if with_valid:
            self.is_valid(raise_exception=True)
        knowledge_id = self.data.get("knowledge_id")
        knowledge_workflow = QuerySet(KnowledgeWorkflow).filter(knowledge_id=knowledge_id).first()
        if knowledge_workflow is None:
            # ``first()`` returns None when nothing matches; fail with an API
            # error instead of an AttributeError further down.
            raise AppApiException(500, _('Knowledge workflow does not exist'))
        # Build the workflow before persisting the action so that a broken
        # workflow definition does not leave an orphan STARTED record behind.
        workflow = Workflow.new_instance(knowledge_workflow.work_flow, WorkflowMode.KNOWLEDGE)
        knowledge_action_id = uuid.uuid7()
        KnowledgeAction(id=knowledge_action_id, knowledge_id=knowledge_id, state=State.STARTED).save()
        workflow_params = {
            'stream': True,
            **instance,
            # The validated identifiers are written last so that values in the
            # caller-supplied ``instance`` can never replace them.
            'knowledge_id': knowledge_id,
            'knowledge_action_id': knowledge_action_id,
            'workspace_id': self.data.get("workspace_id"),
        }
        work_flow_manage = KnowledgeWorkflowManage(
            workflow,
            workflow_params,
            KnowledgeWorkflowPostHandler(None, knowledge_action_id))
        work_flow_manage.run()
        return {'id': knowledge_action_id, 'knowledge_id': knowledge_id, 'state': State.STARTED,
                'details': {}}

    class Operate(serializers.Serializer):
        """Operations on one existing knowledge action record."""
        workspace_id = serializers.CharField(required=True, label=_('workspace id'))
        knowledge_id = serializers.UUIDField(required=True, label=_('knowledge id'))
        id = serializers.UUIDField(required=True, label=_('knowledge action id'))

        def one(self, is_valid=True):
            """Return the state and details of a single knowledge action.

            :param is_valid: validate the serializer fields before querying.
            :return: dict with ``id``, ``knowledge_id``, ``state``, ``details``.
            :raises AppApiException: if no action with this id exists for the
                given knowledge base.
            """
            if is_valid:
                self.is_valid(raise_exception=True)
            knowledge_action_id = self.data.get("id")
            # Scope the lookup to the validated knowledge base so an action
            # that belongs to another knowledge base cannot be read by id.
            knowledge_action = QuerySet(KnowledgeAction).filter(
                id=knowledge_action_id, knowledge_id=self.data.get("knowledge_id")).first()
            if knowledge_action is None:
                raise AppApiException(500, _('Knowledge action id does not exist'))
            return {'id': knowledge_action_id, 'knowledge_id': knowledge_action.knowledge_id,
                    'state': knowledge_action.state,
                    'details': knowledge_action.details}
class KnowledgeWorkflowSerializer(serializers.Serializer):
    """Container for the serializers that manage knowledge workflows."""

    class Datasource(serializers.Serializer):
        """Invoke a named function on a data source (local node or tool)."""
        type = serializers.CharField(required=True, label=_('type'))
        # The label previously repeated 'type' (copy-paste slip).
        id = serializers.CharField(required=True, label=_('id'))
        params = serializers.DictField(required=True, label="")
        function_name = serializers.CharField(required=True, label=_('function_name'))

        def action(self):
            """Call ``function_name`` on the selected data source.

            For ``type == 'local'`` the function is looked up on the node
            returned by ``get_node``; for ``type == 'tool'`` the stored tool
            code is executed through ``tool_executor``.

            :return: whatever the invoked function returns, or ``None`` when
                ``type`` is neither ``'local'`` nor ``'tool'``.
            :raises AppApiException: if ``type == 'tool'`` and the tool does
                not exist.
            """
            self.is_valid(raise_exception=True)
            source_type = self.data.get('type')
            if source_type == 'local':
                node = get_node(self.data.get('id'), WorkflowMode.KNOWLEDGE)
                # NOTE(review): ``function_name`` comes from the request, so any
                # attribute reachable on the node can be invoked here — consider
                # restricting it to an explicit allow-list of function names.
                return node.__getattribute__(node, self.data.get("function_name"))(**self.data.get("params"))
            elif source_type == 'tool':
                tool = QuerySet(Tool).filter(id=self.data.get("id")).first()
                if tool is None:
                    # ``first()`` returns None for an unknown id.
                    raise AppApiException(500, _('Tool does not exist'))
                # A tool without stored init params contributes no defaults.
                init_params = json.loads(rsa_long_decrypt(tool.init_params)) if tool.init_params else {}
                # Request params take precedence over the stored init params.
                return tool_executor.exec_code(tool.code, {**init_params, **self.data.get('params')},
                                               self.data.get('function_name'))
            return None

    class Create(serializers.Serializer):
        """Create a workflow-type knowledge base together with its workflow."""
        user_id = serializers.UUIDField(required=True, label=_('user id'))
        workspace_id = serializers.CharField(required=True, label=_('workspace id'))
        scope = serializers.ChoiceField(
            required=False, label=_('scope'), default=KnowledgeScope.WORKSPACE, choices=KnowledgeScope.choices
        )

        @transaction.atomic
        def save_workflow(self, instance: Dict):
            """Persist a new knowledge base, its permission and its workflow.

            Runs inside a database transaction.

            :param instance: creation payload; the keys read are ``name``,
                ``desc``, ``type``, ``folder_id``, ``embedding_model_id`` and
                ``work_flow``.
            :return: the serialized knowledge base plus an empty
                ``document_list``.
            :raises AppApiException: if a knowledge base with the same name
                already exists in the same workspace and folder.
            """
            self.is_valid(raise_exception=True)
            # Without an explicit folder, the workspace id is used as folder id.
            folder_id = instance.get('folder_id', self.data.get('workspace_id'))
            if QuerySet(Knowledge).filter(
                    workspace_id=self.data.get('workspace_id'), folder_id=folder_id, name=instance.get('name')
            ).exists():
                raise AppApiException(500, _('Knowledge base name duplicate!'))
            knowledge_id = uuid.uuid7()
            knowledge = Knowledge(
                id=knowledge_id,
                name=instance.get('name'),
                desc=instance.get('desc'),
                user_id=self.data.get('user_id'),
                type=instance.get('type', KnowledgeType.WORKFLOW),
                scope=self.data.get('scope', KnowledgeScope.WORKSPACE),
                folder_id=folder_id,
                workspace_id=self.data.get('workspace_id'),
                embedding_model_id=instance.get('embedding_model_id'),
                meta={},
            )
            knowledge.save()
            # Automatically grant the current user permission on the new resource.
            UserResourcePermissionSerializer(data={
                'workspace_id': self.data.get('workspace_id'),
                'user_id': self.data.get('user_id'),
                'auth_target_type': AuthTargetType.KNOWLEDGE.value
            }).auth_resource(str(knowledge_id))
            knowledge_workflow = KnowledgeWorkflow(
                id=uuid.uuid7(),
                knowledge_id=knowledge_id,
                workspace_id=self.data.get('workspace_id'),
                work_flow=instance.get('work_flow', {}),
            )
            knowledge_workflow.save()
            return {**KnowledgeModelSerializer(knowledge).data, 'document_list': []}

    class Operate(serializers.Serializer):
        """Operations on the workflow of an existing knowledge base."""
        user_id = serializers.UUIDField(required=True, label=_('user id'))
        workspace_id = serializers.CharField(required=True, label=_('workspace id'))
        knowledge_id = serializers.UUIDField(required=True, label=_('knowledge id'))

        def edit(self, instance: Dict):
            """Placeholder: editing is not implemented here and does nothing."""
            pass

        def one(self):
            """Return the serialized workflow stored for ``knowledge_id``."""
            self.is_valid(raise_exception=True)
            workflow = QuerySet(KnowledgeWorkflow).filter(knowledge_id=self.data.get('knowledge_id')).first()
            return {**KnowledgeWorkflowModelSerializer(workflow).data}
class McpServersSerializer(serializers.Serializer):
    """Validates a payload that must carry an ``mcp_servers`` JSON value."""

    mcp_servers = serializers.JSONField(required=True)
class KnowledgeWorkflowMcpSerializer(serializers.Serializer):
    """List the tools exposed by MCP servers for a knowledge workflow."""
    knowledge_id = serializers.UUIDField(required=True, label=_('knowledge id'))
    user_id = serializers.UUIDField(required=True, label=_("User ID"))
    workspace_id = serializers.CharField(required=False, allow_null=True, allow_blank=True, label=_("Workspace ID"))

    def is_valid(self, *, raise_exception=False):
        """Validate the fields and check that the knowledge base exists.

        ``raise_exception`` is accepted for signature compatibility; field
        errors always raise, as before.

        :return: ``True`` when validation succeeds (previously ``None``, which
            made ``if serializer.is_valid():`` evaluate as a failure).
        :raises AppApiException: if the knowledge base does not exist (within
            ``workspace_id`` when one is given).
        """
        super().is_valid(raise_exception=True)
        workspace_id = self.data.get('workspace_id')
        query_set = QuerySet(Knowledge).filter(id=self.data.get('knowledge_id'))
        if workspace_id:
            query_set = query_set.filter(workspace_id=workspace_id)
        if not query_set.exists():
            raise AppApiException(500, _('Knowledge id does not exist'))
        return True

    def get_mcp_servers(self, instance, with_valid=True):
        """Return the tools offered by every configured MCP server.

        :param instance: payload whose ``mcp_servers`` entry maps a server
            name to its configuration, given either as JSON text or as an
            already-decoded dict.
        :param with_valid: validate this serializer and the payload first.
        :return: list of dicts with ``server``, ``name``, ``description`` and
            ``args_schema`` for each tool.
        :raises AppApiException: if ``mcp_servers`` is malformed or a server
            uses a transport other than ``sse`` / ``streamable_http``.
        """
        if with_valid:
            self.is_valid(raise_exception=True)
            McpServersSerializer(data=instance).is_valid(raise_exception=True)
        servers = instance.get('mcp_servers')
        # The JSON field accepts both JSON text and decoded structures, so
        # only decode when text was actually supplied.
        if isinstance(servers, (str, bytes, bytearray)):
            try:
                servers = json.loads(servers)
            except ValueError as e:
                # json.JSONDecodeError is a ValueError subclass.
                raise AppApiException(500, _('Invalid mcp_servers configuration')) from e
        if not isinstance(servers, dict):
            raise AppApiException(500, _('Invalid mcp_servers configuration'))
        # Check every server before contacting any of them.
        for config in servers.values():
            if not isinstance(config, dict):
                raise AppApiException(500, _('Invalid mcp_servers configuration'))
            if config.get('transport') not in ['sse', 'streamable_http']:
                raise AppApiException(500, _('Only support transport=sse or transport=streamable_http'))
        tools = []
        for server, config in servers.items():
            # Each server is queried on its own so every tool can be tagged
            # with the server it came from.
            tools.extend(
                {
                    'server': server,
                    'name': tool.name,
                    'description': tool.description,
                    'args_schema': tool.args_schema,
                }
                for tool in asyncio.run(get_mcp_tools({server: config}))
            )
        return tools