fix: 修复文档状态部分问题 (#1699)
Some checks are pending
sync2gitee / repo-sync (push) Waiting to run
Typos Check / Spell Check with Typos (push) Waiting to run

This commit is contained in:
shaohuzhang1 2024-11-26 19:40:26 +08:00 committed by GitHub
parent a37a6184b4
commit da7e9b1460
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 98 additions and 27 deletions

View File

@ -181,7 +181,8 @@ class ListenerManagement:
def aggregation_document_status():
sql = get_file_content(
os.path.join(PROJECT_DIR, "apps", "dataset", 'sql', 'update_document_status_meta.sql'))
native_update({'document_custom_sql': QuerySet(Document).filter(dataset_id=dataset_id)}, sql)
native_update({'document_custom_sql': QuerySet(Document).filter(dataset_id=dataset_id)}, sql,
with_table_name=True)
return aggregation_document_status
@ -190,7 +191,7 @@ class ListenerManagement:
def aggregation_document_status():
sql = get_file_content(
os.path.join(PROJECT_DIR, "apps", "dataset", 'sql', 'update_document_status_meta.sql'))
native_update({'document_custom_sql': queryset}, sql)
native_update({'document_custom_sql': queryset}, sql, with_table_name=True)
return aggregation_document_status
@ -249,19 +250,23 @@ class ListenerManagement:
"""
if not try_lock('embedding' + str(document_id)):
return
max_kb.info(f"开始--->向量化文档:{document_id}")
# Mark the document's embedding task status as STARTED
ListenerManagement.update_status(QuerySet(Document).filter(id=document_id), TaskType.EMBEDDING, State.STARTED)
try:
# 删除文档向量数据
VectorStore.get_embedding_vector().delete_by_document_id(document_id)
def is_the_task_interrupted():
document = QuerySet(Document).filter(id=document_id).first()
if document is None or Status(document.status)[TaskType.EMBEDDING] == State.REVOKE:
return True
return False
if is_the_task_interrupted():
return
max_kb.info(f"开始--->向量化文档:{document_id}")
# Mark the document's embedding task status as STARTED
ListenerManagement.update_status(QuerySet(Document).filter(id=document_id), TaskType.EMBEDDING,
State.STARTED)
# 删除文档向量数据
VectorStore.get_embedding_vector().delete_by_document_id(document_id)
# 根据段落进行向量化处理
page(QuerySet(Paragraph).filter(document_id=document_id).values('id'), 5,
ListenerManagement.get_embedding_paragraph_apply(embedding_model, is_the_task_interrupted,

View File

@ -7,6 +7,11 @@ import dataset
from common.event import ListenerManagement
from dataset.models import State, TaskType
sql = """
UPDATE "document"
SET status ="replace"(status, '1', '3')
"""
def updateDocumentStatus(apps, schema_editor):
ParagraphModel = apps.get_model('dataset', 'Paragraph')
@ -43,5 +48,6 @@ class Migration(migrations.Migration):
name='status',
field=models.CharField(default=dataset.models.data_set.Status.__str__, max_length=20, verbose_name='状态'),
),
migrations.RunSQL(sql),
migrations.RunPython(updateDocumentStatus)
]

View File

@ -297,6 +297,9 @@ class DocumentSerializers(ApiMixin, serializers.Serializer):
ListenerManagement.update_status(QuerySet(Document).filter(id__in=document_id_list),
TaskType.EMBEDDING,
State.PENDING)
ListenerManagement.update_status(QuerySet(Paragraph).filter(document_id__in=document_id_list),
TaskType.EMBEDDING,
State.PENDING)
embedding_by_document_list.delay(document_id_list, model_id)
else:
update_embedding_dataset_id(pid_list, target_dataset_id)

View File

@ -51,21 +51,28 @@ def get_generate_problem(llm_model, prompt, post_apply=lambda: None, is_the_task
return generate_problem
def get_is_the_task_interrupted(document_id):
    """Build a checker that reports whether problem generation for a document is interrupted.

    :param document_id: value matched against ``Document.id`` when looking up the row
    :return: a zero-argument callable that returns True when no matching document
             exists or when its GENERATE_PROBLEM task state equals ``State.REVOKE``,
             and False otherwise
    """

    def is_the_task_interrupted():
        # The lookup runs on every call rather than once when the checker is built.
        current = QuerySet(Document).filter(id=document_id).first()
        if current is None:
            return True
        if Status(current.status)[TaskType.GENERATE_PROBLEM] == State.REVOKE:
            return True
        return False

    return is_the_task_interrupted
@celery_app.task(base=QueueOnce, once={'keys': ['document_id']},
name='celery:generate_related_by_document')
def generate_related_by_document_id(document_id, model_id, prompt):
try:
is_the_task_interrupted = get_is_the_task_interrupted(document_id)
if is_the_task_interrupted():
return
ListenerManagement.update_status(QuerySet(Document).filter(id=document_id),
TaskType.GENERATE_PROBLEM,
State.STARTED)
llm_model = get_llm_model(model_id)
def is_the_task_interrupted():
document = QuerySet(Document).filter(id=document_id).first()
if document is None or Status(document.status)[TaskType.GENERATE_PROBLEM] == State.REVOKE:
return True
return False
# 生成问题函数
generate_problem = get_generate_problem(llm_model, prompt,
ListenerManagement.get_aggregation_document_status(
@ -82,6 +89,12 @@ def generate_related_by_document_id(document_id, model_id, prompt):
name='celery:generate_related_by_paragraph_list')
def generate_related_by_paragraph_id_list(document_id, paragraph_id_list, model_id, prompt):
try:
is_the_task_interrupted = get_is_the_task_interrupted(document_id)
if is_the_task_interrupted():
ListenerManagement.update_status(QuerySet(Document).filter(id=document_id),
TaskType.GENERATE_PROBLEM,
State.REVOKED)
return
ListenerManagement.update_status(QuerySet(Document).filter(id=document_id),
TaskType.GENERATE_PROBLEM,
State.STARTED)

View File

@ -102,6 +102,7 @@ def embedding_by_dataset(dataset_id, model_id):
max_kb.info(f"数据集文档:{[d.name for d in document_list]}")
for document in document_list:
try:
print(document.id, model_id)
embedding_by_document.delay(document.id, model_id)
except Exception as e:
pass

View File

@ -32,9 +32,11 @@ CELERY_WORKER_REDIRECT_STDOUTS = True
CELERY_WORKER_REDIRECT_STDOUTS_LEVEL = "INFO"
CELERY_TASK_SOFT_TIME_LIMIT = 3600
CELERY_WORKER_CANCEL_LONG_RUNNING_TASKS_ON_CONNECTION_LOSS = True
CELERY_ACKS_LATE = True
celery_once_path = os.path.join(celery_data_dir, "celery_once")
CELERY_ONCE = {
'backend': 'celery_once.backends.File',
'settings': {'location': os.path.join(celery_data_dir, "celery_once")}
'settings': {'location': celery_once_path}
}
CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP = True
CELERY_LOG_DIR = os.path.join(PROJECT_DIR, 'logs', 'celery')

View File

@ -24,13 +24,19 @@
</el-text>
</el-col>
<el-col :span="7">
完成
{{
Object.keys(status.aggs ? status.aggs : {})
.filter((k) => k == State.SUCCESS)
.map((k) => status.aggs[k])
.reduce((x: any, y: any) => x + y, 0)
}}/{{ Object.values(status.aggs ? status.aggs : {}).reduce((x: any, y: any) => x + y, 0) }}
<span
:style="{ color: [State.FAILURE, State.REVOKED].includes(status.state) ? '#F54A45' : '' }"
>
完成
{{
Object.keys(status.aggs ? status.aggs : {})
.filter((k) => k == State.SUCCESS)
.map((k) => status.aggs[k])
.reduce((x: any, y: any) => x + y, 0)
}}/{{
Object.values(status.aggs ? status.aggs : {}).reduce((x: any, y: any) => x + y, 0)
}}</span
>
</el-col>
<el-col :span="9">
{{

View File

@ -235,7 +235,25 @@
<template #default="{ row }">
<div v-if="datasetDetail.type === '0'">
<span class="mr-4">
<el-tooltip effect="dark" content="向量化" placement="top">
<el-tooltip
effect="dark"
v-if="
([State.STARTED, State.PENDING] as Array<string>).includes(
getTaskState(row.status, TaskType.EMBEDDING)
)
"
content="取消向量化"
placement="top"
>
<el-button
type="primary"
text
@click.stop="cancelTask(row, TaskType.EMBEDDING)"
>
<AppIcon iconName="app-close" style="font-size: 16px"></AppIcon>
</el-button>
</el-tooltip>
<el-tooltip v-else effect="dark" content="向量化" placement="top">
<el-button type="primary" text @click.stop="refreshDocument(row)">
<AppIcon iconName="app-document-refresh" style="font-size: 16px"></AppIcon>
</el-button>
@ -255,9 +273,20 @@
</el-button>
<template #dropdown>
<el-dropdown-menu>
<el-dropdown-item @click="openGenerateDialog(row)">
<el-dropdown-item
v-if="
([State.STARTED, State.PENDING] as Array<string>).includes(
getTaskState(row.status, TaskType.GENERATE_PROBLEM)
)
"
@click="cancelTask(row, TaskType.GENERATE_PROBLEM)"
>
<el-icon><Connection /></el-icon>
生成关联问题
取消生成问题
</el-dropdown-item>
<el-dropdown-item v-else @click="openGenerateDialog(row)">
<el-icon><Connection /></el-icon>
生成问题
</el-dropdown-item>
<el-dropdown-item @click="openDatasetDialog(row)">
<AppIcon iconName="app-migrate"></AppIcon>
@ -286,7 +315,11 @@
<span class="mr-4">
<el-tooltip
effect="dark"
v-if="getTaskState(row.status, TaskType.EMBEDDING) == State.STARTED"
v-if="
([State.STARTED, State.PENDING] as Array<string>).includes(
getTaskState(row.status, TaskType.EMBEDDING)
)
"
content="取消向量化"
placement="top"
>
@ -318,7 +351,9 @@
>
<el-dropdown-item
v-if="
getTaskState(row.status, TaskType.GENERATE_PROBLEM) == State.STARTED
([State.STARTED, State.PENDING] as Array<string>).includes(
getTaskState(row.status, TaskType.GENERATE_PROBLEM)
)
"
@click="cancelTask(row, TaskType.GENERATE_PROBLEM)"
>