fix: update file handling functions (#4455)

This commit is contained in:
shaohuzhang1 2025-12-08 14:26:44 +08:00 committed by GitHub
parent bd43d2f135
commit 8a9de5964d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 62 additions and 28 deletions

View File

@ -6,7 +6,8 @@
@date2024/8/8 17:49
@desc:
"""
import base64
import ast
import io
import json
import mimetypes
@ -195,31 +196,23 @@ class BaseToolLibNodeNode(IToolLibNode):
else:
all_params = init_params_default_value | params
if self.node.properties.get('kind') == 'data-source':
download_file_list = []
download_list = function_executor.exec_code(
tool_lib.code,
{**all_params, **self.workflow_params.get('data_source')},
function_name='get_down_file_list'
)
for item in download_list:
result = function_executor.exec_code(
tool_lib.code,
{**all_params, **self.workflow_params.get('data_source'),
'download_item': item},
function_name='download'
)
file_bytes = result.get('file_bytes', [])
chunks = []
for chunk in file_bytes:
chunks.append(base64.b64decode(chunk))
file = bytes_to_uploaded_file(b''.join(chunks), result.get('name'))
file_url = self.upload_knowledge_file(file)
download_file_list.append({'file_id': file_url.split('/')[-1], 'name': result.get('name')})
all_params = {
**all_params, **self.workflow_params.get('data_source'),
'download_file_list': download_file_list
}
result = download_file_list
exist = function_executor.exist_function(tool_lib.code, 'get_download_file_list')
if exist:
download_file_list = []
download_list = function_executor.exec_code(tool_lib.code,
{**all_params, **self.workflow_params.get('data_source')},
function_name='get_download_file_list')
for item in download_list:
result = function_executor.exec_code(tool_lib.code,
{**all_params, **self.workflow_params.get('data_source'),
'download_item': item},
function_name='download')
file = bytes_to_uploaded_file(ast.literal_eval(result.get('file_bytes')), result.get('name'))
file_url = self.upload_knowledge_file(file)
download_file_list.append({'file_id': file_url, 'name': result.get('name')})
result = download_file_list
else:
result = function_executor.exec_code(tool_lib.code, all_params)
else:
result = function_executor.exec_code(tool_lib.code, all_params)
return NodeResult({'result': result},
@ -237,7 +230,7 @@ class BaseToolLibNodeNode(IToolLibNode):
'meta': meta,
'source_id': knowledge_id,
'source_type': FileSourceType.KNOWLEDGE.value
}).upload()
}).upload().replace("./oss/file/", '')
file.close()
return file_url

View File

@ -74,6 +74,47 @@ class ToolExecutor:
except Exception as e:
maxkb_logger.error(f'Exception: {e}', exc_info=True)
def exist_function(self, code_str, name):
    """Report whether executing ``code_str`` binds a top-level name ``name``.

    A small driver script is written to a temporary file and run through
    ``self._exec``. The driver executes ``code_str`` with fresh globals and
    locals and then checks whether ``name`` is present in the locals, so any
    top-level side effects of ``code_str`` do run during this check.

    :param code_str: source code of the tool to inspect.
    :param name: identifier to look for after the code has been executed.
    :return: the ``data`` field reported by the driver (``True`` when the
        name is bound, ``False`` otherwise).
    :raises Exception: if the subprocess exits with a non-zero code, if no
        result line tagged with the run id is found in its stdout, or if
        the driver reports a failure (e.g. ``code_str`` raised).
    """
    # Unique marker so the result line can be told apart from anything the
    # executed code itself prints to stdout.
    _id = str(uuid.uuid7())
    python_paths = CONFIG.get_sandbox_python_package_paths().split(',')
    # Only look up the run user when sandboxing is enabled, as before.
    set_run_user = ''
    if _enable_sandbox:
        run_user = pwd.getpwnam(_run_user)
        set_run_user = f'os.setgid({run_user.pw_gid});os.setuid({run_user.pw_uid});'
    # NOTE: ``name`` is embedded with ``!a`` (ascii repr), matching how
    # exec_code embeds ``function_name``; a raw '{name}' would break or alter
    # the generated script if the name contained a quote or backslash.
    _exec_code = f"""
try:
    import os, sys, json
    path_to_exclude = ['/opt/py3/lib/python3.11/site-packages', '/opt/maxkb-app/apps']
    sys.path = [p for p in sys.path if p not in path_to_exclude]
    sys.path += {python_paths}
    locals_v={{}}
    globals_v={{}}
    {set_run_user}
    os.environ.clear()
    exec({dedent(code_str)!a}, globals_v, locals_v)
    exec_result={name!a} in locals_v
    sys.stdout.write("\\n{_id}:")
    json.dump({{'code':200,'msg':'success','data':exec_result}}, sys.stdout, default=str)
except Exception as e:
    if isinstance(e, MemoryError): e = Exception("Cannot allocate more memory: exceeded the limit of {_process_limit_mem_mb} MB.")
    sys.stdout.write("\\n{_id}:")
    json.dump({{'code':500,'msg':str(e),'data':False}}, sys.stdout, default=str)
sys.stdout.flush()
"""
    maxkb_logger.debug(f"Sandbox execute code: {_exec_code}")
    # The file must stay open (and therefore on disk) while the subprocess
    # runs; delete=True removes it when the with-block exits.
    with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=True) as f:
        f.write(_exec_code)
        f.flush()
        subprocess_result = self._exec(f.name)
    if subprocess_result.returncode != 0:
        raise Exception(subprocess_result.stderr or subprocess_result.stdout or "Unknown exception occurred")
    lines = subprocess_result.stdout.splitlines()
    # Keep only lines emitted by the driver; the last one wins.
    result_line = [line for line in lines if line.startswith(_id)]
    if not result_line:
        maxkb_logger.error("\n".join(lines))
        raise Exception("No result found.")
    result = json.loads(result_line[-1].split(":", 1)[1])
    if result.get('code') == 200:
        return result.get('data')
    raise Exception(result.get('msg'))
def exec_code(self, code_str, keywords, function_name=None):
_id = str(uuid.uuid7())
action_function = f'({function_name !a}, locals_v.get({function_name !a}))' if function_name else 'locals_v.popitem()'
@ -213,7 +254,7 @@ exec({dedent(code)!a})
],
'cwd': _sandbox_path,
'env': {
'LD_PRELOAD': f'{_sandbox_path}/lib/sandbox.so',
'LD_PRELOAD': f'{_sandbox_path}/lib/sandbox.so',
},
'transport': 'stdio',
}