mirror of
https://github.com/1Panel-dev/MaxKB.git
synced 2025-12-26 01:33:05 +00:00
feat: workflow init (#3072)
Some checks failed
sync2gitee / repo-sync (push) Has been cancelled
Some checks failed
sync2gitee / repo-sync (push) Has been cancelled
This commit is contained in:
parent
f6ccc95240
commit
5ebfe9b5dd
|
|
@ -0,0 +1,3 @@
|
|||
from django.contrib import admin
|
||||
|
||||
# Register your models here.
|
||||
|
|
@ -0,0 +1,6 @@
|
|||
from django.apps import AppConfig
|
||||
|
||||
|
||||
class ApplicationConfig(AppConfig):
|
||||
default_auto_field = 'django.db.models.BigAutoField'
|
||||
name = 'application'
|
||||
|
|
@ -0,0 +1,8 @@
|
|||
# coding=utf-8
|
||||
"""
|
||||
@project: MaxKB
|
||||
@Author:虎虎
|
||||
@file: __init__.py
|
||||
@date:2025/5/7 15:14
|
||||
@desc:
|
||||
"""
|
||||
|
|
@ -0,0 +1,8 @@
|
|||
# coding=utf-8
|
||||
"""
|
||||
@project: MaxKB
|
||||
@Author:虎虎
|
||||
@file: application.py
|
||||
@date:2025/5/7 15:29
|
||||
@desc:
|
||||
"""
|
||||
|
|
@ -0,0 +1,3 @@
|
|||
from django.test import TestCase
|
||||
|
||||
# Create your tests here.
|
||||
|
|
@ -0,0 +1,8 @@
|
|||
# coding=utf-8
|
||||
"""
|
||||
@project: MaxKB
|
||||
@Author:虎虎
|
||||
@file: __init__.py
|
||||
@date:2025/5/9 18:51
|
||||
@desc:
|
||||
"""
|
||||
|
|
@ -265,6 +265,7 @@ def parse_md_image(content: str):
|
|||
image_list = [match.group() for match in matches]
|
||||
return image_list
|
||||
|
||||
|
||||
def bulk_create_in_batches(model, data, batch_size=1000):
|
||||
if len(data) == 0:
|
||||
return
|
||||
|
|
|
|||
|
|
@ -0,0 +1,3 @@
|
|||
from django.contrib import admin
|
||||
|
||||
# Register your models here.
|
||||
|
|
@ -0,0 +1,6 @@
|
|||
from django.apps import AppConfig
|
||||
|
||||
|
||||
class WorkflowConfig(AppConfig):
|
||||
default_auto_field = 'django.db.models.BigAutoField'
|
||||
name = 'workflow'
|
||||
|
|
@ -0,0 +1,8 @@
|
|||
# coding=utf-8
|
||||
"""
|
||||
@project: MaxKB
|
||||
@Author:虎虎
|
||||
@file: __init__.py
|
||||
@date:2025/5/7 15:43
|
||||
@desc:
|
||||
"""
|
||||
|
|
@ -0,0 +1,30 @@
|
|||
# coding=utf-8
|
||||
"""
|
||||
@project: MaxKB
|
||||
@Author:虎虎
|
||||
@file: workflow.py
|
||||
@date:2025/5/7 15:44
|
||||
@desc:
|
||||
"""
|
||||
from django.db import models
|
||||
import uuid_utils.compat as uuid
|
||||
|
||||
|
||||
class WorkflowType(models.TextChoices):
|
||||
# 应用
|
||||
APPLICATION = "APPLICATION"
|
||||
# 知识库
|
||||
KNOWLEDGE = "KNOWLEDGE"
|
||||
# ....
|
||||
|
||||
|
||||
class Workflow(models.Model):
|
||||
id = models.UUIDField(primary_key=True, max_length=128, default=uuid.uuid7, editable=False, verbose_name="主键id")
|
||||
workflow = models.JSONField(verbose_name="工作流数据", default=dict)
|
||||
type = models.CharField(verbose_name="工作流类型", choices=WorkflowType.choices, default=WorkflowType.APPLICATION)
|
||||
create_time = models.DateTimeField(verbose_name="创建时间", auto_now_add=True)
|
||||
update_time = models.DateTimeField(verbose_name="修改时间", auto_now=True)
|
||||
|
||||
class Meta:
|
||||
db_table = "workflow"
|
||||
ordering = ['update_time']
|
||||
|
|
@ -0,0 +1,3 @@
|
|||
from django.test import TestCase
|
||||
|
||||
# Create your tests here.
|
||||
|
|
@ -0,0 +1,8 @@
|
|||
# coding=utf-8
|
||||
"""
|
||||
@project: MaxKB
|
||||
@Author:虎虎
|
||||
@file: __init__.py.py
|
||||
@date:2025/5/7 15:43
|
||||
@desc:
|
||||
"""
|
||||
|
|
@ -0,0 +1,8 @@
|
|||
# coding=utf-8
|
||||
"""
|
||||
@project: MaxKB
|
||||
@Author:虎虎
|
||||
@file: __init__.py
|
||||
@date:2025/5/7 16:15
|
||||
@desc:
|
||||
"""
|
||||
|
|
@ -0,0 +1,214 @@
|
|||
# coding=utf-8
|
||||
"""
|
||||
@project: MaxKB
|
||||
@Author:虎虎
|
||||
@file: workflow.py
|
||||
@date:2025/5/9 10:58
|
||||
@desc:
|
||||
"""
|
||||
from typing import List, Dict
|
||||
from queue import Queue, Empty
|
||||
|
||||
from common.utils.common import group_by
|
||||
|
||||
|
||||
class Content:
|
||||
def __init__(self, content: str, reasoning_content: str, **kwargs):
|
||||
"""
|
||||
内容
|
||||
@param content: ai响应内容
|
||||
@param reasoning_content:思考过程
|
||||
@param kwargs: 其他参数
|
||||
"""
|
||||
self.content = content
|
||||
self.reasoning_content = reasoning_content
|
||||
for key in kwargs:
|
||||
self.__setattr__(key, kwargs.get(key))
|
||||
|
||||
|
||||
class Chunk:
|
||||
|
||||
def __init__(self, runtime_id: str, node_id: str, node_name: str, content: Content, node_data, children, loop_index,
|
||||
**kwargs):
|
||||
"""
|
||||
|
||||
@param runtime_id: 运行时id
|
||||
@param node_id: 节点id
|
||||
@param node_name: 节点名称
|
||||
@param loop_index: 循环下标
|
||||
@param children: 子块
|
||||
@param node_data 节点数据
|
||||
@param content: 内容
|
||||
"""
|
||||
self.runtime_id = runtime_id
|
||||
self.node_id = node_id
|
||||
self.node_name = node_name
|
||||
self.loop_index = loop_index
|
||||
self.children = children
|
||||
self.content = content
|
||||
self.node_data = node_data
|
||||
for key in kwargs:
|
||||
self.__setattr__(key, kwargs.get(key))
|
||||
|
||||
|
||||
class Channel:
|
||||
"""
|
||||
对话管道
|
||||
"""
|
||||
messages = Queue()
|
||||
is_end = False
|
||||
|
||||
def write(self, message):
|
||||
if isinstance(message, Channel) | isinstance(message, Chunk):
|
||||
if self.is_end:
|
||||
raise "通道已关闭"
|
||||
self.messages.put(message)
|
||||
else:
|
||||
raise "不支持的管道参数"
|
||||
|
||||
def end(self):
|
||||
self.is_end = True
|
||||
return self.messages.put(None)
|
||||
|
||||
def pop(self):
|
||||
if self.is_end:
|
||||
return self.messages.get_nowait()
|
||||
return self.messages.get()
|
||||
|
||||
def generator(self):
|
||||
while True:
|
||||
try:
|
||||
message = self.pop()
|
||||
if message:
|
||||
if isinstance(message, Channel):
|
||||
for chunk in message.generator():
|
||||
yield chunk
|
||||
else:
|
||||
yield message
|
||||
except Empty:
|
||||
return
|
||||
|
||||
|
||||
class Node:
|
||||
|
||||
def __init__(self, _id: str, _type: str, x: int, y: int, properties: dict, **kwargs):
|
||||
"""
|
||||
|
||||
@param _id: 节点id
|
||||
@param _type: 类型
|
||||
@param x: 节点x轴位置
|
||||
@param y: 节点y轴位置
|
||||
@param properties:
|
||||
@param kwargs:
|
||||
"""
|
||||
self.id = _id
|
||||
self.type = _type
|
||||
self.x = x
|
||||
self.y = y
|
||||
self.properties = properties
|
||||
for keyword in kwargs:
|
||||
self.__setattr__(keyword, kwargs.get(keyword))
|
||||
|
||||
|
||||
class Edge:
|
||||
def __init__(self, _id: str, _type: str, sourceNodeId: str, targetNodeId: str, **keywords):
|
||||
"""
|
||||
线
|
||||
@param _id: 线id
|
||||
@param _type: 线类型
|
||||
@param sourceNodeId:
|
||||
@param targetNodeId:
|
||||
@param keywords:
|
||||
"""
|
||||
self.id = _id
|
||||
self.type = _type
|
||||
self.sourceNodeId = sourceNodeId
|
||||
self.targetNodeId = targetNodeId
|
||||
for keyword in keywords:
|
||||
self.__setattr__(keyword, keywords.get(keyword))
|
||||
|
||||
|
||||
class EdgeNode:
|
||||
edge: Edge
|
||||
node: Node
|
||||
|
||||
def __init__(self, edge, node):
|
||||
self.edge = edge
|
||||
self.node = node
|
||||
|
||||
|
||||
class Workflow:
|
||||
"""
|
||||
节点列表
|
||||
"""
|
||||
nodes: List[Node]
|
||||
"""
|
||||
线列表
|
||||
"""
|
||||
edges: List[Edge]
|
||||
"""
|
||||
节点id:node
|
||||
"""
|
||||
node_map: Dict[str, Node]
|
||||
"""
|
||||
节点id:当前节点id上面的所有节点
|
||||
"""
|
||||
up_node_map: Dict[str, List[EdgeNode]]
|
||||
"""
|
||||
节点id:当前节点id下面的所有节点
|
||||
"""
|
||||
next_node_map: Dict[str, List[EdgeNode]]
|
||||
|
||||
def __init__(self, nodes: List[Node], edges: List[Edge]):
|
||||
self.nodes = nodes
|
||||
self.edges = edges
|
||||
self.node_map = {node.id: node for node in nodes}
|
||||
|
||||
self.up_node_map = {key: [EdgeNode(edge, self.node_map.get(edge.sourceNodeId)) for
|
||||
edge in edges] for
|
||||
key, edges in
|
||||
group_by(edges, key=lambda edge: edge.targetNodeId).items()}
|
||||
|
||||
self.next_node_map = {key: [EdgeNode(edge, self.node_map.get(edge.targetNodeId)) for edge in edges] for
|
||||
key, edges in
|
||||
group_by(edges, key=lambda edge: edge.sourceNodeId).items()}
|
||||
|
||||
def get_node(self, node_id):
|
||||
"""
|
||||
根据node_id 获取节点信息
|
||||
@param node_id: node_id
|
||||
@return: 节点信息
|
||||
"""
|
||||
return self.node_map.get(node_id)
|
||||
|
||||
def get_up_edge_nodes(self, node_id) -> List[EdgeNode]:
|
||||
"""
|
||||
根据节点id 获取当前连接前置节点和连线
|
||||
@param node_id: 节点id
|
||||
@return: 节点连线列表
|
||||
"""
|
||||
return self.up_node_map.get(node_id)
|
||||
|
||||
def get_next_edge_nodes(self, node_id) -> List[EdgeNode]:
|
||||
"""
|
||||
根据节点id 获取当前连接目标节点和连线
|
||||
@param node_id: 节点id
|
||||
@return: 节点连线列表
|
||||
"""
|
||||
return self.next_node_map.get(node_id)
|
||||
|
||||
def get_up_nodes(self, node_id) -> List[Node]:
|
||||
"""
|
||||
根据节点id 获取当前连接前置节点
|
||||
@param node_id: 节点id
|
||||
@return: 节点列表
|
||||
"""
|
||||
return [en.node for en in self.up_node_map.get(node_id)]
|
||||
|
||||
def get_next_nodes(self, node_id) -> List[Node]:
|
||||
"""
|
||||
根据节点id 获取当前连接目标节点
|
||||
@param node_id: 节点id
|
||||
@return: 节点列表
|
||||
"""
|
||||
return [en.node for en in self.next_node_map.get(node_id, [])]
|
||||
|
|
@ -0,0 +1,54 @@
|
|||
# coding=utf-8
|
||||
"""
|
||||
@project: MaxKB
|
||||
@Author:虎虎
|
||||
@file: i_node.py
|
||||
@date:2025/5/7 16:41
|
||||
@desc:
|
||||
"""
|
||||
import time
|
||||
from abc import abstractmethod
|
||||
|
||||
from common.utils.common import get_sha256_hash
|
||||
from workflow.workflow.common import Channel, Chunk
|
||||
|
||||
|
||||
class INode:
|
||||
# 当前节点支持的工作流类型
|
||||
supported_workflow_type_list = []
|
||||
# 节点类型
|
||||
type = None
|
||||
# 节点管道
|
||||
channel = Channel()
|
||||
|
||||
def __init__(self, node, workflow_manage, chunk: Chunk = None, up_node_id_list=None, loop_index=None):
|
||||
self.node = node
|
||||
self.chunk = chunk
|
||||
if chunk is not None:
|
||||
self.context = chunk.node_data | {}
|
||||
else:
|
||||
self.context = {}
|
||||
# 运行时id
|
||||
self.runtime_node_id = get_sha256_hash("".join(up_node_id_list | []) + node.id + str(loop_index | ""))
|
||||
self.workflow_manage = workflow_manage
|
||||
self.node_serializer = self.get_node_serializer()(data=node.properties.get('node_data'))
|
||||
self.is_valid()
|
||||
|
||||
def is_valid(self):
|
||||
self.node_serializer.is_valid(raise_exception=True)
|
||||
|
||||
def execute(self, **kwargs):
|
||||
pass
|
||||
|
||||
def run(self):
|
||||
start_time = time.time()
|
||||
self.context['start_time'] = start_time
|
||||
self._run()
|
||||
self.context['run_time'] = time.time() - start_time
|
||||
|
||||
def _run(self):
|
||||
return self.execute(**self.node_serializer.data)
|
||||
|
||||
@abstractmethod
|
||||
def get_node_serializer(self):
|
||||
pass
|
||||
|
|
@ -0,0 +1,8 @@
|
|||
# coding=utf-8
|
||||
"""
|
||||
@project: MaxKB
|
||||
@Author:虎虎
|
||||
@file: __init__.py.py
|
||||
@date:2025/5/7 16:15
|
||||
@desc:
|
||||
"""
|
||||
|
|
@ -0,0 +1,10 @@
|
|||
# coding=utf-8
|
||||
"""
|
||||
@project: MaxKB
|
||||
@Author:虎虎
|
||||
@file: tools.py
|
||||
@date:2025/5/7 18:44
|
||||
@desc:
|
||||
"""
|
||||
|
||||
|
||||
|
|
@ -0,0 +1,45 @@
|
|||
# coding=utf-8
|
||||
"""
|
||||
@project: MaxKB
|
||||
@Author:虎虎
|
||||
@file: workflow_manage.py
|
||||
@date:2025/5/9 10:30
|
||||
@desc:
|
||||
"""
|
||||
from builtins import function
|
||||
from enum import Enum
|
||||
from typing import List, Dict
|
||||
|
||||
from workflow.workflow.common import Workflow, Channel, Chunk
|
||||
|
||||
|
||||
class WorkflowType(Enum):
|
||||
# 应用
|
||||
APPLICATION = "APPLICATION"
|
||||
# 知识库
|
||||
KNOWLEDGE = "KNOWLEDGE"
|
||||
# ....
|
||||
|
||||
|
||||
class WorkflowManage:
|
||||
channel = Channel()
|
||||
|
||||
def __init__(self,
|
||||
workflow: Workflow,
|
||||
chunk_list: List[Chunk],
|
||||
start_node: Chunk,
|
||||
workflow_type: WorkflowType,
|
||||
body: Dict,
|
||||
consumer: function):
|
||||
self.workflow = workflow
|
||||
self.chunk_list = chunk_list
|
||||
self.start_node = start_node
|
||||
self.workflow_type = workflow_type
|
||||
self.body = body
|
||||
self.consumer = consumer
|
||||
|
||||
def stream(self):
|
||||
pass
|
||||
|
||||
def invoke(self):
|
||||
pass
|
||||
Loading…
Reference in New Issue