mirror of
https://github.com/1Panel-dev/MaxKB.git
synced 2025-12-25 17:22:55 +00:00
feat: Knowledge base workflow supports terminating execution
This commit is contained in:
parent
ab87d11a25
commit
e42454c06f
|
|
@ -123,6 +123,8 @@ def get_loop_workflow_node(node_list):
|
|||
|
||||
|
||||
def get_workflow_state(workflow):
|
||||
if workflow.is_the_task_interrupted():
|
||||
return State.REVOKED
|
||||
details = workflow.get_runtime_details()
|
||||
node_list = details.values()
|
||||
all_node = [*node_list, *get_loop_workflow_node(node_list)]
|
||||
|
|
|
|||
|
|
@ -30,10 +30,10 @@ class KnowledgeWorkflowManage(WorkflowManage):
|
|||
work_flow_post_handler: WorkFlowPostHandler,
|
||||
base_to_response: BaseToResponse = SystemToResponse(),
|
||||
start_node_id=None,
|
||||
start_node_data=None, chat_record=None, child_node=None):
|
||||
start_node_data=None, chat_record=None, child_node=None, is_the_task_interrupted=lambda: False):
|
||||
super().__init__(flow, params, work_flow_post_handler, base_to_response, None, None, None,
|
||||
None,
|
||||
None, None, start_node_id, start_node_data, chat_record, child_node)
|
||||
None, None, start_node_id, start_node_data, chat_record, child_node, is_the_task_interrupted)
|
||||
|
||||
def get_params_serializer_class(self):
|
||||
return KnowledgeFlowParamsSerializer
|
||||
|
|
@ -91,6 +91,9 @@ class KnowledgeWorkflowManage(WorkflowManage):
|
|||
list(result)
|
||||
if current_node.status == 500:
|
||||
return None
|
||||
if self.is_the_task_interrupted():
|
||||
current_node.status = 201
|
||||
return None
|
||||
return current_result
|
||||
except Exception as e:
|
||||
traceback.print_exc()
|
||||
|
|
|
|||
|
|
@ -92,14 +92,14 @@ class LoopWorkflowManage(WorkflowManage):
|
|||
get_loop_context,
|
||||
base_to_response: BaseToResponse = SystemToResponse(),
|
||||
start_node_id=None,
|
||||
start_node_data=None, chat_record=None, child_node=None):
|
||||
start_node_data=None, chat_record=None, child_node=None, is_the_task_interrupted=lambda: False):
|
||||
self.parentWorkflowManage = parentWorkflowManage
|
||||
self.loop_params = loop_params
|
||||
self.get_loop_context = get_loop_context
|
||||
self.loop_field_list = []
|
||||
super().__init__(flow, params, work_flow_post_handler, base_to_response, None, None, None,
|
||||
None,
|
||||
None, None, start_node_id, start_node_data, chat_record, child_node)
|
||||
None, None, start_node_id, start_node_data, chat_record, child_node, is_the_task_interrupted)
|
||||
|
||||
def get_node_cls_by_id(self, node_id, up_node_id_list=None,
|
||||
get_node_params=lambda node: node.properties.get('node_data')):
|
||||
|
|
|
|||
|
|
@ -63,6 +63,8 @@ def write_context_stream(node_variable: Dict, workflow_variable: Dict, node: INo
|
|||
response_reasoning_content = False
|
||||
|
||||
for chunk in response:
|
||||
if workflow.is_the_task_interrupted():
|
||||
break
|
||||
reasoning_chunk = reasoning.get_reasoning_content(chunk)
|
||||
content_chunk = reasoning_chunk.get('content')
|
||||
if 'reasoning_content' in chunk.additional_kwargs:
|
||||
|
|
@ -110,7 +112,8 @@ def write_context(node_variable: Dict, workflow_variable: Dict, node: INode, wor
|
|||
if 'reasoning_content' in meta:
|
||||
reasoning_content = (meta.get('reasoning_content', '') or '')
|
||||
else:
|
||||
reasoning_content = (reasoning_result.get('reasoning_content') or '') + (reasoning_result_end.get('reasoning_content') or '')
|
||||
reasoning_content = (reasoning_result.get('reasoning_content') or '') + (
|
||||
reasoning_result_end.get('reasoning_content') or '')
|
||||
_write_context(node_variable, workflow_variable, node, workflow, content, reasoning_content)
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -268,7 +268,8 @@ class BaseLoopNode(ILoopNode):
|
|||
start_node_id=start_node_id,
|
||||
start_node_data=start_node_data,
|
||||
chat_record=chat_record,
|
||||
child_node=child_node
|
||||
child_node=child_node,
|
||||
is_the_task_interrupted=self.workflow_manage.is_the_task_interrupted
|
||||
)
|
||||
|
||||
return workflow_manage
|
||||
|
|
|
|||
|
|
@ -97,7 +97,7 @@ class WorkflowManage:
|
|||
video_list=None,
|
||||
other_list=None,
|
||||
start_node_id=None,
|
||||
start_node_data=None, chat_record=None, child_node=None):
|
||||
start_node_data=None, chat_record=None, child_node=None, is_the_task_interrupted=lambda: False):
|
||||
if form_data is None:
|
||||
form_data = {}
|
||||
if image_list is None:
|
||||
|
|
@ -138,6 +138,7 @@ class WorkflowManage:
|
|||
self.global_field_list = []
|
||||
self.chat_field_list = []
|
||||
self.init_fields()
|
||||
self.is_the_task_interrupted = is_the_task_interrupted
|
||||
if start_node_id is not None:
|
||||
self.load_node(chat_record, start_node_id, start_node_data)
|
||||
else:
|
||||
|
|
|
|||
|
|
@ -26,7 +26,7 @@ class Cache_Version(Enum):
|
|||
SYSTEM = "SYSTEM", lambda key: key
|
||||
# 应用对接三方应用的缓存
|
||||
APPLICATION_THIRD_PARTY = "APPLICATION:THIRD_PARTY", lambda key: key
|
||||
|
||||
KNOWLEDGE_WORKFLOW_INTERRUPTED = "KNOWLEDGE_WORKFLOW_INTERRUPTED", lambda action_id: action_id
|
||||
# 对话
|
||||
CHAT = "CHAT", lambda key: key
|
||||
|
||||
|
|
|
|||
|
|
@ -7,36 +7,41 @@ import os
|
|||
import socket
|
||||
import subprocess
|
||||
import sys
|
||||
import tempfile
|
||||
import pwd
|
||||
import resource
|
||||
import getpass
|
||||
import random
|
||||
import signal
|
||||
import time
|
||||
import uuid_utils.compat as uuid
|
||||
from contextlib import contextmanager
|
||||
from common.utils.logger import maxkb_logger
|
||||
from django.utils.translation import gettext_lazy as _
|
||||
from maxkb.const import BASE_DIR, CONFIG
|
||||
from maxkb.const import PROJECT_DIR
|
||||
from textwrap import dedent
|
||||
|
||||
_enable_sandbox = bool(CONFIG.get('SANDBOX', 0))
|
||||
_run_user = 'sandbox' if _enable_sandbox else getpass.getuser()
|
||||
_sandbox_path = CONFIG.get("SANDBOX_HOME", '/opt/maxkb-app/sandbox') if _enable_sandbox else os.path.join(PROJECT_DIR, 'data', 'sandbox')
|
||||
_process_limit_timeout_seconds = int(CONFIG.get("SANDBOX_PYTHON_PROCESS_LIMIT_TIMEOUT_SECONDS", '3600'))
|
||||
_process_limit_cpu_cores = min(max(int(CONFIG.get("SANDBOX_PYTHON_PROCESS_LIMIT_CPU_CORES", '1')), 1), len(os.sched_getaffinity(0))) if sys.platform.startswith("linux") else os.cpu_count() # 只支持linux,window和mac不支持
|
||||
_process_limit_mem_mb = int(CONFIG.get("SANDBOX_PYTHON_PROCESS_LIMIT_MEM_MB", '256'))
|
||||
python_directory = sys.executable
|
||||
|
||||
|
||||
class ToolExecutor:
|
||||
|
||||
def __init__(self):
|
||||
pass
|
||||
def __init__(self, sandbox=False):
|
||||
self.sandbox = sandbox
|
||||
if sandbox:
|
||||
self.sandbox_path = CONFIG.get("SANDBOX_HOME", '/opt/maxkb-app/sandbox')
|
||||
self.user = 'sandbox'
|
||||
else:
|
||||
self.sandbox_path = os.path.join(PROJECT_DIR, 'data', 'sandbox')
|
||||
self.user = None
|
||||
self.sandbox_so_path = f'{self.sandbox_path}/lib/sandbox.so'
|
||||
self.process_timeout_seconds = int(CONFIG.get("SANDBOX_PYTHON_PROCESS_TIMEOUT_SECONDS", '3600'))
|
||||
try:
|
||||
self._init_sandbox_dir()
|
||||
except Exception as e:
|
||||
# 本机忽略异常,容器内不忽略
|
||||
maxkb_logger.error(f'Exception: {e}', exc_info=True)
|
||||
if self.sandbox:
|
||||
raise e
|
||||
|
||||
@staticmethod
|
||||
def init_sandbox_dir():
|
||||
if not _enable_sandbox:
|
||||
# 不启用sandbox就不初始化目录
|
||||
def _init_sandbox_dir(self):
|
||||
if not self.sandbox:
|
||||
# 不是sandbox就不初始化目录
|
||||
return
|
||||
try:
|
||||
# 只初始化一次
|
||||
|
|
@ -46,7 +51,7 @@ class ToolExecutor:
|
|||
except FileExistsError:
|
||||
# 文件已存在 → 已初始化过
|
||||
return
|
||||
maxkb_logger.info("Init sandbox dir.")
|
||||
maxkb_logger.debug("init dir")
|
||||
try:
|
||||
os.system("chmod -R g-rwx /dev/shm /dev/mqueue")
|
||||
os.system("chmod o-rwx /run/postgresql")
|
||||
|
|
@ -56,7 +61,7 @@ class ToolExecutor:
|
|||
if CONFIG.get("SANDBOX_TMP_DIR_ENABLED", '0') == "1":
|
||||
os.system("chmod g+rwx /tmp")
|
||||
# 初始化sandbox配置文件
|
||||
sandbox_lib_path = os.path.dirname(f'{_sandbox_path}/lib/sandbox.so')
|
||||
sandbox_lib_path = os.path.dirname(self.sandbox_so_path)
|
||||
sandbox_conf_file_path = f'{sandbox_lib_path}/.sandbox.conf'
|
||||
if os.path.exists(sandbox_conf_file_path):
|
||||
os.remove(sandbox_conf_file_path)
|
||||
|
|
@ -69,49 +74,37 @@ class ToolExecutor:
|
|||
with open(sandbox_conf_file_path, "w") as f:
|
||||
f.write(f"SANDBOX_PYTHON_BANNED_HOSTS={banned_hosts}\n")
|
||||
f.write(f"SANDBOX_PYTHON_ALLOW_SUBPROCESS={allow_subprocess}\n")
|
||||
os.system(f"chmod -R 550 {_sandbox_path}")
|
||||
|
||||
try:
|
||||
init_sandbox_dir()
|
||||
except Exception as e:
|
||||
maxkb_logger.error(f'Exception: {e}', exc_info=True)
|
||||
os.system(f"chmod -R 550 {self.sandbox_path}")
|
||||
|
||||
def exec_code(self, code_str, keywords, function_name=None):
|
||||
_id = str(uuid.uuid7())
|
||||
success = '{"code":200,"msg":"成功","data":exec_result}'
|
||||
err = '{"code":500,"msg":str(e),"data":None}'
|
||||
action_function = f'({function_name !a}, locals_v.get({function_name !a}))' if function_name else 'locals_v.popitem()'
|
||||
python_paths = CONFIG.get_sandbox_python_package_paths().split(',')
|
||||
set_run_user = f'os.setgid({pwd.getpwnam(_run_user).pw_gid});os.setuid({pwd.getpwnam(_run_user).pw_uid});' if _enable_sandbox else ''
|
||||
_exec_code = f"""
|
||||
try:
|
||||
import os, sys, json
|
||||
from contextlib import redirect_stdout
|
||||
import os, sys, json, base64, builtins
|
||||
path_to_exclude = ['/opt/py3/lib/python3.11/site-packages', '/opt/maxkb-app/apps']
|
||||
sys.path = [p for p in sys.path if p not in path_to_exclude]
|
||||
sys.path += {python_paths}
|
||||
locals_v={{}}
|
||||
locals_v={'{}'}
|
||||
keywords={keywords}
|
||||
globals_v={{}}
|
||||
{set_run_user}
|
||||
globals_v={'{}'}
|
||||
os.environ.clear()
|
||||
with redirect_stdout(open(os.devnull, 'w')):
|
||||
exec({dedent(code_str)!a}, globals_v, locals_v)
|
||||
f_name, f = {action_function}
|
||||
globals_v.update(locals_v)
|
||||
exec_result=f(**keywords)
|
||||
sys.stdout.write("\\n{_id}:")
|
||||
json.dump({{'code':200,'msg':'success','data':exec_result}}, sys.stdout, default=str)
|
||||
exec({dedent(code_str)!a}, globals_v, locals_v)
|
||||
f_name, f = {action_function}
|
||||
for local in locals_v:
|
||||
globals_v[local] = locals_v[local]
|
||||
exec_result=f(**keywords)
|
||||
builtins.print("\\n{_id}:"+base64.b64encode(json.dumps({success}, default=str).encode()).decode())
|
||||
except Exception as e:
|
||||
if isinstance(e, MemoryError): e = Exception("Cannot allocate more memory: exceeded the limit of {_process_limit_mem_mb} MB.")
|
||||
sys.stdout.write("\\n{_id}:")
|
||||
json.dump({{'code':500,'msg':str(e),'data':None}}, sys.stdout, default=str)
|
||||
sys.stdout.flush()
|
||||
builtins.print("\\n{_id}:"+base64.b64encode(json.dumps({err}, default=str).encode()).decode())
|
||||
"""
|
||||
maxkb_logger.debug(f"Sandbox execute code: {_exec_code}")
|
||||
with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=True) as f:
|
||||
f.write(_exec_code)
|
||||
f.flush()
|
||||
with execution_timer(_id):
|
||||
subprocess_result = self._exec(f.name)
|
||||
if self.sandbox:
|
||||
subprocess_result = self._exec_sandbox(_exec_code)
|
||||
else:
|
||||
subprocess_result = self._exec(_exec_code)
|
||||
if subprocess_result.returncode != 0:
|
||||
raise Exception(subprocess_result.stderr or subprocess_result.stdout or "Unknown exception occurred")
|
||||
lines = subprocess_result.stdout.splitlines()
|
||||
|
|
@ -119,10 +112,10 @@ sys.stdout.flush()
|
|||
if not result_line:
|
||||
maxkb_logger.error("\n".join(lines))
|
||||
raise Exception("No result found.")
|
||||
result = json.loads(result_line[-1].split(":", 1)[1])
|
||||
result = json.loads(base64.b64decode(result_line[-1].split(":", 1)[1]).decode())
|
||||
if result.get('code') == 200:
|
||||
return result.get('data')
|
||||
raise Exception(result.get('msg') + (f'\n{subprocess_result.stderr}' if subprocess_result.stderr else ''))
|
||||
raise Exception(result.get('msg'))
|
||||
|
||||
def _generate_mcp_server_code(self, _code, params):
|
||||
# 解析代码,提取导入语句和函数定义
|
||||
|
|
@ -190,7 +183,6 @@ sys.stdout.flush()
|
|||
def generate_mcp_server_code(self, code_str, params):
|
||||
python_paths = CONFIG.get_sandbox_python_package_paths().split(',')
|
||||
code = self._generate_mcp_server_code(code_str, params)
|
||||
set_run_user = f'os.setgid({pwd.getpwnam(_run_user).pw_gid});os.setuid({pwd.getpwnam(_run_user).pw_uid});' if _enable_sandbox else ''
|
||||
return f"""
|
||||
import os, sys, logging
|
||||
logging.basicConfig(level=logging.WARNING)
|
||||
|
|
@ -199,7 +191,6 @@ logging.getLogger("mcp.server").setLevel(logging.ERROR)
|
|||
path_to_exclude = ['/opt/py3/lib/python3.11/site-packages', '/opt/maxkb-app/apps']
|
||||
sys.path = [p for p in sys.path if p not in path_to_exclude]
|
||||
sys.path += {python_paths}
|
||||
{set_run_user}
|
||||
os.environ.clear()
|
||||
exec({dedent(code)!a})
|
||||
"""
|
||||
|
|
@ -208,39 +199,67 @@ exec({dedent(code)!a})
|
|||
_code = self.generate_mcp_server_code(code, params)
|
||||
maxkb_logger.debug(f"Python code of mcp tool: {_code}")
|
||||
compressed_and_base64_encoded_code_str = base64.b64encode(gzip.compress(_code.encode())).decode()
|
||||
tool_config = {
|
||||
'command': sys.executable,
|
||||
'args': [
|
||||
'-c',
|
||||
f'import base64,gzip; exec(gzip.decompress(base64.b64decode(\'{compressed_and_base64_encoded_code_str}\')).decode())',
|
||||
],
|
||||
'cwd': _sandbox_path,
|
||||
'env': {
|
||||
'LD_PRELOAD': f'{_sandbox_path}/lib/sandbox.so',
|
||||
},
|
||||
'transport': 'stdio',
|
||||
}
|
||||
if self.sandbox:
|
||||
tool_config = {
|
||||
'command': 'su',
|
||||
'args': [
|
||||
'-s', sys.executable,
|
||||
'-c',
|
||||
f'import base64,gzip; exec(gzip.decompress(base64.b64decode(\'{compressed_and_base64_encoded_code_str}\')).decode())',
|
||||
self.user,
|
||||
],
|
||||
'cwd': self.sandbox_path,
|
||||
'env': {
|
||||
'LD_PRELOAD': self.sandbox_so_path,
|
||||
},
|
||||
'transport': 'stdio',
|
||||
}
|
||||
else:
|
||||
tool_config = {
|
||||
'command': sys.executable,
|
||||
'args': [
|
||||
'-c',
|
||||
f'import base64,gzip; exec(gzip.decompress(base64.b64decode(\'{compressed_and_base64_encoded_code_str}\')).decode())',
|
||||
],
|
||||
'transport': 'stdio',
|
||||
}
|
||||
return tool_config
|
||||
|
||||
def _exec(self, execute_file):
|
||||
def _exec_sandbox(self, _code):
|
||||
kwargs = {'cwd': BASE_DIR, 'env': {
|
||||
'LD_PRELOAD': f'{_sandbox_path}/lib/sandbox.so',
|
||||
'LD_PRELOAD': self.sandbox_so_path,
|
||||
}}
|
||||
maxkb_logger.debug(f"Sandbox execute code: {_code}")
|
||||
compressed_and_base64_encoded_code_str = base64.b64encode(gzip.compress(_code.encode())).decode()
|
||||
cmd = [
|
||||
'su', '-s', python_directory, '-c',
|
||||
f'import base64,gzip; exec(gzip.decompress(base64.b64decode(\'{compressed_and_base64_encoded_code_str}\')).decode())',
|
||||
self.user
|
||||
]
|
||||
try:
|
||||
subprocess_result = subprocess.run(
|
||||
[sys.executable, execute_file],
|
||||
timeout=_process_limit_timeout_seconds,
|
||||
proc = subprocess.Popen(
|
||||
cmd,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
text=True,
|
||||
capture_output=True,
|
||||
**kwargs,
|
||||
preexec_fn=(lambda: None if (not _enable_sandbox or not sys.platform.startswith("linux")) else (
|
||||
resource.setrlimit(resource.RLIMIT_AS, (_process_limit_mem_mb * 1024 * 1024,) * 2),
|
||||
os.sched_setaffinity(0, set(random.sample(list(os.sched_getaffinity(0)), _process_limit_cpu_cores)))
|
||||
))
|
||||
start_new_session=True
|
||||
)
|
||||
proc.wait(timeout=self.process_timeout_seconds)
|
||||
return subprocess.CompletedProcess(
|
||||
proc.args,
|
||||
proc.returncode,
|
||||
proc.stdout.read(),
|
||||
proc.stderr.read()
|
||||
)
|
||||
return subprocess_result
|
||||
except subprocess.TimeoutExpired:
|
||||
raise Exception(_(f"Process execution timed out after {_process_limit_timeout_seconds} seconds."))
|
||||
pgid = os.getpgid(proc.pid)
|
||||
os.killpg(pgid, signal.SIGTERM) #温和终止
|
||||
time.sleep(1) #留出短暂时间让进程清理
|
||||
if proc.poll() is None: #如果仍未终止,强制终止
|
||||
os.killpg(pgid, signal.SIGKILL)
|
||||
proc.wait()
|
||||
raise Exception(_(f"Process execution timed out after {self.process_timeout_seconds} seconds."))
|
||||
|
||||
def validate_mcp_transport(self, code_str):
|
||||
servers = json.loads(code_str)
|
||||
|
|
@ -248,11 +267,6 @@ exec({dedent(code)!a})
|
|||
if config.get('transport') not in ['sse', 'streamable_http']:
|
||||
raise Exception(_('Only support transport=sse or transport=streamable_http'))
|
||||
|
||||
|
||||
@contextmanager
|
||||
def execution_timer(id=""):
|
||||
start = time.perf_counter()
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
maxkb_logger.debug(f"Tool execution({id}) takes {time.perf_counter() - start:.6f} seconds.")
|
||||
@staticmethod
|
||||
def _exec(_code):
|
||||
return subprocess.run([python_directory, '-c', _code], text=True, capture_output=True)
|
||||
|
|
|
|||
|
|
@ -15,6 +15,7 @@ from application.flow.i_step_node import KnowledgeWorkflowPostHandler
|
|||
from application.flow.knowledge_workflow_manage import KnowledgeWorkflowManage
|
||||
from application.flow.step_node import get_node
|
||||
from application.serializers.application import get_mcp_tools
|
||||
from common.constants.cache_version import Cache_Version
|
||||
from common.db.search import page_search
|
||||
from common.exception.app_exception import AppApiException
|
||||
from common.utils.rsa_util import rsa_long_decrypt
|
||||
|
|
@ -22,7 +23,7 @@ from common.utils.tool_code import ToolExecutor
|
|||
from knowledge.models import KnowledgeScope, Knowledge, KnowledgeType, KnowledgeWorkflow, KnowledgeWorkflowVersion
|
||||
from knowledge.models.knowledge_action import KnowledgeAction, State
|
||||
from knowledge.serializers.knowledge import KnowledgeModelSerializer
|
||||
from maxkb.const import CONFIG
|
||||
from django.core.cache import cache
|
||||
from system_manage.models import AuthTargetType
|
||||
from system_manage.serializers.user_resource_permission import UserResourcePermissionSerializer
|
||||
from tools.models import Tool
|
||||
|
|
@ -52,7 +53,11 @@ class KnowledgeWorkflowActionSerializer(serializers.Serializer):
|
|||
knowledge_id = serializers.UUIDField(required=True, label=_('knowledge id'))
|
||||
|
||||
def get_query_set(self, instance: Dict):
|
||||
query_set = QuerySet(KnowledgeAction).filter(knowledge_id=self.data.get('knowledge_id')).values('id','knowledge_id',"state",'meta','run_time',"create_time")
|
||||
query_set = QuerySet(KnowledgeAction).filter(knowledge_id=self.data.get('knowledge_id')).values('id',
|
||||
'knowledge_id',
|
||||
"state", 'meta',
|
||||
'run_time',
|
||||
"create_time")
|
||||
if instance.get("user_name"):
|
||||
query_set = query_set.filter(meta__user_name__icontains=instance.get('user_name'))
|
||||
if instance.get('state'):
|
||||
|
|
@ -73,7 +78,8 @@ class KnowledgeWorkflowActionSerializer(serializers.Serializer):
|
|||
KnowledgeWorkflowActionListQuerySerializer(data=instance).is_valid(raise_exception=True)
|
||||
return page_search(current_page, page_size, self.get_query_set(instance),
|
||||
lambda a: {'id': a.get("id"), 'knowledge_id': a.get("knowledge_id"), 'state': a.get("state"),
|
||||
'meta': a.get("meta"), 'run_time': a.get("run_time"), 'create_time': a.get("create_time")})
|
||||
'meta': a.get("meta"), 'run_time': a.get("run_time"),
|
||||
'create_time': a.get("create_time")})
|
||||
|
||||
def action(self, instance: Dict, user, with_valid=True):
|
||||
if with_valid:
|
||||
|
|
@ -91,7 +97,10 @@ class KnowledgeWorkflowActionSerializer(serializers.Serializer):
|
|||
{'knowledge_id': self.data.get("knowledge_id"), 'knowledge_action_id': knowledge_action_id, 'stream': True,
|
||||
'workspace_id': self.data.get("workspace_id"),
|
||||
**instance},
|
||||
KnowledgeWorkflowPostHandler(None, knowledge_action_id))
|
||||
KnowledgeWorkflowPostHandler(None, knowledge_action_id),
|
||||
is_the_task_interrupted=lambda: cache.get(
|
||||
Cache_Version.KNOWLEDGE_WORKFLOW_INTERRUPTED.get_key(action_id=knowledge_action_id),
|
||||
version=Cache_Version.KNOWLEDGE_WORKFLOW_INTERRUPTED.get_version()) or False)
|
||||
work_flow_manage.run()
|
||||
return {'id': knowledge_action_id, 'knowledge_id': self.data.get("knowledge_id"), 'state': State.STARTED,
|
||||
'details': {}, 'meta': meta}
|
||||
|
|
@ -135,6 +144,15 @@ class KnowledgeWorkflowActionSerializer(serializers.Serializer):
|
|||
'details': knowledge_action.details,
|
||||
'meta': knowledge_action.meta}
|
||||
|
||||
def cancel(self, is_valid=True):
|
||||
if is_valid:
|
||||
self.is_valid(raise_exception=True)
|
||||
knowledge_action_id = self.data.get("id")
|
||||
cache.set(Cache_Version.KNOWLEDGE_WORKFLOW_INTERRUPTED.get_key(action_id=knowledge_action_id), True,
|
||||
version=Cache_Version.KNOWLEDGE_WORKFLOW_INTERRUPTED.get_version())
|
||||
QuerySet(KnowledgeAction).filter(id=knowledge_action_id).update(state=State.REVOKE)
|
||||
return True
|
||||
|
||||
|
||||
class KnowledgeWorkflowSerializer(serializers.Serializer):
|
||||
class Datasource(serializers.Serializer):
|
||||
|
|
|
|||
|
|
@ -76,6 +76,7 @@ urlpatterns = [
|
|||
path('workspace/<str:workspace_id>/knowledge/<str:knowledge_id>/action/<int:current_page>/<int:page_size>', views.KnowledgeWorkflowActionView.Page.as_view()),
|
||||
path('workspace/<str:workspace_id>/knowledge/<str:knowledge_id>/upload_document', views.KnowledgeWorkflowUploadDocumentView.as_view()),
|
||||
path('workspace/<str:workspace_id>/knowledge/<str:knowledge_id>/action/<str:knowledge_action_id>', views.KnowledgeWorkflowActionView.Operate.as_view()),
|
||||
path('workspace/<str:workspace_id>/knowledge/<str:knowledge_id>/action/<str:knowledge_action_id>/cancel', views.KnowledgeWorkflowActionView.Cancel.as_view()),
|
||||
path('workspace/<str:workspace_id>/knowledge/<str:knowledge_id>/mcp_tools', views.McpServers.as_view()),
|
||||
path('workspace/<str:workspace_id>/knowledge/<str:knowledge_id>/knowledge_version', views.KnowledgeWorkflowVersionView.as_view()),
|
||||
path('workspace/<str:workspace_id>/knowledge/<str:knowledge_id>/knowledge_version/<int:current_page>/<int:page_size>', views.KnowledgeWorkflowVersionView.Page.as_view()),
|
||||
|
|
|
|||
|
|
@ -168,6 +168,33 @@ class KnowledgeWorkflowActionView(APIView):
|
|||
data={'workspace_id': workspace_id, 'knowledge_id': knowledge_id, 'id': knowledge_action_id})
|
||||
.one())
|
||||
|
||||
class Cancel(APIView):
|
||||
authentication_classes = [TokenAuth]
|
||||
|
||||
@extend_schema(
|
||||
methods=['POST'],
|
||||
description=_('Cancel knowledge workflow action'),
|
||||
summary=_('Cancel knowledge workflow action'),
|
||||
operation_id=_('Cancel knowledge workflow action'), # type: ignore
|
||||
parameters=KnowledgeWorkflowActionApi.get_parameters(),
|
||||
responses=DefaultResultSerializer(),
|
||||
tags=[_('Knowledge Base')] # type: ignore
|
||||
)
|
||||
@has_permissions(
|
||||
PermissionConstants.KNOWLEDGE_WORKFLOW_EDIT.get_workspace_knowledge_permission(),
|
||||
PermissionConstants.KNOWLEDGE_WORKFLOW_EDIT.get_workspace_permission_workspace_manage_role(),
|
||||
RoleConstants.WORKSPACE_MANAGE.get_workspace_role(),
|
||||
ViewPermission(
|
||||
[RoleConstants.USER.get_workspace_role()],
|
||||
[PermissionConstants.KNOWLEDGE.get_workspace_knowledge_permission()],
|
||||
CompareConstants.AND
|
||||
),
|
||||
)
|
||||
def post(self, request, workspace_id: str, knowledge_id: str, knowledge_action_id: str):
|
||||
return result.success(KnowledgeWorkflowActionSerializer.Operate(
|
||||
data={'workspace_id': workspace_id, 'knowledge_id': knowledge_id, 'id': knowledge_action_id})
|
||||
.cancel())
|
||||
|
||||
|
||||
class KnowledgeWorkflowView(APIView):
|
||||
authentication_classes = [TokenAuth]
|
||||
|
|
|
|||
|
|
@ -433,7 +433,13 @@ const getWorkflowAction: (
|
|||
) => Promise<Result<any>> = (knowledge_id: string, knowledge_action_id, loading) => {
|
||||
return get(`${prefix.value}/${knowledge_id}/action/${knowledge_action_id}`, {}, loading)
|
||||
}
|
||||
|
||||
const cancelWorkflowAction: (
|
||||
knowledge_id: string,
|
||||
knowledge_action_id: string,
|
||||
loading?: Ref<boolean>,
|
||||
) => Promise<Result<any>> = (knowledge_id: string, knowledge_action_id, loading) => {
|
||||
return post(`${prefix.value}/${knowledge_id}/action/${knowledge_action_id}/cancel`, {}, loading)
|
||||
}
|
||||
/**
|
||||
* mcp 节点
|
||||
*/
|
||||
|
|
@ -480,4 +486,5 @@ export default {
|
|||
putKnowledgeWorkflow,
|
||||
workflowUpload,
|
||||
getWorkflowActionPage,
|
||||
cancelWorkflowAction,
|
||||
}
|
||||
|
|
|
|||
|
|
@ -47,6 +47,20 @@
|
|||
<el-icon class="color-danger"><CircleCloseFilled /></el-icon>
|
||||
{{ $t('common.status.fail') }}
|
||||
</el-text>
|
||||
<el-text
|
||||
class="color-text-primary"
|
||||
v-else-if="props.currentContent?.state === 'REVOKED'"
|
||||
>
|
||||
<el-icon class="color-danger"><CircleCloseFilled /></el-icon>
|
||||
{{ $t('common.status.REVOKED', '已取消') }}
|
||||
</el-text>
|
||||
<el-text
|
||||
class="color-text-primary"
|
||||
v-else-if="props.currentContent?.state === 'REVOKE'"
|
||||
>
|
||||
<el-icon class="is-loading color-primary"><Loading /></el-icon>
|
||||
{{ $t('views.document.fileStatus.REVOKE', '取消中') }}
|
||||
</el-text>
|
||||
<el-text class="color-text-primary" v-else>
|
||||
<el-icon class="is-loading color-primary"><Loading /></el-icon>
|
||||
{{ $t('common.status.padding') }}
|
||||
|
|
|
|||
|
|
@ -64,6 +64,14 @@
|
|||
<el-icon class="color-danger"><CircleCloseFilled /></el-icon>
|
||||
{{ $t('common.status.fail') }}
|
||||
</el-text>
|
||||
<el-text class="color-text-primary" v-else-if="row.state === 'REVOKED'">
|
||||
<el-icon class="color-danger"><CircleCloseFilled /></el-icon>
|
||||
{{ $t('common.status.REVOKED', '已取消') }}
|
||||
</el-text>
|
||||
<el-text class="color-text-primary" v-else-if="row.state === 'REVOKE'">
|
||||
<el-icon class="is-loading color-primary"><Loading /></el-icon>
|
||||
{{ $t('views.document.fileStatus.REVOKE', '取消中') }}
|
||||
</el-text>
|
||||
<el-text class="color-text-primary" v-else>
|
||||
<el-icon class="is-loading color-primary"><Loading /></el-icon>
|
||||
{{ $t('common.status.padding') }}
|
||||
|
|
@ -87,11 +95,22 @@
|
|||
|
||||
<el-table-column :label="$t('common.operation')" width="80">
|
||||
<template #default="{ row }">
|
||||
<el-tooltip effect="dark" :content="$t('chat.executionDetails.title')" placement="top">
|
||||
<el-button type="primary" text @click.stop="toDetails(row)">
|
||||
<AppIcon iconName="app-operate-log"></AppIcon>
|
||||
</el-button>
|
||||
</el-tooltip>
|
||||
<div class="flex">
|
||||
<el-tooltip effect="dark" :content="$t('chat.executionDetails.title')" placement="top">
|
||||
<el-button type="primary" text @click.stop="toDetails(row)">
|
||||
<AppIcon iconName="app-operate-log"></AppIcon>
|
||||
</el-button>
|
||||
</el-tooltip>
|
||||
<el-tooltip
|
||||
effect="dark"
|
||||
:content="$t('chat.executionDetails.cancel', '取消')"
|
||||
placement="top"
|
||||
>
|
||||
<el-button type="primary" text @click.stop="cancel(row)">
|
||||
<el-icon><CircleCloseFilled /></el-icon>
|
||||
</el-button>
|
||||
</el-tooltip>
|
||||
</div>
|
||||
</template>
|
||||
</el-table-column>
|
||||
</app-table-infinite-scroll>
|
||||
|
|
@ -157,6 +176,11 @@ const toDetails = (row: any) => {
|
|||
ExecutionDetailDrawerRef.value?.open()
|
||||
}
|
||||
|
||||
const cancel = (row: any) => {
|
||||
loadSharedApi({ type: 'knowledge', systemType: apiType.value })
|
||||
.cancelWorkflowAction(active_knowledge_id.value, row.id, loading)
|
||||
.then((ok: any) => {})
|
||||
}
|
||||
const changeFilterHandle = () => {
|
||||
query.value = { user_name: '', status: '' }
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue