From 8a9de5964d838214ae89d40405ae4242d596805c Mon Sep 17 00:00:00 2001 From: shaohuzhang1 <80892890+shaohuzhang1@users.noreply.github.com> Date: Mon, 8 Dec 2025 14:26:44 +0800 Subject: [PATCH] fix: update file handling functions (#4455) --- .../tool_lib_node/impl/base_tool_lib_node.py | 47 ++++++++----------- apps/common/utils/tool_code.py | 43 ++++++++++++++++- 2 files changed, 62 insertions(+), 28 deletions(-) diff --git a/apps/application/flow/step_node/tool_lib_node/impl/base_tool_lib_node.py b/apps/application/flow/step_node/tool_lib_node/impl/base_tool_lib_node.py index 2cbd3ca21..7b036291e 100644 --- a/apps/application/flow/step_node/tool_lib_node/impl/base_tool_lib_node.py +++ b/apps/application/flow/step_node/tool_lib_node/impl/base_tool_lib_node.py @@ -6,7 +6,8 @@ @date:2024/8/8 17:49 @desc: """ -import base64 + +import ast import io import json import mimetypes @@ -195,31 +196,23 @@ class BaseToolLibNodeNode(IToolLibNode): else: all_params = init_params_default_value | params if self.node.properties.get('kind') == 'data-source': - download_file_list = [] - download_list = function_executor.exec_code( - tool_lib.code, - {**all_params, **self.workflow_params.get('data_source')}, - function_name='get_down_file_list' - ) - for item in download_list: - result = function_executor.exec_code( - tool_lib.code, - {**all_params, **self.workflow_params.get('data_source'), - 'download_item': item}, - function_name='download' - ) - file_bytes = result.get('file_bytes', []) - chunks = [] - for chunk in file_bytes: - chunks.append(base64.b64decode(chunk)) - file = bytes_to_uploaded_file(b''.join(chunks), result.get('name')) - file_url = self.upload_knowledge_file(file) - download_file_list.append({'file_id': file_url.split('/')[-1], 'name': result.get('name')}) - all_params = { - **all_params, **self.workflow_params.get('data_source'), - 'download_file_list': download_file_list - } - result = download_file_list + exist = function_executor.exist_function(tool_lib.code, 'get_download_file_list') + if exist: + download_file_list = [] + download_list = function_executor.exec_code(tool_lib.code, + {**all_params, **self.workflow_params.get('data_source')}, + function_name='get_download_file_list') + for item in download_list: + result = function_executor.exec_code(tool_lib.code, + {**all_params, **self.workflow_params.get('data_source'), + 'download_item': item}, + function_name='download') + file = bytes_to_uploaded_file(ast.literal_eval(result.get('file_bytes')), result.get('name')) + file_url = self.upload_knowledge_file(file) + download_file_list.append({'file_id': file_url, 'name': result.get('name')}) + result = download_file_list + else: + result = function_executor.exec_code(tool_lib.code, all_params) else: result = function_executor.exec_code(tool_lib.code, all_params) return NodeResult({'result': result}, @@ -237,7 +230,7 @@ class BaseToolLibNodeNode(IToolLibNode): 'meta': meta, 'source_id': knowledge_id, 'source_type': FileSourceType.KNOWLEDGE.value - }).upload() + }).upload().replace("./oss/file/", '') file.close() return file_url diff --git a/apps/common/utils/tool_code.py b/apps/common/utils/tool_code.py index 2e9263787..a57262d64 100644 --- a/apps/common/utils/tool_code.py +++ b/apps/common/utils/tool_code.py @@ -74,6 +74,47 @@ class ToolExecutor: except Exception as e: maxkb_logger.error(f'Exception: {e}', exc_info=True) + def exist_function(self, code_str, name): + _id = str(uuid.uuid7()) + python_paths = CONFIG.get_sandbox_python_package_paths().split(',') + set_run_user = f'os.setgid({pwd.getpwnam(_run_user).pw_gid});os.setuid({pwd.getpwnam(_run_user).pw_uid});' if _enable_sandbox else '' + _exec_code = f""" +try: + import os, sys, json + path_to_exclude = ['/opt/py3/lib/python3.11/site-packages', '/opt/maxkb-app/apps'] + sys.path = [p for p in sys.path if p not in path_to_exclude] + sys.path += {python_paths} + locals_v={{}} + globals_v={{}} + {set_run_user} + os.environ.clear() + exec({dedent(code_str)!a}, globals_v, locals_v) + exec_result=locals_v.__contains__('{name}') + sys.stdout.write("\\n{_id}:") + json.dump({{'code':200,'msg':'success','data':exec_result}}, sys.stdout, default=str) +except Exception as e: + if isinstance(e, MemoryError): e = Exception("Cannot allocate more memory: exceeded the limit of {_process_limit_mem_mb} MB.") + sys.stdout.write("\\n{_id}:") + json.dump({{'code':500,'msg':str(e),'data':False}}, sys.stdout, default=str) +sys.stdout.flush() + """ + maxkb_logger.debug(f"Sandbox execute code: {_exec_code}") + with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=True) as f: + f.write(_exec_code) + f.flush() + subprocess_result = self._exec(f.name) + if subprocess_result.returncode != 0: + raise Exception(subprocess_result.stderr or subprocess_result.stdout or "Unknown exception occurred") + lines = subprocess_result.stdout.splitlines() + result_line = [line for line in lines if line.startswith(_id)] + if not result_line: + maxkb_logger.error("\n".join(lines)) + raise Exception("No result found.") + result = json.loads(result_line[-1].split(":", 1)[1]) + if result.get('code') == 200: + return result.get('data') + raise Exception(result.get('msg')) + def exec_code(self, code_str, keywords, function_name=None): _id = str(uuid.uuid7()) action_function = f'({function_name !a}, locals_v.get({function_name !a}))' if function_name else 'locals_v.popitem()' @@ -213,7 +254,7 @@ exec({dedent(code)!a}) ], 'cwd': _sandbox_path, 'env': { - 'LD_PRELOAD': f'{_sandbox_path}/lib/sandbox.so', + 'LD_PRELOAD': f'{_sandbox_path}/lib/sandbox.so', }, 'transport': 'stdio', }