From da7e9b146017e6c6b85ee6810cc430ebc5025831 Mon Sep 17 00:00:00 2001 From: shaohuzhang1 <80892890+shaohuzhang1@users.noreply.github.com> Date: Tue, 26 Nov 2024 19:40:26 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E4=BF=AE=E5=A4=8D=E6=96=87=E6=A1=A3?= =?UTF-8?q?=E7=8A=B6=E6=80=81=E9=83=A8=E5=88=86=E9=97=AE=E9=A2=98=20(#1699?= =?UTF-8?q?)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/common/event/listener_manage.py | 21 +++++---- ...tus_meta_paragraph_status_meta_and_more.py | 6 +++ .../serializers/document_serializers.py | 3 ++ apps/dataset/task/generate.py | 25 ++++++++--- apps/embedding/task/embedding.py | 1 + apps/smartdoc/settings/lib.py | 4 +- .../views/document/component/StatusTable.vue | 20 ++++++--- ui/src/views/document/index.vue | 45 ++++++++++++++++--- 8 files changed, 98 insertions(+), 27 deletions(-) diff --git a/apps/common/event/listener_manage.py b/apps/common/event/listener_manage.py index 9c16ad5c2..dc57bfaaf 100644 --- a/apps/common/event/listener_manage.py +++ b/apps/common/event/listener_manage.py @@ -181,7 +181,8 @@ class ListenerManagement: def aggregation_document_status(): sql = get_file_content( os.path.join(PROJECT_DIR, "apps", "dataset", 'sql', 'update_document_status_meta.sql')) - native_update({'document_custom_sql': QuerySet(Document).filter(dataset_id=dataset_id)}, sql) + native_update({'document_custom_sql': QuerySet(Document).filter(dataset_id=dataset_id)}, sql, + with_table_name=True) return aggregation_document_status @@ -190,7 +191,7 @@ class ListenerManagement: def aggregation_document_status(): sql = get_file_content( os.path.join(PROJECT_DIR, "apps", "dataset", 'sql', 'update_document_status_meta.sql')) - native_update({'document_custom_sql': queryset}, sql) + native_update({'document_custom_sql': queryset}, sql, with_table_name=True) return aggregation_document_status @@ -249,19 +250,23 @@ class ListenerManagement: """ if not try_lock('embedding' + str(document_id)): return - max_kb.info(f"开始--->向量化文档:{document_id}") - # 批量修改状态为PADDING - ListenerManagement.update_status(QuerySet(Document).filter(id=document_id), TaskType.EMBEDDING, State.STARTED) try: - # 删除文档向量数据 - VectorStore.get_embedding_vector().delete_by_document_id(document_id) - def is_the_task_interrupted(): document = QuerySet(Document).filter(id=document_id).first() if document is None or Status(document.status)[TaskType.EMBEDDING] == State.REVOKE: return True return False + if is_the_task_interrupted(): + return + max_kb.info(f"开始--->向量化文档:{document_id}") + # 批量修改状态为PADDING + ListenerManagement.update_status(QuerySet(Document).filter(id=document_id), TaskType.EMBEDDING, + State.STARTED) + + # 删除文档向量数据 + VectorStore.get_embedding_vector().delete_by_document_id(document_id) + # 根据段落进行向量化处理 page(QuerySet(Paragraph).filter(document_id=document_id).values('id'), 5, ListenerManagement.get_embedding_paragraph_apply(embedding_model, is_the_task_interrupted, diff --git a/apps/dataset/migrations/0011_document_status_meta_paragraph_status_meta_and_more.py b/apps/dataset/migrations/0011_document_status_meta_paragraph_status_meta_and_more.py index c64a4db20..e47bfd60c 100644 --- a/apps/dataset/migrations/0011_document_status_meta_paragraph_status_meta_and_more.py +++ b/apps/dataset/migrations/0011_document_status_meta_paragraph_status_meta_and_more.py @@ -7,6 +7,11 @@ import dataset from common.event import ListenerManagement from dataset.models import State, TaskType +sql = """ +UPDATE "document" +SET status ="replace"(status, '1', '3') +""" + def updateDocumentStatus(apps, schema_editor): ParagraphModel = apps.get_model('dataset', 'Paragraph') @@ -43,5 +48,6 @@ class Migration(migrations.Migration): name='status', field=models.CharField(default=dataset.models.data_set.Status.__str__, max_length=20, verbose_name='状态'), ), + migrations.RunSQL(sql), migrations.RunPython(updateDocumentStatus) ] diff --git a/apps/dataset/serializers/document_serializers.py b/apps/dataset/serializers/document_serializers.py index 70facd8db..45057d9bc 100644 --- a/apps/dataset/serializers/document_serializers.py +++ b/apps/dataset/serializers/document_serializers.py @@ -297,6 +297,9 @@ class DocumentSerializers(ApiMixin, serializers.Serializer): ListenerManagement.update_status(QuerySet(Document).filter(id__in=document_id_list), TaskType.EMBEDDING, State.PENDING) + ListenerManagement.update_status(QuerySet(Paragraph).filter(document_id__in=document_id_list), + TaskType.EMBEDDING, + State.PENDING) embedding_by_document_list.delay(document_id_list, model_id) else: update_embedding_dataset_id(pid_list, target_dataset_id) diff --git a/apps/dataset/task/generate.py b/apps/dataset/task/generate.py index e81039744..6a085c448 100644 --- a/apps/dataset/task/generate.py +++ b/apps/dataset/task/generate.py @@ -51,21 +51,28 @@ def get_generate_problem(llm_model, prompt, post_apply=lambda: None, is_the_task return generate_problem +def get_is_the_task_interrupted(document_id): + def is_the_task_interrupted(): + document = QuerySet(Document).filter(id=document_id).first() + if document is None or Status(document.status)[TaskType.GENERATE_PROBLEM] == State.REVOKE: + return True + return False + + return is_the_task_interrupted + + @celery_app.task(base=QueueOnce, once={'keys': ['document_id']}, name='celery:generate_related_by_document') def generate_related_by_document_id(document_id, model_id, prompt): try: + is_the_task_interrupted = get_is_the_task_interrupted(document_id) + if is_the_task_interrupted(): + return ListenerManagement.update_status(QuerySet(Document).filter(id=document_id), TaskType.GENERATE_PROBLEM, State.STARTED) llm_model = get_llm_model(model_id) - def is_the_task_interrupted(): - document = QuerySet(Document).filter(id=document_id).first() - if document is None or Status(document.status)[TaskType.GENERATE_PROBLEM] == State.REVOKE: - return True - return False - # 生成问题函数 generate_problem = get_generate_problem(llm_model, prompt, ListenerManagement.get_aggregation_document_status( @@ -82,6 +89,12 @@ def generate_related_by_document_id(document_id, model_id, prompt): name='celery:generate_related_by_paragraph_list') def generate_related_by_paragraph_id_list(document_id, paragraph_id_list, model_id, prompt): try: + is_the_task_interrupted = get_is_the_task_interrupted(document_id) + if is_the_task_interrupted(): + ListenerManagement.update_status(QuerySet(Document).filter(id=document_id), + TaskType.GENERATE_PROBLEM, + State.REVOKED) + return ListenerManagement.update_status(QuerySet(Document).filter(id=document_id), TaskType.GENERATE_PROBLEM, State.STARTED) diff --git a/apps/embedding/task/embedding.py b/apps/embedding/task/embedding.py index b6d5dfb75..3e63c26b2 100644 --- a/apps/embedding/task/embedding.py +++ b/apps/embedding/task/embedding.py @@ -102,6 +102,7 @@ def embedding_by_dataset(dataset_id, model_id): max_kb.info(f"数据集文档:{[d.name for d in document_list]}") for document in document_list: try: + print(document.id, model_id) embedding_by_document.delay(document.id, model_id) except Exception as e: pass diff --git a/apps/smartdoc/settings/lib.py b/apps/smartdoc/settings/lib.py index e7b6d39dd..a4c1aaabb 100644 --- a/apps/smartdoc/settings/lib.py +++ b/apps/smartdoc/settings/lib.py @@ -32,9 +32,11 @@ CELERY_WORKER_REDIRECT_STDOUTS = True CELERY_WORKER_REDIRECT_STDOUTS_LEVEL = "INFO" CELERY_TASK_SOFT_TIME_LIMIT = 3600 CELERY_WORKER_CANCEL_LONG_RUNNING_TASKS_ON_CONNECTION_LOSS = True +CELERY_ACKS_LATE = True +celery_once_path = os.path.join(celery_data_dir, "celery_once") CELERY_ONCE = { 'backend': 'celery_once.backends.File', - 'settings': {'location': os.path.join(celery_data_dir, "celery_once")} + 'settings': {'location': celery_once_path} } CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP = True CELERY_LOG_DIR = os.path.join(PROJECT_DIR, 'logs', 'celery') diff --git a/ui/src/views/document/component/StatusTable.vue b/ui/src/views/document/component/StatusTable.vue index 506f1bf98..a96e935bc 100644 --- a/ui/src/views/document/component/StatusTable.vue +++ b/ui/src/views/document/component/StatusTable.vue @@ -24,13 +24,19 @@ - 完成 - {{ - Object.keys(status.aggs ? status.aggs : {}) - .filter((k) => k == State.SUCCESS) - .map((k) => status.aggs[k]) - .reduce((x: any, y: any) => x + y, 0) - }}/{{ Object.values(status.aggs ? status.aggs : {}).reduce((x: any, y: any) => x + y, 0) }} + + 完成 + {{ + Object.keys(status.aggs ? status.aggs : {}) + .filter((k) => k == State.SUCCESS) + .map((k) => status.aggs[k]) + .reduce((x: any, y: any) => x + y, 0) + }}/{{ + Object.values(status.aggs ? status.aggs : {}).reduce((x: any, y: any) => x + y, 0) + }} {{ diff --git a/ui/src/views/document/index.vue b/ui/src/views/document/index.vue index 4777dd106..d0c342d91 100644 --- a/ui/src/views/document/index.vue +++ b/ui/src/views/document/index.vue @@ -235,7 +235,25 @@