feat: Node supports knowledge base workflow

This commit is contained in:
zhangzhanwei 2025-11-27 18:09:21 +08:00 committed by zhanweizhang7
parent bcdea17d83
commit 6434ba71b7
12 changed files with 185 additions and 34 deletions

View File

@ -38,4 +38,4 @@ class IConditionNode(INode):
type = 'condition-node'
support = [WorkflowMode.APPLICATION_LOOP]
support = [WorkflowMode.APPLICATION, WorkflowMode.APPLICATION_LOOP, WorkflowMode.KNOWLEDGE, WorkflowMode.KNOWLEDGE_LOOP]

View File

@ -35,8 +35,8 @@ class ImageToVideoNodeSerializer(serializers.Serializer):
class IImageToVideoNode(INode):
type = 'image-to-video-node'
support = [WorkflowMode.APPLICATION, WorkflowMode.APPLICATION_LOOP]
support = [WorkflowMode.APPLICATION, WorkflowMode.APPLICATION_LOOP, WorkflowMode.KNOWLEDGE,
WorkflowMode.KNOWLEDGE_LOOP]
def get_node_params_serializer_class(self) -> Type[serializers.Serializer]:
return ImageToVideoNodeSerializer
@ -55,10 +55,15 @@ class IImageToVideoNode(INode):
self.node_params_serializer.data.get('last_frame_url')[1:])
node_params_data = {k: v for k, v in self.node_params_serializer.data.items()
if k not in ['first_frame_url', 'last_frame_url']}
return self.execute(first_frame_url=first_frame_url, last_frame_url=last_frame_url,
if [WorkflowMode.KNOWLEDGE, WorkflowMode.KNOWLEDGE_LOOP].__contains__(
self.workflow_manage.flow.workflow_mode):
return self.execute(first_frame_url=first_frame_url, last_frame_url=last_frame_url, **node_params_data, **self.flow_params_serializer.data,
**{'history_chat_record': [], 'stream': True, 'chat_id': None, 'chat_record_id': None})
else:
return self.execute(first_frame_url=first_frame_url, last_frame_url=last_frame_url,
**node_params_data, **self.flow_params_serializer.data)
def execute(self, model_id, prompt, negative_prompt, dialogue_number, dialogue_type, history_chat_record, chat_id,
def execute(self, model_id, prompt, negative_prompt, dialogue_number, dialogue_type, history_chat_record,
model_params_setting,
chat_record_id,
first_frame_url, last_frame_url,

View File

@ -7,6 +7,7 @@ import requests
from django.db.models import QuerySet
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage
from application.flow.common import WorkflowMode
from application.flow.i_step_node import NodeResult
from application.flow.step_node.image_to_video_step_node.i_image_to_video_node import IImageToVideoNode
from common.utils.common import bytes_to_uploaded_file
@ -23,12 +24,11 @@ class BaseImageToVideoNode(IImageToVideoNode):
if self.node_params.get('is_result', False):
self.answer_text = details.get('answer')
def execute(self, model_id, prompt, negative_prompt, dialogue_number, dialogue_type, history_chat_record, chat_id,
def execute(self, model_id, prompt, negative_prompt, dialogue_number, dialogue_type, history_chat_record,
model_params_setting,
chat_record_id,
first_frame_url, last_frame_url=None,
**kwargs) -> NodeResult:
application = self.workflow_manage.work_flow_post_handler.chat_info.application
workspace_id = self.workflow_manage.get_body().get('workspace_id')
ttv_model = get_model_instance_by_model_workspace_id(model_id, workspace_id,
**model_params_setting)
@ -54,17 +54,7 @@ class BaseImageToVideoNode(IImageToVideoNode):
if isinstance(video_urls, str) and video_urls.startswith('http'):
video_urls = requests.get(video_urls).content
file = bytes_to_uploaded_file(video_urls, file_name)
meta = {
'debug': False if application.id else True,
'chat_id': chat_id,
'application_id': str(application.id) if application.id else None,
}
file_url = FileSerializer(data={
'file': file,
'meta': meta,
'source_id': meta['application_id'],
'source_type': FileSourceType.APPLICATION.value
}).upload()
file_url = self.upload_file(file)
video_label = f'<video src="{file_url}" controls style="max-width: 100%; width: 100%; height: auto; max-height: 60vh;"></video>'
video_list = [{'file_id': file_url.split('/')[-1], 'file_name': file_name, 'url': file_url}]
return NodeResult({'answer': video_label, 'chat_model': ttv_model, 'message_list': message_list,
@ -88,6 +78,42 @@ class BaseImageToVideoNode(IImageToVideoNode):
raise ValueError(
gettext("Failed to obtain the image"))
def upload_file(self, file):
if [WorkflowMode.KNOWLEDGE, WorkflowMode.KNOWLEDGE_LOOP].__contains__(
self.workflow_manage.flow.workflow_mode):
return self.upload_knowledge_file(file)
return self.upload_application_file(file)
def upload_knowledge_file(self, file):
knowledge_id = self.workflow_params.get('knowledge_id')
meta = {
'debug': False,
'knowledge_id': knowledge_id
}
file_url = FileSerializer(data={
'file': file,
'meta': meta,
'source_id': knowledge_id,
'source_type': FileSourceType.KNOWLEDGE.value
}).upload()
return file_url
def upload_application_file(self, file):
application = self.workflow_manage.work_flow_post_handler.chat_info.application
chat_id = self.workflow_params.get('chat_id')
meta = {
'debug': False if application.id else True,
'chat_id': chat_id,
'application_id': str(application.id) if application.id else None,
}
file_url = FileSerializer(data={
'file': file,
'meta': meta,
'source_id': meta['application_id'],
'source_type': FileSourceType.APPLICATION.value
}).upload()
return file_url
def generate_history_ai_message(self, chat_record):
for val in chat_record.details.values():
if self.node.id == val['node_id'] and 'image_list' in val:

View File

@ -39,9 +39,14 @@ class ITextToVideoNode(INode):
return TextToVideoNodeSerializer
def _run(self):
return self.execute(**self.node_params_serializer.data, **self.flow_params_serializer.data)
if [WorkflowMode.KNOWLEDGE, WorkflowMode.KNOWLEDGE_LOOP].__contains__(
self.workflow_manage.flow.workflow_mode):
return self.execute(**self.node_params_serializer.data, **self.flow_params_serializer.data,
**{'history_chat_record': [], 'stream': True, 'chat_id': None, 'chat_record_id': None})
else:
return self.execute(**self.node_params_serializer.data, **self.flow_params_serializer.data)
def execute(self, model_id, prompt, negative_prompt, dialogue_number, dialogue_type, history_chat_record, chat_id,
def execute(self, model_id, prompt, negative_prompt, dialogue_number, dialogue_type, history_chat_record,
model_params_setting,
chat_record_id,
**kwargs) -> NodeResult:

View File

@ -5,6 +5,7 @@ from typing import List
import requests
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage
from application.flow.common import WorkflowMode
from application.flow.i_step_node import NodeResult
from application.flow.step_node.text_to_video_step_node.i_text_to_video_node import ITextToVideoNode
from common.utils.common import bytes_to_uploaded_file
@ -20,11 +21,10 @@ class BaseTextToVideoNode(ITextToVideoNode):
if self.node_params.get('is_result', False):
self.answer_text = details.get('answer')
def execute(self, model_id, prompt, negative_prompt, dialogue_number, dialogue_type, history_chat_record, chat_id,
def execute(self, model_id, prompt, negative_prompt, dialogue_number, dialogue_type, history_chat_record,
model_params_setting,
chat_record_id,
**kwargs) -> NodeResult:
application = self.workflow_manage.work_flow_post_handler.chat_info.application
workspace_id = self.workflow_manage.get_body().get('workspace_id')
ttv_model = get_model_instance_by_model_workspace_id(model_id, workspace_id,
**model_params_setting)
@ -44,6 +44,36 @@ class BaseTextToVideoNode(ITextToVideoNode):
if isinstance(video_urls, str) and video_urls.startswith('http'):
video_urls = requests.get(video_urls).content
file = bytes_to_uploaded_file(video_urls, file_name)
file_url = self.upload_file(file)
video_label = f'<video src="{file_url}" controls style="max-width: 100%; width: 100%; height: auto;"></video>'
video_list = [{'file_id': file_url.split('/')[-1], 'file_name': file_name, 'url': file_url}]
return NodeResult({'answer': video_label, 'chat_model': ttv_model, 'message_list': message_list,
'video': video_list,
'history_message': history_message, 'question': question}, {})
def upload_file(self, file):
if [WorkflowMode.KNOWLEDGE, WorkflowMode.KNOWLEDGE_LOOP].__contains__(
self.workflow_manage.flow.workflow_mode):
return self.upload_knowledge_file(file)
return self.upload_application_file(file)
def upload_knowledge_file(self, file):
knowledge_id = self.workflow_params.get('knowledge_id')
meta = {
'debug': False,
'knowledge_id': knowledge_id
}
file_url = FileSerializer(data={
'file': file,
'meta': meta,
'source_id': knowledge_id,
'source_type': FileSourceType.KNOWLEDGE.value
}).upload()
return file_url
def upload_application_file(self, file):
application = self.workflow_manage.work_flow_post_handler.chat_info.application
chat_id = self.workflow_params.get('chat_id')
meta = {
'debug': False if application.id else True,
'chat_id': chat_id,
@ -55,11 +85,7 @@ class BaseTextToVideoNode(ITextToVideoNode):
'source_id': meta['application_id'],
'source_type': FileSourceType.APPLICATION.value
}).upload()
video_label = f'<video src="{file_url}" controls style="max-width: 100%; width: 100%; height: auto;"></video>'
video_list = [{'file_id': file_url.split('/')[-1], 'file_name': file_name, 'url': file_url}]
return NodeResult({'answer': video_label, 'chat_model': ttv_model, 'message_list': message_list,
'video': video_list,
'history_message': history_message, 'question': question}, {})
return file_url
def generate_history_ai_message(self, chat_record):
for val in chat_record.details.values():

View File

@ -39,9 +39,15 @@ class IVideoUnderstandNode(INode):
def _run(self):
res = self.workflow_manage.get_reference_field(self.node_params_serializer.data.get('video_list')[0],
self.node_params_serializer.data.get('video_list')[1:])
return self.execute(video=res, **self.node_params_serializer.data, **self.flow_params_serializer.data)
def execute(self, model_id, system, prompt, dialogue_number, dialogue_type, history_chat_record, stream, chat_id,
if [WorkflowMode.KNOWLEDGE, WorkflowMode.KNOWLEDGE_LOOP].__contains__(
self.workflow_manage.flow.workflow_mode):
return self.execute(video=res, **self.node_params_serializer.data, **self.flow_params_serializer.data,
**{'history_chat_record': [], 'stream': True, 'chat_id': None, 'chat_record_id': None})
else:
return self.execute(video=res, **self.node_params_serializer.data, **self.flow_params_serializer.data)
def execute(self, model_id, system, prompt, dialogue_number, dialogue_type, history_chat_record, stream,
model_params_setting,
chat_record_id,
video,

View File

@ -70,7 +70,7 @@ class BaseVideoUnderstandNode(IVideoUnderstandNode):
if self.node_params.get('is_result', False):
self.answer_text = details.get('answer')
def execute(self, model_id, system, prompt, dialogue_number, dialogue_type, history_chat_record, stream, chat_id,
def execute(self, model_id, system, prompt, dialogue_number, dialogue_type, history_chat_record, stream,
model_params_setting,
chat_record_id,
video,

View File

@ -1,4 +1,5 @@
# coding=utf-8
import asyncio
import json
from typing import Dict
@ -12,6 +13,7 @@ from application.flow.common import Workflow, WorkflowMode
from application.flow.i_step_node import KnowledgeWorkflowPostHandler
from application.flow.knowledge_workflow_manage import KnowledgeWorkflowManage
from application.flow.step_node import get_node
from application.serializers.application import get_mcp_tools
from common.exception.app_exception import AppApiException
from common.utils.rsa_util import rsa_long_decrypt
from common.utils.tool_code import ToolExecutor
@ -146,3 +148,40 @@ class KnowledgeWorkflowSerializer(serializers.Serializer):
self.is_valid(raise_exception=True)
workflow = QuerySet(KnowledgeWorkflow).filter(knowledge_id=self.data.get('knowledge_id')).first()
return {**KnowledgeWorkflowModelSerializer(workflow).data}
class McpServersSerializer(serializers.Serializer):
mcp_servers = serializers.JSONField(required=True)
class KnowledgeWorkflowMcpSerializer(serializers.Serializer):
knowledge_id = serializers.UUIDField(required=True, label=_('knowledge id'))
user_id = serializers.UUIDField(required=True, label=_("User ID"))
workspace_id = serializers.CharField(required=False, allow_null=True, allow_blank=True, label=_("Workspace ID"))
def is_valid(self, *, raise_exception=False):
super().is_valid(raise_exception=True)
workspace_id = self.data.get('workspace_id')
query_set = QuerySet(Knowledge).filter(id=self.data.get('knowledge_id'))
if workspace_id:
query_set = query_set.filter(workspace_id=workspace_id)
if not query_set.exists():
raise AppApiException(500, _('Knowledge id does not exist'))
def get_mcp_servers(self, instance, with_valid=True):
if with_valid:
self.is_valid(raise_exception=True)
McpServersSerializer(data=instance).is_valid(raise_exception=True)
servers = json.loads(instance.get('mcp_servers'))
for server, config in servers.items():
if config.get('transport') not in ['sse', 'streamable_http']:
raise AppApiException(500, _('Only support transport=sse or transport=streamable_http'))
tools = []
for server in servers:
tools += [
{
'server': server,
'name': tool.name,
'description': tool.description,
'args_schema': tool.args_schema,
}
for tool in asyncio.run(get_mcp_tools({server: servers[server]}))]
return tools

View File

@ -72,6 +72,6 @@ urlpatterns = [
path('workspace/<str:workspace_id>/knowledge/<str:knowledge_id>/datasource/<str:type>/<str:id>/form_list', views.KnowledgeDatasourceFormListView.as_view()),
path('workspace/<str:workspace_id>/knowledge/<str:knowledge_id>/datasource/<str:type>/<str:id>/<str:function_name>', views.KnowledgeDatasourceView.as_view()),
path('workspace/<str:workspace_id>/knowledge/<str:knowledge_id>/action', views.KnowledgeWorkflowActionView.as_view()),
path('workspace/<str:workspace_id>/knowledge/<str:knowledge_id>/action/<str:knowledge_action_id>', views.KnowledgeWorkflowActionView.Operate.as_view())
path('workspace/<str:workspace_id>/knowledge/<str:knowledge_id>/action/<str:knowledge_action_id>', views.KnowledgeWorkflowActionView.Operate.as_view()),
path('workspace/<str:workspace_id>/knowledge/<str:knowledge_id>/mcp_tools', views.McpServers.as_view()),
]

View File

@ -5,6 +5,7 @@ from drf_spectacular.utils import extend_schema
from rest_framework.request import Request
from rest_framework.views import APIView
from application.api.application_api import SpeechToTextAPI
from common.auth import TokenAuth
from common.auth.authentication import has_permissions
from common.constants.permission_constants import PermissionConstants, RoleConstants, ViewPermission, CompareConstants
@ -12,7 +13,8 @@ from common.log.log import log
from common.result import result
from knowledge.api.knowledge_workflow import KnowledgeWorkflowApi
from knowledge.serializers.common import get_knowledge_operation_object
from knowledge.serializers.knowledge_workflow import KnowledgeWorkflowSerializer, KnowledgeWorkflowActionSerializer
from knowledge.serializers.knowledge_workflow import KnowledgeWorkflowSerializer, KnowledgeWorkflowActionSerializer, \
KnowledgeWorkflowMcpSerializer
class KnowledgeDatasourceFormListView(APIView):
@ -125,3 +127,29 @@ class KnowledgeWorkflowView(APIView):
class KnowledgeWorkflowVersionView(APIView):
pass
class McpServers(APIView):
authentication_classes = [TokenAuth]
@extend_schema(
methods=['GET'],
description=_("speech to text"),
summary=_("speech to text"),
operation_id=_("speech to text"), # type: ignore
parameters=SpeechToTextAPI.get_parameters(),
request=SpeechToTextAPI.get_request(),
responses=SpeechToTextAPI.get_response(),
tags=[_('Knowledge Base')] # type: ignore
)
@has_permissions(PermissionConstants.KNOWLEDGE_READ.get_workspace_application_permission(),
PermissionConstants.KNOWLEDGE_READ.get_workspace_permission_workspace_manage_role(),
ViewPermission([RoleConstants.USER.get_workspace_role()],
[PermissionConstants.KNOWLEDGE.get_workspace_application_permission()],
CompareConstants.AND),
RoleConstants.WORKSPACE_MANAGE.get_workspace_role())
def post(self, request: Request, workspace_id, knowledge_id: str):
return result.success(KnowledgeWorkflowMcpSerializer(
data={'mcp_servers': request.query_params.get('mcp_servers'), 'workspace_id': workspace_id,
'user_id': request.user.id,
'knowledge_id': knowledge_id}).get_mcp_servers(request.data))

View File

@ -371,6 +371,19 @@ const getWorkflowAction: (
) => Promise<Result<any>> = (knowledge_id: string, knowledge_action_id, loading) => {
return get(`${prefix.value}/${knowledge_id}/action/${knowledge_action_id}`, {}, loading)
}
/**
* mcp
*/
const getMcpTools: (
knowledge_id: string,
mcp_servers: any,
loading?: Ref<boolean>,
) => Promise<Result<any>> = (knowledge_id, mcp_servers, loading) => {
return post(`${prefix.value}/${knowledge_id}/mcp_tools`, { mcp_servers }, {}, loading)
}
export default {
getKnowledgeList,
getKnowledgeListPage,
@ -399,4 +412,5 @@ export default {
workflowAction,
getWorkflowAction,
getKnowledgeWorkflowDatasourceDetails,
getMcpTools,
}

View File

@ -272,6 +272,7 @@ import McpServerInputDialog from './component/McpServerInputDialog.vue'
import { useRoute } from 'vue-router'
import { loadSharedApi } from '@/utils/dynamics-api/shared-api'
import { resetUrl } from '@/utils/common'
import { WorkflowMode } from '@/enums/application'
const props = defineProps<{ nodeModel: any }>()
@ -280,6 +281,7 @@ const {
params: { id },
} = route as any
const getResourceDetail = inject('getResourceDetail') as any
const workflow_mode:WorkflowMode = inject('workflowMode') || WorkflowMode.Application
const resource = getResourceDetail()
const apiType = computed(() => {
@ -366,7 +368,7 @@ function getTools() {
}
function _getTools(mcp_servers: any) {
loadSharedApi({ type: 'application', systemType: apiType.value })
loadSharedApi({ type: [WorkflowMode.Application,WorkflowMode.ApplicationLoop].includes(workflow_mode)?'application':'knowledge', systemType: apiType.value })
.getMcpTools(id, mcp_servers, loading)
.then((res: any) => {
form_data.value.mcp_tools = res.data