From a7bb173cc1bf0b8ad70f9531d5e9f7204b4e5f3c Mon Sep 17 00:00:00 2001 From: zhangzhanwei Date: Thu, 18 Dec 2025 15:42:28 +0800 Subject: [PATCH] feat: Add web node interrupt --- .../impl/base_data_source_web_node.py | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/apps/application/flow/step_node/data_source_web_node/impl/base_data_source_web_node.py b/apps/application/flow/step_node/data_source_web_node/impl/base_data_source_web_node.py index 9561955aa..8daeceecb 100644 --- a/apps/application/flow/step_node/data_source_web_node/impl/base_data_source_web_node.py +++ b/apps/application/flow/step_node/data_source_web_node/impl/base_data_source_web_node.py @@ -25,7 +25,12 @@ class BaseDataSourceWebNodeForm(BaseForm): 'placeholder': _('The default is body, you can enter .classname/#idname/tagname')}) -def get_collect_handler(): +class InterruptedTaskException(Exception): + def __init__(self, *args, **kwargs): # real signature unknown + pass + + +def get_collect_handler(workflow_manage): results = [] def handler(child_link: ChildLink, response: Fork.Response): @@ -37,8 +42,11 @@ def get_collect_handler(): "name": document_name.strip(), "content": response.content, }) + except Exception as e: - maxkb_logger.error(f'{str(e)}:{traceback.format_exc()}') + maxkb_logger.error(f'{str(e)}:{traceback.format_exc()}') + if workflow_manage.is_the_task_interrupted(): + raise InterruptedTaskException('Task interrupted') return handler, results @@ -60,15 +68,18 @@ class BaseDataSourceWebNode(IDataSourceWebNode): source_url = data_source.get("source_url") selector = data_source.get("selector") or "body" - collect_handler, document_list = get_collect_handler() + collect_handler, document_list = get_collect_handler(self.workflow_manage) try: ForkManage(source_url, selector.split(" ") if selector is not None else []).fork(3, set(), collect_handler) - return NodeResult({'document_list': document_list,'source_url': source_url, 'selector': selector}, + return NodeResult({'document_list': document_list, 'source_url': source_url, 'selector': selector}, self.workflow_manage.params.get('knowledge_base') or {}) except Exception as e: + if isinstance(e,InterruptedTaskException): + return NodeResult({'document_list': document_list, 'source_url': source_url, 'selector': selector}, + self.workflow_manage.params.get('knowledge_base') or {}) maxkb_logger.error(_('data source web node:{node_id} error{error}{traceback}').format( knowledge_id=node_id, error=str(e), traceback=traceback.format_exc()))