feat: loopNode
Some checks failed
sync2gitee / repo-sync (push) Has been cancelled

This commit is contained in:
shaohuzhang1 2025-03-14 15:36:52 +08:00
parent bfb03a6239
commit 722d885506
2 changed files with 15 additions and 13 deletions

View File

@ -46,7 +46,12 @@ class ILoopNode(INode):
return ILoopNodeSerializer
def _run(self):
return self.execute(**self.node_params_serializer.data, **self.flow_params_serializer.data)
array = self.node_params_serializer.data.get('array')
if self.node_params_serializer.data.get('loop_type') == 'ARRAY':
array = self.workflow_manage.get_reference_field(
array[0],
array[1:])
return self.execute(**{**self.node_params_serializer.data, "array": array}, **self.flow_params_serializer.data)
def execute(self, loop_type, array, number, loop_body, stream, **kwargs) -> NodeResult:
pass

View File

@ -71,7 +71,7 @@ def write_context(node_variable: Dict, workflow_variable: Dict, node: INode, wor
_write_context(node_variable, workflow_variable, node, workflow, content, reasoning_content)
def loop_number(number: int, loop_body, workflow_manage_new_instance, workflow):
def loop_number(number: int, workflow_manage_new_instance, node: INode):
loop_global_data = {}
for index in range(number):
"""
@ -91,8 +91,9 @@ def loop_number(number: int, loop_body, workflow_manage_new_instance, workflow):
loop_global_data = instance.context
def loop_array(array, loop_body, workflow_manage_new_instance, workflow):
def loop_array(array, workflow_manage_new_instance, node: INode):
loop_global_data = {}
loop_execute_details = []
for item, index in zip(array, range(len(array))):
"""
指定次数循环
@ -100,22 +101,19 @@ def loop_array(array, loop_body, workflow_manage_new_instance, workflow):
"""
instance = workflow_manage_new_instance({'index': index, 'item': item}, loop_global_data)
response = instance.stream()
answer = ''
reasoning_content = ''
for chunk in response:
content_chunk = chunk.get('content', '')
reasoning_content_chunk = chunk.get('reasoning_content', '')
reasoning_content += reasoning_content_chunk
answer += content_chunk
yield chunk
loop_global_data = instance.context
runtime_details = instance.get_runtime_details()
loop_execute_details.append(runtime_details)
node.context['loop_execute_details'] = loop_execute_details
def get_write_context(loop_type, array, number, loop_body, stream):
def inner_write_context(node_variable: Dict, workflow_variable: Dict, node: INode, workflow):
if loop_type == 'ARRAY':
return loop_array(array, loop_body, node_variable['workflow_manage_new_instance'], workflow)
return loop_number(number, loop_body, node_variable['workflow_manage_new_instance'], workflow)
return loop_array(array, node_variable['workflow_manage_new_instance'], node)
return loop_number(number, node_variable['workflow_manage_new_instance'], node)
return inner_write_context
@ -138,8 +136,7 @@ class BaseLoopNode(ILoopNode):
def workflow_manage_new_instance(start_data, global_data):
workflow_manage = WorkflowManage(Flow.new_instance(loop_body), self.workflow_manage.params,
LoopWorkFlowPostHandler(
self.workflow_manage.work_flow_post_handler.chat_info
,
self.workflow_manage.work_flow_post_handler.chat_info,
self.workflow_manage.work_flow_post_handler.client_id,
self.workflow_manage.work_flow_post_handler.client_type)
, base_to_response=LoopToResponse(),