diff --git a/apps/application/flow/step_node/__init__.py b/apps/application/flow/step_node/__init__.py index e5582f1cf..ffb0c7828 100644 --- a/apps/application/flow/step_node/__init__.py +++ b/apps/application/flow/step_node/__init__.py @@ -32,6 +32,7 @@ from .text_to_speech_step_node.impl.base_text_to_speech_node import BaseTextToSp from .text_to_video_step_node.impl.base_text_to_video_node import BaseTextToVideoNode from .tool_lib_node import * from .tool_node import * +from .variable_aggregation_node.impl.base_variable_aggregation_node import BaseVariableAggregationNode from .variable_assign_node import BaseVariableAssignNode from .variable_splitting_node import BaseVariableSplittingNode from .video_understand_step_node import BaseVideoUnderstandNode @@ -45,7 +46,7 @@ node_list = [BaseStartStepNode, BaseChatNode, BaseSearchKnowledgeNode, BaseSearc BaseVideoUnderstandNode, BaseIntentNode, BaseLoopNode, BaseLoopStartStepNode, BaseLoopContinueNode, - BaseLoopBreakNode, BaseVariableSplittingNode, BaseParameterExtractionNode] + BaseLoopBreakNode, BaseVariableSplittingNode, BaseParameterExtractionNode, BaseVariableAggregationNode] def get_node(node_type): diff --git a/apps/application/flow/step_node/variable_aggregation_node/__init__.py b/apps/application/flow/step_node/variable_aggregation_node/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/apps/application/flow/step_node/variable_aggregation_node/i_variable_aggregation_node.py b/apps/application/flow/step_node/variable_aggregation_node/i_variable_aggregation_node.py new file mode 100644 index 000000000..f91030036 --- /dev/null +++ b/apps/application/flow/step_node/variable_aggregation_node/i_variable_aggregation_node.py @@ -0,0 +1,42 @@ +# coding=utf-8 + +from typing import Type + +from django.utils.translation import gettext_lazy as _ +from rest_framework import serializers + +from application.flow.i_step_node import INode, NodeResult + + + + +class VariableListSerializer(serializers.Serializer): + v_id = serializers.CharField(required=True, label=_("Variable id")) + variable = serializers.ListField(required=True, label=_("Variable")) + + +class VariableGroupSerializer(serializers.Serializer): + id = serializers.CharField(required=True, label=_("Group id")) + group_name = serializers.CharField(required=True, label=_("group_name")) + variable_list = VariableListSerializer(many=True) + + +class VariableAggregationNodeSerializer(serializers.Serializer): + strategy = serializers.CharField(required=True, label=_("Strategy")) + group_list = VariableGroupSerializer(many=True) + + +class IVariableAggregation(INode): + type = 'variable-aggregation-node' + + + def get_node_params_serializer_class(self) -> Type[serializers.Serializer]: + return VariableAggregationNodeSerializer + + def _run(self): + return self.execute(**self.node_params_serializer.data, **self.flow_params_serializer.data) + + def execute(self,strategy,group_list,**kwargs) -> NodeResult: + pass + + diff --git a/apps/application/flow/step_node/variable_aggregation_node/impl/__init__.py b/apps/application/flow/step_node/variable_aggregation_node/impl/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/apps/application/flow/step_node/variable_aggregation_node/impl/base_variable_aggregation_node.py b/apps/application/flow/step_node/variable_aggregation_node/impl/base_variable_aggregation_node.py new file mode 100644 index 000000000..f5665ce06 --- /dev/null +++ b/apps/application/flow/step_node/variable_aggregation_node/impl/base_variable_aggregation_node.py @@ -0,0 +1,54 @@ +#coding=utf-8 +""" + @project: MaxKB + @Author:虎² + @file: base_variable_aggregation_node.py + @date:2025/10/23 17:42 + @desc: +""" +from application.flow.i_step_node import NodeResult +from application.flow.step_node.variable_aggregation_node.i_variable_aggregation_node import IVariableAggregation + + +class BaseVariableAggregationNode(IVariableAggregation): + + def save_context(self, details, workflow_manage): + for key, value in details.get('result').items(): + self.context['key'] = value + self.context['result'] = details.get('result') + + def get_first_non_null(self, variable_list): + + for variable in variable_list: + v = self.workflow_manage.get_reference_field( + variable.get('variable')[0], + variable.get('variable')[1:]) + if v is not None: + return v + return None + + def set_variable_to_json(self, variable_list): + + return {variable.get('variable')[1:][0]: self.workflow_manage.get_reference_field( + variable.get('variable')[0], + variable.get('variable')[1:]) for variable in variable_list} + + def execute(self,strategy,group_list,**kwargs) -> NodeResult: + strategy_map = {'first_non_null':self.get_first_non_null, + 'variable_to_json': self.set_variable_to_json, + } + + result = { item.get('group_name'):strategy_map[strategy](item.get('variable_list')) for item in group_list} + + return NodeResult({'result': result,**result},{}) + + def get_details(self, index: int, **kwargs): + return { + 'name': self.node.properties.get('stepName'), + "index": index, + 'run_time': self.context.get('run_time'), + 'type': self.node.type, + 'result': self.context.get('result'), + 'status': self.status, + 'err_message': self.err_message + } \ No newline at end of file diff --git a/ui/src/locales/lang/en-US/views/application-workflow.ts b/ui/src/locales/lang/en-US/views/application-workflow.ts index 658e71fb0..a64d3474a 100644 --- a/ui/src/locales/lang/en-US/views/application-workflow.ts +++ b/ui/src/locales/lang/en-US/views/application-workflow.ts @@ -263,6 +263,15 @@ You are a master of problem optimization, adept at accurately inferring user int variableAggregationNode: { label: 'Variable Aggregation', text: 'Perform aggregation processing on the outputs of multiple branches', + Strategy: 'Aggregation Strategy', + placeholder: 'Return the first non-null value of each group', + placeholder1: 'Structurally aggregate each group of variables', + group: { + placeholder: 'Please select a variable', + noneError: 'Name cannot be empty', + dupError: 'Name cannot be duplicated', + }, + add: 'Add Group', }, mcpNode: { label: 'MCP Node', diff --git a/ui/src/locales/lang/zh-CN/views/application-workflow.ts b/ui/src/locales/lang/zh-CN/views/application-workflow.ts index 7ed00d01c..c6d33c481 100644 --- a/ui/src/locales/lang/zh-CN/views/application-workflow.ts +++ b/ui/src/locales/lang/zh-CN/views/application-workflow.ts @@ -264,6 +264,15 @@ export default { variableAggregationNode: { label: '变量聚合', text: '对多个分支的输出进行聚合处理', + Strategy: '聚合策略', + placeholder: '返回每组的第一个非空值', + placeholder1: '结构化聚合每组变量', + group: { + placeholder: '请选择变量', + noneError: '名称不能为空', + dupError: '名称不能重复', + }, + add: '添加分组', }, variableAssignNode: { label: '变量赋值', diff --git a/ui/src/locales/lang/zh-Hant/views/application-workflow.ts b/ui/src/locales/lang/zh-Hant/views/application-workflow.ts index 38083ac7e..08bdc440c 100644 --- a/ui/src/locales/lang/zh-Hant/views/application-workflow.ts +++ b/ui/src/locales/lang/zh-Hant/views/application-workflow.ts @@ -263,6 +263,15 @@ export default { variableAggregationNode: { label: '變量聚合', text: '對多個分支的輸出進行聚合處理', + Strategy: '聚合策略', + placeholder: '返回每組的第一個非空值', + placeholder1: '結構化聚合每組變量', + group: { + placeholder: '請選擇變量', + noneError: '名稱不能為空', + dupError: '名稱不能重複', + }, + add: '新增分組', }, mcpNode: { label: 'MCP 調用', diff --git a/ui/src/workflow/common/data.ts b/ui/src/workflow/common/data.ts index e22e40a5a..f095f79da 100644 --- a/ui/src/workflow/common/data.ts +++ b/ui/src/workflow/common/data.ts @@ -345,7 +345,9 @@ export const variableAggregationNode = { height: 252, properties: { stepName: t('views.applicationWorkflow.nodes.variableAggregationNode.label'), - config: {}, + config: { + fields: [], + }, }, } diff --git a/ui/src/workflow/nodes/variable-aggregation-node/index.vue b/ui/src/workflow/nodes/variable-aggregation-node/index.vue index b2551a24c..f36b7d8db 100644 --- a/ui/src/workflow/nodes/variable-aggregation-node/index.vue +++ b/ui/src/workflow/nodes/variable-aggregation-node/index.vue @@ -1,20 +1,297 @@