feat: delete workflow (#3145)

This commit is contained in:
shaohuzhang1 2025-05-26 16:01:43 +08:00 committed by GitHub
parent c0b676ed16
commit 1aa8fe28ef
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
15 changed files with 2 additions and 399 deletions

View File

@ -280,7 +280,7 @@ def bulk_create_in_batches(model, data, batch_size=1000):
model.objects.bulk_create(batch)
def get_sha256_hash(_bytes):
def get_sha256_hash(_v: str):
sha256 = hashlib.sha256()
sha256.update(_bytes)
sha256.update(_v.encode())
return sha256.hexdigest()

View File

@ -1,3 +0,0 @@
from django.contrib import admin
# Register your models here.

View File

@ -1,6 +0,0 @@
from django.apps import AppConfig
class WorkflowConfig(AppConfig):
default_auto_field = 'django.db.models.BigAutoField'
name = 'workflow'

View File

@ -1,8 +0,0 @@
# coding=utf-8
"""
@project: MaxKB
@Author虎虎
@file __init__.py
@date2025/5/7 15:43
@desc:
"""

View File

@ -1,30 +0,0 @@
# coding=utf-8
"""
@project: MaxKB
@Author虎虎
@file workflow.py
@date2025/5/7 15:44
@desc:
"""
from django.db import models
import uuid_utils.compat as uuid
class WorkflowType(models.TextChoices):
# 应用
APPLICATION = "APPLICATION"
# 知识库
KNOWLEDGE = "KNOWLEDGE"
# ....
class Workflow(models.Model):
id = models.UUIDField(primary_key=True, max_length=128, default=uuid.uuid7, editable=False, verbose_name="主键id")
workflow = models.JSONField(verbose_name="工作流数据", default=dict)
type = models.CharField(verbose_name="工作流类型", choices=WorkflowType.choices, default=WorkflowType.APPLICATION)
create_time = models.DateTimeField(verbose_name="创建时间", auto_now_add=True)
update_time = models.DateTimeField(verbose_name="修改时间", auto_now=True)
class Meta:
db_table = "workflow"
ordering = ['update_time']

View File

@ -1,3 +0,0 @@
from django.test import TestCase
# Create your tests here.

View File

@ -1,8 +0,0 @@
# coding=utf-8
"""
@project: MaxKB
@Author虎虎
@file __init__.py.py
@date2025/5/7 15:43
@desc:
"""

View File

@ -1,8 +0,0 @@
# coding=utf-8
"""
@project: MaxKB
@Author虎虎
@file __init__.py
@date2025/5/7 16:15
@desc:
"""

View File

@ -1,214 +0,0 @@
# coding=utf-8
"""
@project: MaxKB
@Author虎虎
@file workflow.py
@date2025/5/9 10:58
@desc:
"""
from typing import List, Dict
from queue import Queue, Empty
from common.utils.common import group_by
class Content:
def __init__(self, content: str, reasoning_content: str, **kwargs):
"""
内容
@param content: ai响应内容
@param reasoning_content:思考过程
@param kwargs: 其他参数
"""
self.content = content
self.reasoning_content = reasoning_content
for key in kwargs:
self.__setattr__(key, kwargs.get(key))
class Chunk:
def __init__(self, runtime_id: str, node_id: str, node_name: str, content: Content, node_data, children, loop_index,
**kwargs):
"""
@param runtime_id: 运行时id
@param node_id: 节点id
@param node_name: 节点名称
@param loop_index: 循环下标
@param children: 子块
@param node_data 节点数据
@param content: 内容
"""
self.runtime_id = runtime_id
self.node_id = node_id
self.node_name = node_name
self.loop_index = loop_index
self.children = children
self.content = content
self.node_data = node_data
for key in kwargs:
self.__setattr__(key, kwargs.get(key))
class Channel:
"""
对话管道
"""
messages = Queue()
is_end = False
def write(self, message):
if isinstance(message, Channel) | isinstance(message, Chunk):
if self.is_end:
raise "通道已关闭"
self.messages.put(message)
else:
raise "不支持的管道参数"
def end(self):
self.is_end = True
return self.messages.put(None)
def pop(self):
if self.is_end:
return self.messages.get_nowait()
return self.messages.get()
def generator(self):
while True:
try:
message = self.pop()
if message:
if isinstance(message, Channel):
for chunk in message.generator():
yield chunk
else:
yield message
except Empty:
return
class Node:
def __init__(self, _id: str, _type: str, x: int, y: int, properties: dict, **kwargs):
"""
@param _id: 节点id
@param _type: 类型
@param x: 节点x轴位置
@param y: 节点y轴位置
@param properties:
@param kwargs:
"""
self.id = _id
self.type = _type
self.x = x
self.y = y
self.properties = properties
for keyword in kwargs:
self.__setattr__(keyword, kwargs.get(keyword))
class Edge:
def __init__(self, _id: str, _type: str, sourceNodeId: str, targetNodeId: str, **keywords):
"""
线
@param _id: 线id
@param _type: 线类型
@param sourceNodeId:
@param targetNodeId:
@param keywords:
"""
self.id = _id
self.type = _type
self.sourceNodeId = sourceNodeId
self.targetNodeId = targetNodeId
for keyword in keywords:
self.__setattr__(keyword, keywords.get(keyword))
class EdgeNode:
edge: Edge
node: Node
def __init__(self, edge, node):
self.edge = edge
self.node = node
class Workflow:
"""
节点列表
"""
nodes: List[Node]
"""
线列表
"""
edges: List[Edge]
"""
节点id:node
"""
node_map: Dict[str, Node]
"""
节点id:当前节点id上面的所有节点
"""
up_node_map: Dict[str, List[EdgeNode]]
"""
节点id:当前节点id下面的所有节点
"""
next_node_map: Dict[str, List[EdgeNode]]
def __init__(self, nodes: List[Node], edges: List[Edge]):
self.nodes = nodes
self.edges = edges
self.node_map = {node.id: node for node in nodes}
self.up_node_map = {key: [EdgeNode(edge, self.node_map.get(edge.sourceNodeId)) for
edge in edges] for
key, edges in
group_by(edges, key=lambda edge: edge.targetNodeId).items()}
self.next_node_map = {key: [EdgeNode(edge, self.node_map.get(edge.targetNodeId)) for edge in edges] for
key, edges in
group_by(edges, key=lambda edge: edge.sourceNodeId).items()}
def get_node(self, node_id):
"""
根据node_id 获取节点信息
@param node_id: node_id
@return: 节点信息
"""
return self.node_map.get(node_id)
def get_up_edge_nodes(self, node_id) -> List[EdgeNode]:
"""
根据节点id 获取当前连接前置节点和连线
@param node_id: 节点id
@return: 节点连线列表
"""
return self.up_node_map.get(node_id)
def get_next_edge_nodes(self, node_id) -> List[EdgeNode]:
"""
根据节点id 获取当前连接目标节点和连线
@param node_id: 节点id
@return: 节点连线列表
"""
return self.next_node_map.get(node_id)
def get_up_nodes(self, node_id) -> List[Node]:
"""
根据节点id 获取当前连接前置节点
@param node_id: 节点id
@return: 节点列表
"""
return [en.node for en in self.up_node_map.get(node_id)]
def get_next_nodes(self, node_id) -> List[Node]:
"""
根据节点id 获取当前连接目标节点
@param node_id: 节点id
@return: 节点列表
"""
return [en.node for en in self.next_node_map.get(node_id, [])]

View File

@ -1,54 +0,0 @@
# coding=utf-8
"""
@project: MaxKB
@Author虎虎
@file i_node.py
@date2025/5/7 16:41
@desc:
"""
import time
from abc import abstractmethod
from common.utils.common import get_sha256_hash
from workflow.workflow.common import Channel, Chunk
class INode:
# 当前节点支持的工作流类型
supported_workflow_type_list = []
# 节点类型
type = None
# 节点管道
channel = Channel()
def __init__(self, node, workflow_manage, chunk: Chunk = None, up_node_id_list=None, loop_index=None):
self.node = node
self.chunk = chunk
if chunk is not None:
self.context = chunk.node_data | {}
else:
self.context = {}
# 运行时id
self.runtime_node_id = get_sha256_hash("".join(up_node_id_list | []) + node.id + str(loop_index | ""))
self.workflow_manage = workflow_manage
self.node_serializer = self.get_node_serializer()(data=node.properties.get('node_data'))
self.is_valid()
def is_valid(self):
self.node_serializer.is_valid(raise_exception=True)
def execute(self, **kwargs):
pass
def run(self):
start_time = time.time()
self.context['start_time'] = start_time
self._run()
self.context['run_time'] = time.time() - start_time
def _run(self):
return self.execute(**self.node_serializer.data)
@abstractmethod
def get_node_serializer(self):
pass

View File

@ -1,8 +0,0 @@
# coding=utf-8
"""
@project: MaxKB
@Author虎虎
@file __init__.py.py
@date2025/5/7 16:15
@desc:
"""

View File

@ -1,10 +0,0 @@
# coding=utf-8
"""
@project: MaxKB
@Author虎虎
@file tools.py
@date2025/5/7 18:44
@desc:
"""

View File

@ -1,45 +0,0 @@
# coding=utf-8
"""
@project: MaxKB
@Author虎虎
@file workflow_manage.py
@date2025/5/9 10:30
@desc:
"""
from builtins import function
from enum import Enum
from typing import List, Dict
from workflow.workflow.common import Workflow, Channel, Chunk
class WorkflowType(Enum):
# 应用
APPLICATION = "APPLICATION"
# 知识库
KNOWLEDGE = "KNOWLEDGE"
# ....
class WorkflowManage:
channel = Channel()
def __init__(self,
workflow: Workflow,
chunk_list: List[Chunk],
start_node: Chunk,
workflow_type: WorkflowType,
body: Dict,
consumer: function):
self.workflow = workflow
self.chunk_list = chunk_list
self.start_node = start_node
self.workflow_type = workflow_type
self.body = body
self.consumer = consumer
def stream(self):
pass
def invoke(self):
pass