mirror of
https://github.com/1Panel-dev/MaxKB.git
synced 2025-12-26 09:43:10 +00:00
76 lines
2.4 KiB
Python
76 lines
2.4 KiB
Python
# -*- coding: utf-8 -*-
|
|
#
|
|
import logging
|
|
import os
|
|
|
|
from celery import subtask
|
|
from celery.signals import (
|
|
worker_ready, worker_shutdown, after_setup_logger, task_revoked, task_prerun
|
|
)
|
|
from django.core.cache import cache
|
|
from django_celery_beat.models import PeriodicTask
|
|
|
|
from .decorator import get_after_app_ready_tasks, get_after_app_shutdown_clean_tasks
|
|
from .logger import CeleryThreadTaskFileHandler
|
|
|
|
logger = logging.getLogger(__file__)
|
|
safe_str = lambda x: x
|
|
|
|
|
|
@worker_ready.connect
|
|
def on_app_ready(sender=None, headers=None, **kwargs):
|
|
if cache.get("CELERY_APP_READY", 0) == 1:
|
|
return
|
|
cache.set("CELERY_APP_READY", 1, 10)
|
|
tasks = get_after_app_ready_tasks()
|
|
logger.debug("Work ready signal recv")
|
|
logger.debug("Start need start task: [{}]".format(", ".join(tasks)))
|
|
for task in tasks:
|
|
periodic_task = PeriodicTask.objects.filter(task=task).first()
|
|
if periodic_task and not periodic_task.enabled:
|
|
logger.debug("Periodic task [{}] is disabled!".format(task))
|
|
continue
|
|
subtask(task).delay()
|
|
|
|
|
|
def delete_files(directory):
|
|
if os.path.isdir(directory):
|
|
for filename in os.listdir(directory):
|
|
file_path = os.path.join(directory, filename)
|
|
if os.path.isfile(file_path):
|
|
os.remove(file_path)
|
|
|
|
|
|
@worker_shutdown.connect
|
|
def after_app_shutdown_periodic_tasks(sender=None, **kwargs):
|
|
if cache.get("CELERY_APP_SHUTDOWN", 0) == 1:
|
|
return
|
|
cache.set("CELERY_APP_SHUTDOWN", 1, 10)
|
|
tasks = get_after_app_shutdown_clean_tasks()
|
|
logger.debug("Worker shutdown signal recv")
|
|
logger.debug("Clean period tasks: [{}]".format(', '.join(tasks)))
|
|
PeriodicTask.objects.filter(name__in=tasks).delete()
|
|
|
|
|
|
@after_setup_logger.connect
|
|
def add_celery_logger_handler(sender=None, logger=None, loglevel=None, format=None, **kwargs):
|
|
if not logger:
|
|
return
|
|
task_handler = CeleryThreadTaskFileHandler()
|
|
task_handler.setLevel(loglevel)
|
|
formatter = logging.Formatter(format)
|
|
task_handler.setFormatter(formatter)
|
|
logger.addHandler(task_handler)
|
|
|
|
|
|
@task_revoked.connect
|
|
def on_task_revoked(request, terminated, signum, expired, **kwargs):
|
|
print('task_revoked', terminated)
|
|
|
|
|
|
@task_prerun.connect
|
|
def on_taskaa_start(sender, task_id, **kwargs):
|
|
pass
|
|
# sender.update_state(state='REVOKED',
|
|
# meta={'exc_type': 'Exception', 'exc': 'Exception', 'message': '暂停任务', 'exc_message': ''})
|