From 3c7142ed7c5c12df9c15c1faf299c563201250e9 Mon Sep 17 00:00:00 2001
From: shaohuzhang1 <80892890+shaohuzhang1@users.noreply.github.com>
Date: Mon, 29 Jul 2024 15:51:33 +0800
Subject: [PATCH] =?UTF-8?q?feat:=20=E6=96=87=E6=A1=A3=E6=B7=BB=E5=8A=A0?=
=?UTF-8?q?=E6=8E=92=E9=98=9F=E4=B8=AD=E7=8A=B6=E6=80=81=20(#886)?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
apps/common/event/common.py | 23 ++++++++++++++++++-
apps/common/event/listener_manage.py | 1 -
..._document_status_alter_paragraph_status.py | 23 +++++++++++++++++++
apps/dataset/models/data_set.py | 3 ++-
.../serializers/dataset_serializers.py | 4 +++-
.../serializers/document_serializers.py | 2 ++
ui/src/views/document/index.vue | 5 +++-
7 files changed, 56 insertions(+), 5 deletions(-)
create mode 100644 apps/dataset/migrations/0008_alter_document_status_alter_paragraph_status.py
diff --git a/apps/common/event/common.py b/apps/common/event/common.py
index 9f4a945bf..480748c31 100644
--- a/apps/common/event/common.py
+++ b/apps/common/event/common.py
@@ -8,10 +8,14 @@
"""
from concurrent.futures import ThreadPoolExecutor
+from django.core.cache.backends.locmem import LocMemCache
+
work_thread_pool = ThreadPoolExecutor(5)
embedding_thread_pool = ThreadPoolExecutor(3)
+memory_cache = LocMemCache('task', {"OPTIONS": {"MAX_ENTRIES": 1000}})
+
def poxy(poxy_function):
def inner(args, **keywords):
@@ -20,8 +24,40 @@ def poxy(poxy_function):
     return inner
 
 
+def get_cache_key(poxy_function, args):
+    """Build the de-duplication key for a task: function name plus str(args)."""
+    return poxy_function.__name__ + str(args)
+
+
+def get_cache_poxy_function(poxy_function, cache_key):
+    """Wrap poxy_function so its in-flight marker is removed once it finishes."""
+
+    def fun(args, **keywords):
+        try:
+            poxy_function(args, **keywords)
+        finally:
+            # Always release the marker, even when the task raises; otherwise
+            # a failed task would keep blocking every later re-submission.
+            memory_cache.delete(cache_key)
+
+    return fun
+
+
 def embedding_poxy(poxy_function):
+    """Run poxy_function on the embedding pool, skipping duplicate submissions.
+
+    A task is identified by get_cache_key(); while a task with the same key is
+    queued or running, further calls return without submitting anything.
+    """
+
     def inner(args, **keywords):
-        embedding_thread_pool.submit(poxy_function, args, **keywords)
+        key = get_cache_key(poxy_function, args)
+        # add() stores the marker only when the key is absent and reports
+        # whether it did, so there is no gap between the check and the set.
+        # timeout=None keeps the marker until the wrapped task deletes it.
+        if not memory_cache.add(key, None, timeout=None):
+            return
+        f = get_cache_poxy_function(poxy_function, key)
+        embedding_thread_pool.submit(f, args, **keywords)
 
     return inner
diff --git a/apps/common/event/listener_manage.py b/apps/common/event/listener_manage.py
index 0de80acee..c4c6b2298 100644
--- a/apps/common/event/listener_manage.py
+++ b/apps/common/event/listener_manage.py
@@ -189,7 +189,6 @@ class ListenerManagement:
un_lock('embedding' + str(document_id))
@staticmethod
- @embedding_poxy
def embedding_by_dataset(dataset_id, embedding_model: Embeddings):
"""
向量化知识库
diff --git a/apps/dataset/migrations/0008_alter_document_status_alter_paragraph_status.py b/apps/dataset/migrations/0008_alter_document_status_alter_paragraph_status.py
new file mode 100644
index 000000000..3380d7b92
--- /dev/null
+++ b/apps/dataset/migrations/0008_alter_document_status_alter_paragraph_status.py
@@ -0,0 +1,23 @@
+# Generated by Django 4.2.14 on 2024-07-29 15:37
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+    # Adds status choice '3' ('排队中', i.e. queued) to Document and Paragraph; Document now defaults to '3', Paragraph to '0'.
+ dependencies = [
+ ('dataset', '0007_alter_paragraph_content'),
+ ]
+
+ operations = [
+ migrations.AlterField(
+ model_name='document',
+ name='status',
+ field=models.CharField(choices=[('0', '导入中'), ('1', '已完成'), ('2', '导入失败'), ('3', '排队中')], default='3', max_length=1, verbose_name='状态'),
+ ),
+ migrations.AlterField(
+ model_name='paragraph',
+ name='status',
+ field=models.CharField(choices=[('0', '导入中'), ('1', '已完成'), ('2', '导入失败'), ('3', '排队中')], default='0', max_length=1, verbose_name='状态'),
+ ),
+ ]
diff --git a/apps/dataset/models/data_set.py b/apps/dataset/models/data_set.py
index d566740dd..eaf2a0779 100644
--- a/apps/dataset/models/data_set.py
+++ b/apps/dataset/models/data_set.py
@@ -22,6 +22,7 @@ class Status(models.TextChoices):
embedding = 0, '导入中'
success = 1, '已完成'
error = 2, '导入失败'
+ queue_up = 3, '排队中'
class Type(models.TextChoices):
@@ -66,7 +67,7 @@ class Document(AppModelMixin):
name = models.CharField(max_length=150, verbose_name="文档名称")
char_length = models.IntegerField(verbose_name="文档字符数 冗余字段")
status = models.CharField(verbose_name='状态', max_length=1, choices=Status.choices,
- default=Status.embedding)
+ default=Status.queue_up)
is_active = models.BooleanField(default=True)
type = models.CharField(verbose_name='类型', max_length=1, choices=Type.choices,
diff --git a/apps/dataset/serializers/dataset_serializers.py b/apps/dataset/serializers/dataset_serializers.py
index 946398482..7fdd1b4c8 100644
--- a/apps/dataset/serializers/dataset_serializers.py
+++ b/apps/dataset/serializers/dataset_serializers.py
@@ -35,7 +35,7 @@ from common.util.field_message import ErrMessage
from common.util.file_util import get_file_content
from common.util.fork import ChildLink, Fork
from common.util.split_model import get_split_model
-from dataset.models.data_set import DataSet, Document, Paragraph, Problem, Type, ProblemParagraphMapping
+from dataset.models.data_set import DataSet, Document, Paragraph, Problem, Type, ProblemParagraphMapping, Status
from dataset.serializers.common_serializers import list_paragraph, MetaSerializer, ProblemParagraphManage, \
get_embedding_model_by_dataset_id
from dataset.serializers.document_serializers import DocumentSerializers, DocumentInstanceSerializer
@@ -746,6 +746,8 @@ class DataSetSerializers(serializers.ModelSerializer):
if with_valid:
self.is_valid(raise_exception=True)
model = get_embedding_model_by_dataset_id(self.data.get('id'))
+ QuerySet(Document).filter(dataset_id=self.data.get('id')).update(**{'status': Status.queue_up})
+ QuerySet(Paragraph).filter(dataset_id=self.data.get('id')).update(**{'status': Status.queue_up})
ListenerManagement.embedding_by_dataset_signal.send(self.data.get('id'), embedding_model=model)
def list_application(self, with_valid=True):
diff --git a/apps/dataset/serializers/document_serializers.py b/apps/dataset/serializers/document_serializers.py
index 0977db7e1..654a31c75 100644
--- a/apps/dataset/serializers/document_serializers.py
+++ b/apps/dataset/serializers/document_serializers.py
@@ -539,6 +539,8 @@ class DocumentSerializers(ApiMixin, serializers.Serializer):
self.is_valid(raise_exception=True)
document_id = self.data.get("document_id")
model = get_embedding_model_by_dataset_id(dataset_id=self.data.get('dataset_id'))
+ QuerySet(Document).filter(id=document_id).update(**{'status': Status.queue_up})
+ QuerySet(Paragraph).filter(document_id=document_id).update(**{'status': Status.queue_up})
ListenerManagement.embedding_by_document_signal.send(document_id, embedding_model=model)
@transaction.atomic
diff --git a/ui/src/views/document/index.vue b/ui/src/views/document/index.vue
index ae2dc371b..f69f4c0a1 100644
--- a/ui/src/views/document/index.vue
+++ b/ui/src/views/document/index.vue
@@ -82,7 +82,10 @@
失败
- 导入中
+ 索引中
+
+
+ 排队中