feat: enhance init_params handling with encryption and decryption for tools and shared tools

This commit is contained in:
CaptainB 2025-06-04 19:15:33 +08:00
parent 64e26dd2cc
commit 40137f147b

View File

@ -20,6 +20,7 @@ from common.exception.app_exception import AppApiException
from common.field.common import UploadedImageField
from common.result import result
from common.utils.common import get_file_content
from common.utils.rsa_util import rsa_long_decrypt, rsa_long_encrypt
from common.utils.tool_code import ToolExecutor
from knowledge.models import File, FileSourceType
from maxkb.const import CONFIG, PROJECT_DIR
@ -289,17 +290,45 @@ class ToolSerializer(serializers.Serializer):
edit_dict = {field: instance.get(field) for field in edit_field_list if (
field in instance and instance.get(field) is not None)}
tool = QuerySet(Tool).filter(id=self.data.get('id')).first()
if 'init_params' in edit_dict:
if edit_dict['init_field_list'] is not None:
rm_key = []
for key in edit_dict['init_params']:
if key not in [field['field'] for field in edit_dict['init_field_list']]:
rm_key.append(key)
for key in rm_key:
edit_dict['init_params'].pop(key)
if tool.init_params:
old_init_params = json.loads(rsa_long_decrypt(tool.init_params))
for key in edit_dict['init_params']:
if key in old_init_params and edit_dict['init_params'][key] == encryption(old_init_params[key]):
edit_dict['init_params'][key] = old_init_params[key]
edit_dict['init_params'] = rsa_long_encrypt(json.dumps(edit_dict['init_params']))
QuerySet(Tool).filter(id=self.data.get('id')).update(**edit_dict)
return self.one()
def delete(self):
self.is_valid(raise_exception=True)
tool = QuerySet(Tool).filter(id=self.data.get('id')).first()
if tool.template_id is None and tool.icon != '/ui/favicon.ico':
QuerySet(File).filter(id=tool.icon.split('/')[-1]).delete()
QuerySet(Tool).filter(id=self.data.get('id')).delete()
def one(self):
self.is_valid(raise_exception=True)
tool = QuerySet(Tool).filter(id=self.data.get('id')).first()
if tool.init_params:
tool.init_params = json.loads(rsa_long_decrypt(tool.init_params))
if tool.init_field_list:
password_fields = [i["field"] for i in tool.init_field_list if
i.get("input_type") == "PasswordInput"]
if tool.init_params:
for k in tool.init_params:
if k in password_fields and tool.init_params[k]:
tool.init_params[k] = encryption(tool.init_params[k])
return ToolModelSerializer(tool).data
def export(self):