MaxKB/apps/knowledge/task/sync.py

64 lines
2.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# coding=utf-8
"""
@project: MaxKB
@Author
@file sync.py
@date2024/8/20 21:37
@desc:
"""
import logging
import traceback
from typing import List
from celery_once import QueueOnce
from common.utils.fork import ForkManage, Fork
from .tools import get_save_handler, get_sync_web_document_handler, get_sync_handler
from ops import celery_app
from django.utils.translation import gettext_lazy as _
max_kb_error = logging.getLogger("max_kb_error")
max_kb = logging.getLogger("max_kb")
@celery_app.task(base=QueueOnce, once={'keys': ['knowledge_id']}, name='celery:sync_web_knowledge')
def sync_web_knowledge(knowledge_id: str, url: str, selector: str):
try:
max_kb.info(
_('Start--->Start synchronization web knowledge base:{knowledge_id}').format(knowledge_id=knowledge_id))
ForkManage(url, selector.split(" ") if selector is not None else []).fork(2, set(),
get_save_handler(knowledge_id,
selector))
max_kb.info(_('End--->End synchronization web knowledge base:{knowledge_id}').format(knowledge_id=knowledge_id))
except Exception as e:
max_kb_error.error(_('Synchronize web knowledge base:{knowledge_id} error{error}{traceback}').format(
knowledge_id=knowledge_id, error=str(e), traceback=traceback.format_exc()))
@celery_app.task(base=QueueOnce, once={'keys': ['knowledge_id']}, name='celery:sync_replace_web_knowledge')
def sync_replace_web_knowledge(knowledge_id: str, url: str, selector: str):
try:
max_kb.info(
_('Start--->Start synchronization web knowledge base:{knowledge_id}').format(knowledge_id=knowledge_id))
ForkManage(url, selector.split(" ") if selector is not None else []).fork(2, set(),
get_sync_handler(knowledge_id
))
max_kb.info(_('End--->End synchronization web knowledge base:{knowledge_id}').format(knowledge_id=knowledge_id))
except Exception as e:
max_kb_error.error(_('Synchronize web knowledge base:{knowledge_id} error{error}{traceback}').format(
knowledge_id=knowledge_id, error=str(e), traceback=traceback.format_exc()))
@celery_app.task(name='celery:sync_web_document')
def sync_web_document(knowledge_id, source_url_list: List[str], selector: str):
handler = get_sync_web_document_handler(knowledge_id)
for source_url in source_url_list:
try:
result = Fork(base_fork_url=source_url, selector_list=selector.split(' ')).fork()
handler(source_url, selector, result)
except Exception as e:
pass