From e758f015f34743027f486d9ce3965c558cebb4b1 Mon Sep 17 00:00:00 2001 From: shaohuzhang1 Date: Wed, 19 Nov 2025 16:32:11 +0800 Subject: [PATCH] feat: knowledge workflow --- apps/application/flow/i_step_node.py | 13 +++-- .../flow/knowledge_workflow_manage.py | 51 +++++++++++++++++++ .../impl/base_data_source_local_node.py | 12 +++++ .../migrations/0005_knowledgeaction.py | 32 ++++++++++++ apps/knowledge/models/knowledge_action.py | 49 ++++++++++++++++++ .../serializers/knowledge_workflow.py | 26 ++++++++-- apps/knowledge/urls.py | 3 +- apps/knowledge/views/knowledge_workflow.py | 12 ++++- ui/src/api/knowledge/knowledge.ts | 9 +++- .../component/action/Result.vue | 38 ++++++++++++++ .../component/action/index.vue | 18 +++++-- .../component/UserInputFieldTable.vue | 2 + 12 files changed, 250 insertions(+), 15 deletions(-) create mode 100644 apps/knowledge/migrations/0005_knowledgeaction.py create mode 100644 apps/knowledge/models/knowledge_action.py create mode 100644 ui/src/views/knowledge-workflow/component/action/Result.vue diff --git a/apps/application/flow/i_step_node.py b/apps/application/flow/i_step_node.py index 3ee7400b2..1eee330db 100644 --- a/apps/application/flow/i_step_node.py +++ b/apps/application/flow/i_step_node.py @@ -21,6 +21,7 @@ from application.flow.common import Answer, NodeChunk from application.models import ApplicationChatUserStats from application.models import ChatRecord, ChatUserType from common.field.common import InstanceField +from knowledge.models.knowledge_action import KnowledgeAction, State chat_cache = cache @@ -97,11 +98,13 @@ class WorkFlowPostHandler: class KnowledgeWorkflowPostHandler(WorkFlowPostHandler): - def __init__(self, chat_info): + def __init__(self, chat_info, knowledge_action_id): super().__init__(chat_info) + self.knowledge_action_id = knowledge_action_id def handler(self, workflow): - pass + QuerySet(KnowledgeAction).filter(id=self.knowledge_action_id).update( + state=State.SUCCESS) class NodeResult: @@ -161,7 +164,8 @@ class FlowParamsSerializer(serializers.Serializer): class KnowledgeFlowParamsSerializer(serializers.Serializer): - knowledge_id = serializers.CharField(required=True, label="知识库id") + knowledge_id = serializers.UUIDField(required=True, label="知识库id") + knowledge_action_id = serializers.UUIDField(required=True, label="知识库任务执行器id") data_source = serializers.DictField(required=True, label="数据源") knowledge_base = serializers.DictField(required=False, label="知识库设置") @@ -241,7 +245,8 @@ class INode: self.status = 500 self.answer_text = str(e) self.err_message = str(e) - self.context['run_time'] = time.time() - self.context['start_time'] + current_time = time.time() + self.context['run_time'] = current_time - (self.context.get('start_time') or current_time) def write_error_context(answer, status=200): pass diff --git a/apps/application/flow/knowledge_workflow_manage.py b/apps/application/flow/knowledge_workflow_manage.py index 184ae3d10..bba879aeb 100644 --- a/apps/application/flow/knowledge_workflow_manage.py +++ b/apps/application/flow/knowledge_workflow_manage.py @@ -6,12 +6,20 @@ @date:2025/11/13 19:02 @desc: """ +import traceback +from concurrent.futures import ThreadPoolExecutor + +from django.db.models import QuerySet +from django.utils.translation import get_language from application.flow.common import Workflow from application.flow.i_step_node import WorkFlowPostHandler, KnowledgeFlowParamsSerializer from application.flow.workflow_manage import WorkflowManage from common.handle.base_to_response import BaseToResponse from common.handle.impl.response.system_to_response import SystemToResponse +from knowledge.models.knowledge_action import KnowledgeAction, State + +executor = ThreadPoolExecutor(max_workers=200) class KnowledgeWorkflowManage(WorkflowManage): @@ -33,3 +41,46 @@ class KnowledgeWorkflowManage(WorkflowManage): start_node_list = [node for node in self.flow.nodes if self.params.get('data_source', {}).get('node_id') == node.id] return start_node_list[0] + + def run(self): + executor.submit(self._run) + + def _run(self): + QuerySet(KnowledgeAction).filter(id=self.params.get('knowledge_action_id')).update( + state=State.STARTED) + language = get_language() + self.run_chain_async(self.start_node, None, language) + while self.is_run(): + pass + self.work_flow_post_handler.handler(self) + + def run_chain(self, current_node, node_result_future=None): + if node_result_future is None: + node_result_future = self.run_node_future(current_node) + try: + result = self.hand_node_result(current_node, node_result_future) + return result + except Exception as e: + traceback.print_exc() + return None + + def hand_node_result(self, current_node, node_result_future): + try: + current_result = node_result_future.result() + result = current_result.write_context(current_node, self) + if result is not None: + # 阻塞获取结果 + list(result) + return current_result + except Exception as e: + traceback.print_exc() + self.status = 500 + current_node.get_write_error_context(e) + self.answer += str(e) + QuerySet(KnowledgeAction).filter(id=self.params.get('knowledge_action_id')).update( + details=self.get_runtime_details(), + state=State.FAILURE) + finally: + current_node.node_chunk.end() + QuerySet(KnowledgeAction).filter(id=self.params.get('knowledge_action_id')).update( + details=self.get_runtime_details()) diff --git a/apps/application/flow/step_node/data_source_local_node/impl/base_data_source_local_node.py b/apps/application/flow/step_node/data_source_local_node/impl/base_data_source_local_node.py index 24da15898..b43263501 100644 --- a/apps/application/flow/step_node/data_source_local_node/impl/base_data_source_local_node.py +++ b/apps/application/flow/step_node/data_source_local_node/impl/base_data_source_local_node.py @@ -37,3 +37,15 @@ class BaseDataSourceLocalNode(IDataSourceLocalNode): def execute(self, file_type_list, file_size_limit, file_count_limit, **kwargs) -> NodeResult: return NodeResult({'file_list': self.workflow_manage.params.get('data_source', {}).get('file_list')}, self.workflow_manage.params.get('knowledge_base') or {}) + + def get_details(self, index: int, **kwargs): + return { + 'name': self.node.properties.get('stepName'), + "index": index, + 'run_time': self.context.get('run_time'), + 'type': self.node.type, + 'file_list': self.context.get('file_list'), + 'knowledge_base': self.workflow_params.get('knowledge_base'), + 'status': self.status, + 'err_message': self.err_message + } diff --git a/apps/knowledge/migrations/0005_knowledgeaction.py b/apps/knowledge/migrations/0005_knowledgeaction.py new file mode 100644 index 000000000..3633a8d23 --- /dev/null +++ b/apps/knowledge/migrations/0005_knowledgeaction.py @@ -0,0 +1,32 @@ +# Generated by Django 5.2.8 on 2025-11-19 06:06 + +import common.encoder.encoder +import django.db.models.deletion +import uuid_utils.compat +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('knowledge', '0004_alter_document_type_alter_knowledge_type_and_more'), + ] + + operations = [ + migrations.CreateModel( + name='KnowledgeAction', + fields=[ + ('create_time', models.DateTimeField(auto_now_add=True, db_index=True, verbose_name='创建时间')), + ('update_time', models.DateTimeField(auto_now=True, db_index=True, verbose_name='修改时间')), + ('id', models.UUIDField(default=uuid_utils.compat.uuid7, editable=False, primary_key=True, serialize=False, verbose_name='主键id')), + ('state', models.CharField(choices=[('PENDING', 'Pending'), ('STARTED', 'Started'), ('SUCCESS', 'Success'), ('FAILURE', 'Failure'), ('REVOKE', 'Revoke'), ('REVOKED', 'Revoked')], default='STARTED', max_length=20, verbose_name='状态')), + ('details', models.JSONField(default=dict, encoder=common.encoder.encoder.SystemEncoder, verbose_name='执行详情')), + ('run_time', models.FloatField(default=0, verbose_name='运行时长')), + ('meta', models.JSONField(default=dict, verbose_name='元数据')), + ('knowledge', models.ForeignKey(db_constraint=False, on_delete=django.db.models.deletion.DO_NOTHING, to='knowledge.knowledge', verbose_name='知识库')), + ], + options={ + 'db_table': 'knowledge_action', + }, + ), + ] diff --git a/apps/knowledge/models/knowledge_action.py b/apps/knowledge/models/knowledge_action.py new file mode 100644 index 000000000..0825415b3 --- /dev/null +++ b/apps/knowledge/models/knowledge_action.py @@ -0,0 +1,49 @@ +# coding=utf-8 +""" + @project: MaxKB + @Author:虎虎 + @file: knowledge_action.py + @date:2025/11/18 17:59 + @desc: +""" +import uuid_utils.compat as uuid + +from django.db import models + +from common.encoder.encoder import SystemEncoder +from common.mixins.app_model_mixin import AppModelMixin +from knowledge.models import Knowledge + + +class State(models.TextChoices): + # 等待 + PENDING = 'PENDING' + # 执行中 + STARTED = 'STARTED' + # 成功 + SUCCESS = 'SUCCESS' + # 失败 + FAILURE = 'FAILURE' + # 取消任务 + REVOKE = 'REVOKE' + # 取消成功 + REVOKED = 'REVOKED' + + +class KnowledgeAction(AppModelMixin): + id = models.UUIDField(primary_key=True, max_length=128, default=uuid.uuid7, editable=False, verbose_name="主键id") + + knowledge = models.ForeignKey(Knowledge, on_delete=models.DO_NOTHING, verbose_name="知识库", db_constraint=False) + + state = models.CharField(verbose_name='状态', max_length=20, + choices=State.choices, + default=State.STARTED) + + details = models.JSONField(verbose_name="执行详情", default=dict, encoder=SystemEncoder) + + run_time = models.FloatField(verbose_name="运行时长", default=0) + + meta = models.JSONField(verbose_name="元数据", default=dict) + + class Meta: + db_table = "knowledge_action" diff --git a/apps/knowledge/serializers/knowledge_workflow.py b/apps/knowledge/serializers/knowledge_workflow.py index 9748f9757..04ac6fa82 100644 --- a/apps/knowledge/serializers/knowledge_workflow.py +++ b/apps/knowledge/serializers/knowledge_workflow.py @@ -14,6 +14,7 @@ from application.flow.knowledge_workflow_manage import KnowledgeWorkflowManage from application.flow.step_node import get_node from common.exception.app_exception import AppApiException from knowledge.models import KnowledgeScope, Knowledge, KnowledgeType, KnowledgeWorkflow +from knowledge.models.knowledge_action import KnowledgeAction, State from knowledge.serializers.knowledge import KnowledgeModelSerializer from system_manage.models import AuthTargetType from system_manage.serializers.user_resource_permission import UserResourcePermissionSerializer @@ -34,13 +35,30 @@ class KnowledgeWorkflowActionSerializer(serializers.Serializer): if with_valid: self.is_valid(raise_exception=True) knowledge_workflow = QuerySet(KnowledgeWorkflow).filter(knowledge_id=self.data.get("knowledge_id")).first() + knowledge_action_id = uuid.uuid7() + KnowledgeAction(id=knowledge_action_id, knowledge_id=self.data.get("knowledge_id"), state=State.STARTED).save() work_flow_manage = KnowledgeWorkflowManage( Workflow.new_instance(knowledge_workflow.work_flow, WorkflowMode.KNOWLEDGE), - {'knowledge_id': self.data.get("knowledge_id"), 'stream': True, + {'knowledge_id': self.data.get("knowledge_id"), 'knowledge_action_id': knowledge_action_id, 'stream': True, **instance}, - KnowledgeWorkflowPostHandler(None)) - r = work_flow_manage.run() - return r + KnowledgeWorkflowPostHandler(None, knowledge_action_id)) + work_flow_manage.run() + return {'id': knowledge_action_id, 'knowledge_id': self.data.get("knowledge_id"), 'state': State.STARTED, + 'details': {}} + + class Operate(serializers.Serializer): + workspace_id = serializers.CharField(required=True, label=_('workspace id')) + knowledge_id = serializers.UUIDField(required=True, label=_('knowledge id')) + id = serializers.UUIDField(required=True, label=_('knowledge action id')) + + def one(self, is_valid=True): + if is_valid: + self.is_valid(raise_exception=True) + knowledge_action_id = self.data.get("id") + knowledge_action = QuerySet(KnowledgeAction).filter(id=knowledge_action_id).first() + return {'id': knowledge_action_id, 'knowledge_id': knowledge_action.knowledge_id, + 'state': knowledge_action.state, + 'details': knowledge_action.details} class KnowledgeWorkflowSerializer(serializers.Serializer): diff --git a/apps/knowledge/urls.py b/apps/knowledge/urls.py index 331539ed8..5cbec1b78 100644 --- a/apps/knowledge/urls.py +++ b/apps/knowledge/urls.py @@ -70,6 +70,7 @@ urlpatterns = [ path('workspace//knowledge//document//', views.DocumentView.Page.as_view()), path('workspace//knowledge//', views.KnowledgeView.Page.as_view()), path('workspace//knowledge//form_list//', views.KnowledgeWorkflowFormView.as_view()), - path('workspace//knowledge//action', views.KnowledgeWorkflowActionView.as_view()) + path('workspace//knowledge//action', views.KnowledgeWorkflowActionView.as_view()), + path('workspace//knowledge//action/', views.KnowledgeWorkflowActionView.Operate.as_view()) ] diff --git a/apps/knowledge/views/knowledge_workflow.py b/apps/knowledge/views/knowledge_workflow.py index c4b0fd1f6..f0b0d0c26 100644 --- a/apps/knowledge/views/knowledge_workflow.py +++ b/apps/knowledge/views/knowledge_workflow.py @@ -27,8 +27,16 @@ class KnowledgeWorkflowActionView(APIView): authentication_classes = [TokenAuth] def post(self, request: Request, workspace_id: str, knowledge_id: str): - return KnowledgeWorkflowActionSerializer( - data={'workspace_id': workspace_id, 'knowledge_id': knowledge_id}).action(request.data, True) + return result.success(KnowledgeWorkflowActionSerializer( + data={'workspace_id': workspace_id, 'knowledge_id': knowledge_id}).action(request.data, True)) + + class Operate(APIView): + authentication_classes = [TokenAuth] + + def get(self, request, workspace_id: str, knowledge_id: str, knowledge_action_id: str): + return result.success(KnowledgeWorkflowActionSerializer.Operate( + data={'workspace_id': workspace_id, 'knowledge_id': knowledge_id, 'id': knowledge_action_id}) + .one()) class KnowledgeWorkflowView(APIView): diff --git a/ui/src/api/knowledge/knowledge.ts b/ui/src/api/knowledge/knowledge.ts index d7436e3f8..22ad4995b 100644 --- a/ui/src/api/knowledge/knowledge.ts +++ b/ui/src/api/knowledge/knowledge.ts @@ -337,7 +337,13 @@ const workflowAction: ( ) => Promise> = (knowledge_id: string, instance, loading) => { return post(`${prefix.value}/${knowledge_id}/action`, instance, {}, loading) } - +const getWorkflowAction: ( + knowledge_id: string, + knowledge_action_id: string, + loading?: Ref, +) => Promise> = (knowledge_id: string, knowledge_action_id, loading) => { + return get(`${prefix.value}/${knowledge_id}/action/${knowledge_action_id}`, {}, loading) +} export default { getKnowledgeList, getKnowledgeListPage, @@ -364,4 +370,5 @@ export default { createWorkflowKnowledge, getKnowledgeWorkflowFormList, workflowAction, + getWorkflowAction, } diff --git a/ui/src/views/knowledge-workflow/component/action/Result.vue b/ui/src/views/knowledge-workflow/component/action/Result.vue new file mode 100644 index 000000000..9653dbebb --- /dev/null +++ b/ui/src/views/knowledge-workflow/component/action/Result.vue @@ -0,0 +1,38 @@ + + + diff --git a/ui/src/views/knowledge-workflow/component/action/index.vue b/ui/src/views/knowledge-workflow/component/action/index.vue index feb019d8b..64ea50b16 100644 --- a/ui/src/views/knowledge-workflow/component/action/index.vue +++ b/ui/src/views/knowledge-workflow/component/action/index.vue @@ -1,7 +1,13 @@