feat: Knowledge write

This commit is contained in:
zhangzhanwei 2025-11-20 15:04:36 +08:00 committed by zhanweizhang7
parent ec72140b51
commit 9dc3f21406
7 changed files with 279 additions and 36 deletions

View File

@ -54,7 +54,7 @@ class IChatNode(INode):
return ChatNodeSerializer
def _run(self):
if [WorkflowMode.KNOWLEDGE, WorkflowMode.APPLICATION_LOOP].__contains__(
if [WorkflowMode.KNOWLEDGE, WorkflowMode.KNOWLEDGE_LOOP].__contains__(
self.workflow_manage.flow.workflow_mode):
return self.execute(**self.node_params_serializer.data, **self.flow_params_serializer.data,
**{'history_chat_record': [], 'stream': True, 'chat_id': None, 'chat_record_id': None})

View File

@ -22,7 +22,7 @@ class IDataSourceWebNode(INode):
pass
def _run(self):
return self.execute(**self.node_params_serializer.data, **self.flow_params_serializer.data)
return self.execute(**self.flow_params_serializer.data)
def execute(self, **kwargs) -> NodeResult:
pass

View File

@ -6,15 +6,39 @@
@date: 2025/11/12 13:47
@desc:
"""
import traceback
from django.utils.translation import gettext_lazy as _
from application.flow.i_step_node import NodeResult
from application.flow.step_node.data_source_web_node.i_data_source_web_node import IDataSourceWebNode
from common import forms
from common.forms import BaseForm
from common.utils.fork import ForkManage, Fork, ChildLink
from common.utils.logger import maxkb_logger
class BaseDataSourceWebNodeForm(BaseForm):
source_url = forms.TextInputField('source url', required=True)
selector = forms.TextInputField('knowledge selector', required=True)
selector = forms.TextInputField('knowledge selector', required=False,default_value="body")
def get_collect_handler():
    """Build a fork-crawl handler plus the shared list it appends to.

    Returns:
        (handler, results): ``handler(child_link, response)`` is passed to
        ``ForkManage.fork``; for every HTTP-200 response it appends
        ``{'name': ..., 'content': ...}`` to ``results``. The caller keeps a
        reference to ``results`` and reads it after the crawl finishes.
    """
    results = []

    # Annotations are strings so the closure can be created even when the
    # fork types are only imported lazily; they are documentation only.
    def handler(child_link: 'ChildLink', response: 'Fork.Response'):
        if response.status != 200:
            return
        try:
            tag = child_link.tag
            # Prefer the link's anchor text as the document name; fall back
            # to the raw URL when the tag is missing or its text is blank.
            if tag is not None and len(tag.text.strip()) > 0:
                document_name = tag.text
            else:
                document_name = child_link.url
            results.append({
                "name": document_name.strip(),
                "content": response.content,
            })
        except Exception as e:
            # Best-effort collection: log the failure, keep crawling.
            maxkb_logger.error(f'{str(e)}:{traceback.format_exc()}')

    return handler, results
class BaseDataSourceWebNode(IDataSourceWebNode):
@ -26,4 +50,37 @@ class BaseDataSourceWebNode(IDataSourceWebNode):
return BaseDataSourceWebNodeForm().to_form_list()
def execute(self, **kwargs) -> NodeResult:
    """Crawl the configured web source and collect page documents.

    Reads ``data_source`` (node_id / source_url / selector) from the
    workflow params, forks the URL one level deep and returns the
    collected pages as ``document_list`` in the node result.
    """
    BaseDataSourceWebNodeForm().valid_form(self.workflow_params.get("data_source"))
    data_source = self.workflow_params.get("data_source")
    node_id = data_source.get("node_id")
    source_url = data_source.get("source_url")
    # Fall back to the whole page body when no CSS selector is configured.
    selector = data_source.get("selector") or "body"
    collect_handler, document_list = get_collect_handler()
    try:
        # selector is always non-empty here (defaulted above), so the
        # original `if selector is not None else []` branch was dead code.
        ForkManage(source_url, selector.split(" ")).fork(1, set(), collect_handler)
        return NodeResult({'document_list': document_list},
                          self.workflow_manage.params.get('knowledge_base') or {})
    except Exception as e:
        # BUG FIX: the format string expects {node_id}, but the value was
        # passed as keyword `knowledge_id`, raising KeyError on this path.
        maxkb_logger.error(_('data source web node:{node_id} error{error}{traceback}').format(
            node_id=node_id, error=str(e), traceback=traceback.format_exc()))
def get_details(self, index: int, **kwargs):
    """Assemble the execution-detail record shown for this node run."""
    ctx = self.context
    details = {
        'name': self.node.properties.get('stepName'),
        "index": index,
        'run_time': ctx.get('run_time'),
        'type': self.node.type,
        'input_params': {
            "source_url": ctx.get("source_url"),
            "selector": ctx.get('selector'),
        },
        'output_params': ctx.get('document_list'),
        'knowledge_base': self.workflow_params.get('knowledge_base'),
        'status': self.status,
        'err_message': self.err_message,
    }
    return details

View File

@ -46,7 +46,12 @@ class IReplyNode(INode):
return ReplyNodeParamsSerializer
def _run(self):
return self.execute(**self.node_params_serializer.data, **self.flow_params_serializer.data)
if [WorkflowMode.KNOWLEDGE, WorkflowMode.KNOWLEDGE_LOOP].__contains__(
self.workflow_manage.flow.workflow_mode):
return self.execute(**self.node_params_serializer.data, **self.flow_params_serializer.data,
**{'stream': True})
else:
return self.execute(**self.node_params_serializer.data, **self.flow_params_serializer.data)
def execute(self, reply_type, stream, fields=None, content=None, **kwargs) -> NodeResult:
pass

View File

@ -16,19 +16,27 @@ from application.flow.i_step_node import INode, NodeResult
class KnowledgeWriteNodeParamSerializer(serializers.Serializer):
paragraph_list = serializers.ListField(required=True, label=_("Paragraph list"))
chunk_length = serializers.CharField(required=True, label=_("Child chunk length"))
document_list = serializers.ListField(required=True, child=serializers.CharField(required=True), allow_null=True,label=_('document list'))
class IKnowledgeWriteNode(INode):
def save_context(self, details, workflow_manage):
pass
def get_node_params_serializer_class(self) -> Type[serializers.Serializer]:
return KnowledgeWriteNodeParamSerializer
def _run(self):
return self.execute(**self.node_params_serializer.data, **self.flow_params_serializer.data)
documents = self.workflow_manage.get_reference_field(
self.node_params_serializer.data.get('document_list')[0],
self.node_params_serializer.data.get('document_list')[1:],
)
def execute(self, paragraph_list, chunk_length, **kwargs) -> NodeResult:
return self.execute(**self.node_params_serializer.data, **self.flow_params_serializer.data, documents=documents)
def execute(self, documents, **kwargs) -> NodeResult:
pass
type = 'knowledge-write-node'

View File

@ -6,8 +6,131 @@
@date: 2025/11/13 11:19
@desc:
"""
from functools import reduce
from typing import Dict, List
import uuid_utils.compat as uuid
from django.db.models import QuerySet
from django.db.models.aggregates import Max
from rest_framework import serializers
from django.utils.translation import gettext_lazy as _
from application.flow.i_step_node import NodeResult
from application.flow.step_node.knowledge_write_node.i_knowledge_write_node import IKnowledgeWriteNode
from common.utils.common import bulk_create_in_batches
from knowledge.models import Document, KnowledgeType, Paragraph, File, FileSourceType, Problem, ProblemParagraphMapping
from knowledge.serializers.common import ProblemParagraphObject, ProblemParagraphManage
class ParagraphInstanceSerializer(serializers.Serializer):
    """One paragraph of an incoming document payload for knowledge write."""
    # Paragraph body; nullable/blank is allowed, so downstream code must
    # tolerate None content.
    content = serializers.CharField(required=True, label=_('content'), max_length=102400, min_length=1, allow_null=True,
                                    allow_blank=True)
    # Optional section heading for the paragraph.
    title = serializers.CharField(required=False, max_length=256, label=_('section title'), allow_null=True,
                                  allow_blank=True)
    # Optional question strings mapped to this paragraph (see
    # ProblemParagraphObject usage in this module).
    problem_list = serializers.ListField(required=False, child=serializers.CharField(required=True))
    # NOTE(review): presumably toggles whether the paragraph is used in
    # retrieval — confirm against the Paragraph model.
    is_active = serializers.BooleanField(required=False, label=_('Is active'))
    # Optional pre-split child chunks of the paragraph content.
    chunks = serializers.ListField(required=False, child=serializers.CharField(required=True))
class KnowledgeWriteParamSerializer(serializers.Serializer):
    """Validated shape of one document handed to the knowledge-write node."""
    # BUG FIX: the original passed source=_('document name'); DRF's `source`
    # must be an attribute/key path, and a lazy translated label here made
    # the serializer read a non-existent 'document name' attribute.
    name = serializers.CharField(required=True, label=_('document name'), max_length=128, min_length=1)
    # Nested paragraph payloads; the whole list may be absent or null.
    paragraphs = ParagraphInstanceSerializer(required=False, many=True, allow_null=True)
def convert_uuid_to_str(obj):
    """Recursively replace every UUID in *obj* with its string form.

    Dicts and lists are rebuilt (never mutated in place); any other
    value is returned unchanged.
    """
    if isinstance(obj, uuid.UUID):
        return str(obj)
    if isinstance(obj, dict):
        return {key: convert_uuid_to_str(value) for key, value in obj.items()}
    if isinstance(obj, list):
        return [convert_uuid_to_str(item) for item in obj]
    return obj
def link_file(source_file_id, document_id):
    """Copy the source upload file and attach the copy to *document_id*.

    No-op when *source_file_id* is None or the file no longer exists.
    """
    if source_file_id is None:
        return
    source_file = QuerySet(File).filter(id=source_file_id).first()
    if source_file:
        file_content = source_file.get_bytes()
        new_file = File(
            id=uuid.uuid7(),
            file_name=source_file.file_name,
            file_size=source_file.file_size,
            source_type=FileSourceType.DOCUMENT,
            source_id=document_id,  # re-point the copy at the new document
            meta=source_file.meta.copy() if source_file.meta else {}
        )
        # Persist the file content together with its metadata.
        new_file.save(file_content)
def get_paragraph_problem_model(knowledge_id: str, document_id: str, instance: Dict):
    """Build one unsaved Paragraph plus its problem-mapping objects.

    Returns a dict with keys 'paragraph' and 'problem_paragraph_object_list'.
    """
    new_paragraph = Paragraph(
        id=uuid.uuid7(),
        document_id=document_id,
        content=instance.get("content"),
        knowledge_id=knowledge_id,
        title=instance.get("title", '')
    )
    # One mapping object per question attached to this paragraph.
    mappings = [
        ProblemParagraphObject(knowledge_id, document_id, str(new_paragraph.id), problem)
        for problem in instance.get('problem_list', [])
    ]
    return {
        'paragraph': new_paragraph,
        'problem_paragraph_object_list': mappings,
    }
def get_paragraph_model(document_model, paragraph_list: List):
    """Expand raw paragraph dicts into model instances for one document.

    Returns the document together with its Paragraph models and the
    flattened problem-paragraph mapping objects.
    """
    knowledge_id = document_model.knowledge_id
    built = [
        get_paragraph_problem_model(knowledge_id, document_model.id, item)
        for item in paragraph_list
    ]
    return {
        'document': document_model,
        'paragraph_model_list': [entry.get('paragraph') for entry in built],
        'problem_paragraph_object_list': [
            mapping
            for entry in built
            for mapping in entry.get('problem_paragraph_object_list')
        ],
    }
def get_document_paragraph_model(knowledge_id: str, instance: Dict):
    """Build the Document model plus paragraph/problem models for one payload.

    ``instance`` is one validated document dict (name / paragraphs, plus
    optional meta, type and source_file_id).
    """
    source_meta = {'source_file_id': instance.get("source_file_id")} if instance.get("source_file_id") else {}
    meta = {**instance.get('meta'), **source_meta} if instance.get('meta') is not None else source_meta
    # UUIDs must be stringified so `meta` stays JSON-serializable.
    meta = {**convert_uuid_to_str(meta), 'allow_download': True}
    document_model = Document(
        **{
            'knowledge_id': knowledge_id,
            'id': uuid.uuid7(),
            'name': instance.get('name'),
            # Robustness fix: the serializer allows `content` to be null and
            # `paragraphs` to be present-but-null; the original reduce()
            # crashed with TypeError in both cases.
            'char_length': sum(
                len(p.get('content') or '') for p in (instance.get('paragraphs') or [])
            ),
            'meta': meta,
            'type': instance.get('type') if instance.get('type') is not None else KnowledgeType.WORKFLOW
        }
    )
    return get_paragraph_model(
        document_model,
        instance.get('paragraphs') or []
    )
class BaseKnowledgeWriteNode(IKnowledgeWriteNode):
@ -15,5 +138,77 @@ class BaseKnowledgeWriteNode(IKnowledgeWriteNode):
def save_context(self, details, workflow_manage):
pass
def execute(self, paragraph_list, chunk_length, **kwargs) -> NodeResult:
pass
def save(self, document_list):
    """Validate and bulk-persist documents, paragraphs and problems.

    Returns (document_model_list, knowledge_id, workspace_id).
    """
    serializer = KnowledgeWriteParamSerializer(data=document_list, many=True)
    serializer.is_valid(raise_exception=True)
    document_list = serializer.data
    knowledge_id = self.workflow_params.get("knowledge_id")
    workspace_id = "default"
    document_model_list = []
    paragraph_model_list = []
    problem_paragraph_object_list = []
    # Expand every payload into unsaved model instances, linking any
    # source upload file to the new document as we go.
    for document in document_list:
        document_paragraph_dict_model = get_document_paragraph_model(
            knowledge_id,
            document
        )
        document_instance = document_paragraph_dict_model.get('document')
        link_file(document.get("source_file_id"), document_instance.id)
        document_model_list.append(document_instance)
        for paragraph in document_paragraph_dict_model.get("paragraph_model_list"):
            paragraph_model_list.append(paragraph)
        for problem_paragraph_object in document_paragraph_dict_model.get("problem_paragraph_object_list"):
            problem_paragraph_object_list.append(problem_paragraph_object)
    # De-duplicate problems and produce the paragraph<->problem mappings.
    problem_model_list, problem_paragraph_mapping_list = (
        ProblemParagraphManage(problem_paragraph_object_list, knowledge_id).to_problem_model_list()
    )
    QuerySet(Document).bulk_create(document_model_list) if len(document_model_list) > 0 else None
    if len(paragraph_model_list) > 0:
        # Continue paragraph positions after any rows already stored for
        # the document, so repeated writes append rather than collide.
        for document in document_model_list:
            max_position = Paragraph.objects.filter(document_id=document.id).aggregate(
                max_position=Max('position')
            )['max_position'] or 0
            sub_list = [p for p in paragraph_model_list if p.document_id == document.id]
            for i, paragraph in enumerate(sub_list):
                paragraph.position = max_position + i + 1
            QuerySet(Paragraph).bulk_create(sub_list if len(sub_list) > 0 else [])
    # Batched inserts keep single INSERT statements from growing unbounded.
    bulk_create_in_batches(Problem, problem_model_list, batch_size=1000)
    bulk_create_in_batches(ProblemParagraphMapping, problem_paragraph_mapping_list, batch_size=1000)
    return document_model_list, knowledge_id, workspace_id
def execute(self, documents, **kwargs) -> NodeResult:
    """Persist *documents* into the knowledge base and report a preview.

    Saves every document with its paragraphs, then returns the first few
    paragraphs of each document as ``write_content`` for display.
    """
    # The save() return value was unpacked but never used — just persist.
    self.save(documents)
    write_content_list = [{
        "name": document.get("name"),
        # Robustness fix: `paragraphs` may be absent or null (the
        # serializer allows it); the original slice crashed on None.
        "paragraphs": [{
            "title": p.get("title"),
            "content": p.get("content"),
        } for p in (document.get("paragraphs") or [])[0:4]]
    } for document in documents]
    return NodeResult({'write_content': write_content_list}, {})
def get_details(self, index: int, **kwargs):
    """Assemble the run-detail record for this knowledge-write execution."""
    context = self.context
    record = {
        'name': self.node.properties.get('stepName'),
        "index": index,
        'run_time': context.get('run_time'),
        'type': self.node.type,
        'write_content': context.get("write_content"),
        'status': self.status,
        'err_message': self.err_message
    }
    return record

View File

@ -12,7 +12,7 @@
hide-required-asterisk
>
<el-form-item
prop="paragraph_list"
prop="document_list"
:label="$t('common.inputContent')"
:rules="{
message: $t('views.applicationWorkflow.nodes.textToSpeechNode.content.label'),
@ -35,30 +35,9 @@
:nodeModel="nodeModel"
class="w-full"
:placeholder="$t('views.applicationWorkflow.nodes.textToSpeechNode.content.label')"
v-model="form_data.paragraph_list"
v-model="form_data.document_list"
/>
</el-form-item>
<el-form-item
prop="chunk_length"
:label="$t('views.applicationWorkflow.nodes.knowledgeWriteNode.chunk_length')"
:rules="{
message: $t('views.applicationWorkflow.nodes.knowledgeWriteNode.chunk_length'),
trigger: 'change',
required: true,
}"
>
<template #label>
<div class="flex-between">
<div>
<span
>{{ $t('views.applicationWorkflow.nodes.knowledgeWriteNode.chunk_length')
}}<span class="color-danger">*</span></span
>
</div>
</div>
</template>
<el-slider v-model="form_data.chunk_length" show-input :max="8192"></el-slider>
</el-form-item>
</el-form>
</el-card>
</NodeContainer>
@ -73,8 +52,7 @@ import NodeCascader from '@/workflow/common/NodeCascader.vue'
const props = defineProps<{ nodeModel: any }>()
const form = {
paragraph_list: [],
chunk_length: 4096
document_list: [],
}
const form_data = computed({