def get_cache_key(poxy_function, args):
    """Build the de-duplication key for one background task invocation.

    :param poxy_function: the task function; only its ``__name__`` is used
    :param args: the single positional argument the task is called with
    :return: ``poxy_function.__name__`` followed by ``str(args)``
    """
    return poxy_function.__name__ + str(args)


def get_cache_poxy_function(poxy_function, cache_key):
    """Wrap ``poxy_function`` so its in-flight marker is always released.

    :param poxy_function: the task to run
    :param cache_key: key in ``memory_cache`` that marks the task as queued/running
    :return: a callable with the same ``(args, **keywords)`` calling convention
    """

    def fun(args, **keywords):
        try:
            poxy_function(args, **keywords)
        finally:
            # Release the marker even when the task raises; otherwise a failed
            # task would leave its key behind and identical requests would be
            # silently dropped.
            memory_cache.delete(cache_key)

    return fun


def embedding_poxy(poxy_function):
    """Decorator: run ``poxy_function`` on the embedding thread pool, skipping duplicates.

    A call is dropped when an identical one (same function name and same
    ``str(args)``) is already queued or running. The decorated callable
    returns ``None`` in both cases.

    :param poxy_function: task taking one positional argument plus keywords
    :return: the submitting wrapper
    """

    def inner(args, **keywords):
        key = get_cache_key(poxy_function, args)
        # add() stores the key only when it is absent and reports whether it
        # did, so the "already queued?" check and the reservation are a single
        # step instead of a racy has_key()/add() pair.
        # timeout=None: the marker must live as long as the task does; it is
        # removed explicitly when the task finishes, not by expiry.
        if not memory_cache.add(key, None, timeout=None):
            return
        task = get_cache_poxy_function(poxy_function, key)
        try:
            embedding_thread_pool.submit(task, args, **keywords)
        except Exception:
            # The task never reached the pool (e.g. the executor was shut
            # down), so nothing will ever release the key: do it here.
            memory_cache.delete(key)
            raise

    return inner
class Migration(migrations.Migration):
    """Alter the ``status`` field of the ``document`` and ``paragraph`` models.

    Both fields get the same four choices ('0'..'3'); the default becomes
    '3' for ``document`` and is '0' for ``paragraph``.
    """

    dependencies = [('dataset', '0007_alter_paragraph_content')]

    # The two operations differ only in the target model and the default,
    # so they are generated from (model, default) pairs in the original order.
    operations = [
        migrations.AlterField(
            model_name=target_model,
            name='status',
            field=models.CharField(
                choices=[('0', '导入中'), ('1', '已完成'), ('2', '导入失败'), ('3', '排队中')],
                default=default_status,
                max_length=1,
                verbose_name='状态',
            ),
        )
        for target_model, default_status in (('document', '3'), ('paragraph', '0'))
    ]
Document, Paragraph, Problem, Type, ProblemParagraphMapping, Status from dataset.serializers.common_serializers import list_paragraph, MetaSerializer, ProblemParagraphManage, \ get_embedding_model_by_dataset_id from dataset.serializers.document_serializers import DocumentSerializers, DocumentInstanceSerializer @@ -746,6 +746,8 @@ class DataSetSerializers(serializers.ModelSerializer): if with_valid: self.is_valid(raise_exception=True) model = get_embedding_model_by_dataset_id(self.data.get('id')) + QuerySet(Document).filter(dataset_id=self.data.get('id')).update(**{'status': Status.queue_up}) + QuerySet(Paragraph).filter(dataset_id=self.data.get('id')).update(**{'status': Status.queue_up}) ListenerManagement.embedding_by_dataset_signal.send(self.data.get('id'), embedding_model=model) def list_application(self, with_valid=True): diff --git a/apps/dataset/serializers/document_serializers.py b/apps/dataset/serializers/document_serializers.py index 0977db7e1..654a31c75 100644 --- a/apps/dataset/serializers/document_serializers.py +++ b/apps/dataset/serializers/document_serializers.py @@ -539,6 +539,8 @@ class DocumentSerializers(ApiMixin, serializers.Serializer): self.is_valid(raise_exception=True) document_id = self.data.get("document_id") model = get_embedding_model_by_dataset_id(dataset_id=self.data.get('dataset_id')) + QuerySet(Document).filter(id=document_id).update(**{'status': Status.queue_up}) + QuerySet(Paragraph).filter(document_id=document_id).update(**{'status': Status.queue_up}) ListenerManagement.embedding_by_document_signal.send(document_id, embedding_model=model) @transaction.atomic diff --git a/ui/src/views/document/index.vue b/ui/src/views/document/index.vue index ae2dc371b..f69f4c0a1 100644 --- a/ui/src/views/document/index.vue +++ b/ui/src/views/document/index.vue @@ -82,7 +82,10 @@ 失败 - 导入中 + 索引中 + + + 排队中