diff --git a/apps/application/__init__.py b/apps/application/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/apps/application/admin.py b/apps/application/admin.py new file mode 100644 index 000000000..8c38f3f3d --- /dev/null +++ b/apps/application/admin.py @@ -0,0 +1,3 @@ +from django.contrib import admin + +# Register your models here. diff --git a/apps/application/apps.py b/apps/application/apps.py new file mode 100644 index 000000000..30c0916b0 --- /dev/null +++ b/apps/application/apps.py @@ -0,0 +1,6 @@ +from django.apps import AppConfig + + +class ApplicationConfig(AppConfig): + default_auto_field = 'django.db.models.BigAutoField' + name = 'application' diff --git a/apps/application/migrations/__init__.py b/apps/application/migrations/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/apps/application/models/__init__.py b/apps/application/models/__init__.py new file mode 100644 index 000000000..898bb79df --- /dev/null +++ b/apps/application/models/__init__.py @@ -0,0 +1,8 @@ +# coding=utf-8 +""" + @project: MaxKB + @Author:虎虎 + @file: __init__.py + @date:2025/5/7 15:14 + @desc: +""" diff --git a/apps/application/models/application.py b/apps/application/models/application.py new file mode 100644 index 000000000..bfa431f8f --- /dev/null +++ b/apps/application/models/application.py @@ -0,0 +1,8 @@ +# coding=utf-8 +""" + @project: MaxKB + @Author:虎虎 + @file: application.py + @date:2025/5/7 15:29 + @desc: +""" diff --git a/apps/application/tests.py b/apps/application/tests.py new file mode 100644 index 000000000..7ce503c2d --- /dev/null +++ b/apps/application/tests.py @@ -0,0 +1,3 @@ +from django.test import TestCase + +# Create your tests here. diff --git a/apps/application/views/__init__.py b/apps/application/views/__init__.py new file mode 100644 index 000000000..9b6036777 --- /dev/null +++ b/apps/application/views/__init__.py @@ -0,0 +1,8 @@ +# coding=utf-8 +""" + @project: MaxKB + @Author:虎虎 + @file: __init__.py + @date:2025/5/9 18:51 + @desc: +""" diff --git a/apps/common/utils/common.py b/apps/common/utils/common.py index 4a741852f..e0bf38c75 100644 --- a/apps/common/utils/common.py +++ b/apps/common/utils/common.py @@ -265,6 +265,7 @@ def parse_md_image(content: str): image_list = [match.group() for match in matches] return image_list + def bulk_create_in_batches(model, data, batch_size=1000): if len(data) == 0: return diff --git a/apps/workflow/__init__.py b/apps/workflow/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/apps/workflow/admin.py b/apps/workflow/admin.py new file mode 100644 index 000000000..8c38f3f3d --- /dev/null +++ b/apps/workflow/admin.py @@ -0,0 +1,3 @@ +from django.contrib import admin + +# Register your models here. diff --git a/apps/workflow/apps.py b/apps/workflow/apps.py new file mode 100644 index 000000000..da0a58b32 --- /dev/null +++ b/apps/workflow/apps.py @@ -0,0 +1,6 @@ +from django.apps import AppConfig + + +class WorkflowConfig(AppConfig): + default_auto_field = 'django.db.models.BigAutoField' + name = 'workflow' diff --git a/apps/workflow/migrations/__init__.py b/apps/workflow/migrations/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/apps/workflow/models/__init__.py b/apps/workflow/models/__init__.py new file mode 100644 index 000000000..3f7762ff3 --- /dev/null +++ b/apps/workflow/models/__init__.py @@ -0,0 +1,8 @@ +# coding=utf-8 +""" + @project: MaxKB + @Author:虎虎 + @file: __init__.py + @date:2025/5/7 15:43 + @desc: +""" diff --git a/apps/workflow/models/workflow.py b/apps/workflow/models/workflow.py new file mode 100644 index 000000000..74df03030 --- /dev/null +++ b/apps/workflow/models/workflow.py @@ -0,0 +1,30 @@ +# coding=utf-8 +""" + @project: MaxKB + @Author:虎虎 + @file: workflow.py + @date:2025/5/7 15:44 + @desc: +""" +from django.db import models +import uuid_utils.compat as uuid + + +class WorkflowType(models.TextChoices): + # 应用 + APPLICATION = "APPLICATION" + # 知识库 + KNOWLEDGE = "KNOWLEDGE" + # .... + + +class Workflow(models.Model): + id = models.UUIDField(primary_key=True, max_length=128, default=uuid.uuid7, editable=False, verbose_name="主键id") + workflow = models.JSONField(verbose_name="工作流数据", default=dict) + type = models.CharField(verbose_name="工作流类型", choices=WorkflowType.choices, default=WorkflowType.APPLICATION) + create_time = models.DateTimeField(verbose_name="创建时间", auto_now_add=True) + update_time = models.DateTimeField(verbose_name="修改时间", auto_now=True) + + class Meta: + db_table = "workflow" + ordering = ['update_time'] diff --git a/apps/workflow/tests.py b/apps/workflow/tests.py new file mode 100644 index 000000000..7ce503c2d --- /dev/null +++ b/apps/workflow/tests.py @@ -0,0 +1,3 @@ +from django.test import TestCase + +# Create your tests here. diff --git a/apps/workflow/views/__init__.py b/apps/workflow/views/__init__.py new file mode 100644 index 000000000..29c531629 --- /dev/null +++ b/apps/workflow/views/__init__.py @@ -0,0 +1,8 @@ +# coding=utf-8 +""" + @project: MaxKB + @Author:虎虎 + @file: __init__.py.py + @date:2025/5/7 15:43 + @desc: +""" diff --git a/apps/workflow/workflow/__init__.py b/apps/workflow/workflow/__init__.py new file mode 100644 index 000000000..a386ef230 --- /dev/null +++ b/apps/workflow/workflow/__init__.py @@ -0,0 +1,8 @@ +# coding=utf-8 +""" + @project: MaxKB + @Author:虎虎 + @file: __init__.py + @date:2025/5/7 16:15 + @desc: +""" diff --git a/apps/workflow/workflow/common.py b/apps/workflow/workflow/common.py new file mode 100644 index 000000000..818f59c50 --- /dev/null +++ b/apps/workflow/workflow/common.py @@ -0,0 +1,214 @@ +# coding=utf-8 +""" + @project: MaxKB + @Author:虎虎 + @file: workflow.py + @date:2025/5/9 10:58 + @desc: +""" +from typing import List, Dict +from queue import Queue, Empty + +from common.utils.common import group_by + + +class Content: + def __init__(self, content: str, reasoning_content: str, **kwargs): + """ + 内容 + @param content: ai响应内容 + @param reasoning_content:思考过程 + @param kwargs: 其他参数 + """ + self.content = content + self.reasoning_content = reasoning_content + for key in kwargs: + self.__setattr__(key, kwargs.get(key)) + + +class Chunk: + + def __init__(self, runtime_id: str, node_id: str, node_name: str, content: Content, node_data, children, loop_index, + **kwargs): + """ + + @param runtime_id: 运行时id + @param node_id: 节点id + @param node_name: 节点名称 + @param loop_index: 循环下标 + @param children: 子块 + @param node_data 节点数据 + @param content: 内容 + """ + self.runtime_id = runtime_id + self.node_id = node_id + self.node_name = node_name + self.loop_index = loop_index + self.children = children + self.content = content + self.node_data = node_data + for key in kwargs: + self.__setattr__(key, kwargs.get(key)) + + +class Channel: + """ + 对话管道 + """ + messages = Queue() + is_end = False + + def write(self, message): + if isinstance(message, Channel) | isinstance(message, Chunk): + if self.is_end: + raise "通道已关闭" + self.messages.put(message) + else: + raise "不支持的管道参数" + + def end(self): + self.is_end = True + return self.messages.put(None) + + def pop(self): + if self.is_end: + return self.messages.get_nowait() + return self.messages.get() + + def generator(self): + while True: + try: + message = self.pop() + if message: + if isinstance(message, Channel): + for chunk in message.generator(): + yield chunk + else: + yield message + except Empty: + return + + +class Node: + + def __init__(self, _id: str, _type: str, x: int, y: int, properties: dict, **kwargs): + """ + + @param _id: 节点id + @param _type: 类型 + @param x: 节点x轴位置 + @param y: 节点y轴位置 + @param properties: + @param kwargs: + """ + self.id = _id + self.type = _type + self.x = x + self.y = y + self.properties = properties + for keyword in kwargs: + self.__setattr__(keyword, kwargs.get(keyword)) + + +class Edge: + def __init__(self, _id: str, _type: str, sourceNodeId: str, targetNodeId: str, **keywords): + """ + 线 + @param _id: 线id + @param _type: 线类型 + @param sourceNodeId: + @param targetNodeId: + @param keywords: + """ + self.id = _id + self.type = _type + self.sourceNodeId = sourceNodeId + self.targetNodeId = targetNodeId + for keyword in keywords: + self.__setattr__(keyword, keywords.get(keyword)) + + +class EdgeNode: + edge: Edge + node: Node + + def __init__(self, edge, node): + self.edge = edge + self.node = node + + +class Workflow: + """ + 节点列表 + """ + nodes: List[Node] + """ + 线列表 + """ + edges: List[Edge] + """ + 节点id:node + """ + node_map: Dict[str, Node] + """ + 节点id:当前节点id上面的所有节点 + """ + up_node_map: Dict[str, List[EdgeNode]] + """ + 节点id:当前节点id下面的所有节点 + """ + next_node_map: Dict[str, List[EdgeNode]] + + def __init__(self, nodes: List[Node], edges: List[Edge]): + self.nodes = nodes + self.edges = edges + self.node_map = {node.id: node for node in nodes} + + self.up_node_map = {key: [EdgeNode(edge, self.node_map.get(edge.sourceNodeId)) for + edge in edges] for + key, edges in + group_by(edges, key=lambda edge: edge.targetNodeId).items()} + + self.next_node_map = {key: [EdgeNode(edge, self.node_map.get(edge.targetNodeId)) for edge in edges] for + key, edges in + group_by(edges, key=lambda edge: edge.sourceNodeId).items()} + + def get_node(self, node_id): + """ + 根据node_id 获取节点信息 + @param node_id: node_id + @return: 节点信息 + """ + return self.node_map.get(node_id) + + def get_up_edge_nodes(self, node_id) -> List[EdgeNode]: + """ + 根据节点id 获取当前连接前置节点和连线 + @param node_id: 节点id + @return: 节点连线列表 + """ + return self.up_node_map.get(node_id) + + def get_next_edge_nodes(self, node_id) -> List[EdgeNode]: + """ + 根据节点id 获取当前连接目标节点和连线 + @param node_id: 节点id + @return: 节点连线列表 + """ + return self.next_node_map.get(node_id) + + def get_up_nodes(self, node_id) -> List[Node]: + """ + 根据节点id 获取当前连接前置节点 + @param node_id: 节点id + @return: 节点列表 + """ + return [en.node for en in self.up_node_map.get(node_id)] + + def get_next_nodes(self, node_id) -> List[Node]: + """ + 根据节点id 获取当前连接目标节点 + @param node_id: 节点id + @return: 节点列表 + """ + return [en.node for en in self.next_node_map.get(node_id, [])] diff --git a/apps/workflow/workflow/i_node.py b/apps/workflow/workflow/i_node.py new file mode 100644 index 000000000..fec0ac048 --- /dev/null +++ b/apps/workflow/workflow/i_node.py @@ -0,0 +1,54 @@ +# coding=utf-8 +""" + @project: MaxKB + @Author:虎虎 + @file: i_node.py + @date:2025/5/7 16:41 + @desc: +""" +import time +from abc import abstractmethod + +from common.utils.common import get_sha256_hash +from workflow.workflow.common import Channel, Chunk + + +class INode: + # 当前节点支持的工作流类型 + supported_workflow_type_list = [] + # 节点类型 + type = None + # 节点管道 + channel = Channel() + + def __init__(self, node, workflow_manage, chunk: Chunk = None, up_node_id_list=None, loop_index=None): + self.node = node + self.chunk = chunk + if chunk is not None: + self.context = chunk.node_data | {} + else: + self.context = {} + # 运行时id + self.runtime_node_id = get_sha256_hash("".join(up_node_id_list | []) + node.id + str(loop_index | "")) + self.workflow_manage = workflow_manage + self.node_serializer = self.get_node_serializer()(data=node.properties.get('node_data')) + self.is_valid() + + def is_valid(self): + self.node_serializer.is_valid(raise_exception=True) + + def execute(self, **kwargs): + pass + + def run(self): + start_time = time.time() + self.context['start_time'] = start_time + self._run() + self.context['run_time'] = time.time() - start_time + + def _run(self): + return self.execute(**self.node_serializer.data) + + @abstractmethod + def get_node_serializer(self): + pass diff --git a/apps/workflow/workflow/nodes/__init__.py b/apps/workflow/workflow/nodes/__init__.py new file mode 100644 index 000000000..9dfe7d2fd --- /dev/null +++ b/apps/workflow/workflow/nodes/__init__.py @@ -0,0 +1,8 @@ +# coding=utf-8 +""" + @project: MaxKB + @Author:虎虎 + @file: __init__.py.py + @date:2025/5/7 16:15 + @desc: +""" diff --git a/apps/workflow/workflow/tools.py b/apps/workflow/workflow/tools.py new file mode 100644 index 000000000..1cc71c5df --- /dev/null +++ b/apps/workflow/workflow/tools.py @@ -0,0 +1,10 @@ +# coding=utf-8 +""" + @project: MaxKB + @Author:虎虎 + @file: tools.py + @date:2025/5/7 18:44 + @desc: +""" + + diff --git a/apps/workflow/workflow/workflow_manage.py b/apps/workflow/workflow/workflow_manage.py new file mode 100644 index 000000000..87d69879e --- /dev/null +++ b/apps/workflow/workflow/workflow_manage.py @@ -0,0 +1,45 @@ +# coding=utf-8 +""" + @project: MaxKB + @Author:虎虎 + @file: workflow_manage.py + @date:2025/5/9 10:30 + @desc: +""" +from builtins import function +from enum import Enum +from typing import List, Dict + +from workflow.workflow.common import Workflow, Channel, Chunk + + +class WorkflowType(Enum): + # 应用 + APPLICATION = "APPLICATION" + # 知识库 + KNOWLEDGE = "KNOWLEDGE" + # .... + + +class WorkflowManage: + channel = Channel() + + def __init__(self, + workflow: Workflow, + chunk_list: List[Chunk], + start_node: Chunk, + workflow_type: WorkflowType, + body: Dict, + consumer: function): + self.workflow = workflow + self.chunk_list = chunk_list + self.start_node = start_node + self.workflow_type = workflow_type + self.body = body + self.consumer = consumer + + def stream(self): + pass + + def invoke(self): + pass