diff --git a/apps/common/management/__init__.py b/apps/common/management/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/apps/common/management/commands/__init__.py b/apps/common/management/commands/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/apps/common/management/commands/celery.py b/apps/common/management/commands/celery.py new file mode 100644 index 000000000..af7df0a99 --- /dev/null +++ b/apps/common/management/commands/celery.py @@ -0,0 +1,46 @@ +# coding=utf-8 +""" + @project: MaxKB + @Author:虎 + @file: celery.py + @date:2024/8/19 11:57 + @desc: +""" +import os +import subprocess + +from django.core.management.base import BaseCommand + +from maxkb.const import BASE_DIR + + +class Command(BaseCommand): + help = 'celery' + + def add_arguments(self, parser): + parser.add_argument( + 'service', nargs='+', type=str, choices=("celery", "model"), help='Service', + ) + + def handle(self, *args, **options): + service = options.get('service') + os.environ.setdefault('CELERY_NAME', ','.join(service)) + server_hostname = os.environ.get("SERVER_HOSTNAME") + if hasattr(os, 'getuid') and os.getuid() == 0: + os.environ.setdefault('C_FORCE_ROOT', '1') + if not server_hostname: + server_hostname = '%h' + cmd = [ + 'celery', + '-A', 'ops', + 'worker', + '-P', 'threads', + '-l', 'info', + '-c', '10', + '-Q', ','.join(service), + '--heartbeat-interval', '10', + '-n', f'{",".join(service)}@{server_hostname}', + '--without-mingle', + ] + kwargs = {'cwd': BASE_DIR} + subprocess.run(cmd, **kwargs) diff --git a/apps/common/management/commands/restart.py b/apps/common/management/commands/restart.py new file mode 100644 index 000000000..57285f9c9 --- /dev/null +++ b/apps/common/management/commands/restart.py @@ -0,0 +1,6 @@ +from .services.command import BaseActionCommand, Action + + +class Command(BaseActionCommand): + help = 'Restart services' + action = Action.restart.value diff --git a/apps/common/management/commands/services/__init__.py b/apps/common/management/commands/services/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/apps/common/management/commands/services/command.py b/apps/common/management/commands/services/command.py new file mode 100644 index 000000000..0c97d4af3 --- /dev/null +++ b/apps/common/management/commands/services/command.py @@ -0,0 +1,134 @@ +import math + +from django.core.management.base import BaseCommand +from django.db.models import TextChoices + +from .hands import * +from .utils import ServicesUtil +import os + + +class Services(TextChoices): + gunicorn = 'gunicorn', 'gunicorn' + celery_default = 'celery_default', 'celery_default' + local_model = 'local_model', 'local_model' + web = 'web', 'web' + celery = 'celery', 'celery' + celery_model = 'celery_model', 'celery_model' + task = 'task', 'task' + all = 'all', 'all' + + @classmethod + def get_service_object_class(cls, name): + from . import services + services_map = { + cls.gunicorn.value: services.GunicornService, + cls.celery_default: services.CeleryDefaultService, + cls.local_model: services.GunicornLocalModelService + } + return services_map.get(name) + + @classmethod + def web_services(cls): + return [cls.gunicorn, cls.local_model] + + @classmethod + def celery_services(cls): + return [cls.celery_default, cls.celery_model] + + @classmethod + def task_services(cls): + return cls.celery_services() + + @classmethod + def all_services(cls): + return cls.web_services() + cls.task_services() + + @classmethod + def export_services_values(cls): + return [cls.all.value, cls.web.value, cls.task.value] + [s.value for s in cls.all_services()] + + @classmethod + def get_service_objects(cls, service_names, **kwargs): + services = set() + for name in service_names: + method_name = f'{name}_services' + if hasattr(cls, method_name): + _services = getattr(cls, method_name)() + elif hasattr(cls, name): + _services = [getattr(cls, name)] + else: + continue + services.update(set(_services)) + + service_objects = [] + for s in services: + service_class = cls.get_service_object_class(s.value) + if not service_class: + continue + kwargs.update({ + 'name': s.value + }) + service_object = service_class(**kwargs) + service_objects.append(service_object) + return service_objects + + +class Action(TextChoices): + start = 'start', 'start' + status = 'status', 'status' + stop = 'stop', 'stop' + restart = 'restart', 'restart' + + +class BaseActionCommand(BaseCommand): + help = 'Service Base Command' + + action = None + util = None + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + def add_arguments(self, parser): + parser.add_argument( + 'services', nargs='+', choices=Services.export_services_values(), help='Service', + ) + parser.add_argument('-d', '--daemon', nargs="?", const=True) + parser.add_argument('-w', '--worker', type=int, nargs="?", + default=3 if os.cpu_count() > 6 else math.floor(os.cpu_count() / 2)) + parser.add_argument('-f', '--force', nargs="?", const=True) + + def initial_util(self, *args, **options): + service_names = options.get('services') + service_kwargs = { + 'worker_gunicorn': options.get('worker') + } + services = Services.get_service_objects(service_names=service_names, **service_kwargs) + + kwargs = { + 'services': services, + 'run_daemon': options.get('daemon', False), + 'stop_daemon': self.action == Action.stop.value and Services.all.value in service_names, + 'force_stop': options.get('force') or False, + } + self.util = ServicesUtil(**kwargs) + + def handle(self, *args, **options): + self.initial_util(*args, **options) + assert self.action in Action.values, f'The action {self.action} is not in the optional list' + _handle = getattr(self, f'_handle_{self.action}', lambda: None) + _handle() + + def _handle_start(self): + self.util.start_and_watch() + os._exit(0) + + def _handle_stop(self): + self.util.stop() + + def _handle_restart(self): + self.util.restart() + + def _handle_status(self): + self.util.show_status() diff --git a/apps/common/management/commands/services/hands.py b/apps/common/management/commands/services/hands.py new file mode 100644 index 000000000..3a693a242 --- /dev/null +++ b/apps/common/management/commands/services/hands.py @@ -0,0 +1,26 @@ +import logging +import os +import sys + +from maxkb.const import CONFIG, PROJECT_DIR + +try: + from apps.smartdoc import const + + __version__ = const.VERSION +except ImportError as e: + print("Not found __version__: {}".format(e)) + print("Python is: ") + logging.info(sys.executable) + __version__ = 'Unknown' + sys.exit(1) + +HTTP_HOST = '0.0.0.0' +HTTP_PORT = CONFIG.HTTP_LISTEN_PORT or 8080 +DEBUG = CONFIG.DEBUG or False + +LOG_DIR = os.path.join(PROJECT_DIR, 'data', 'logs') +APPS_DIR = os.path.join(PROJECT_DIR, 'apps') +TMP_DIR = os.path.join(PROJECT_DIR, 'tmp') +if not os.path.exists(TMP_DIR): + os.makedirs(TMP_DIR) diff --git a/apps/common/management/commands/services/services/__init__.py b/apps/common/management/commands/services/services/__init__.py new file mode 100644 index 000000000..102739206 --- /dev/null +++ b/apps/common/management/commands/services/services/__init__.py @@ -0,0 +1,3 @@ +from .celery_default import * +from .gunicorn import * +from .local_model import * \ No newline at end of file diff --git a/apps/common/management/commands/services/services/base.py b/apps/common/management/commands/services/services/base.py new file mode 100644 index 000000000..ddcb4feca --- /dev/null +++ b/apps/common/management/commands/services/services/base.py @@ -0,0 +1,207 @@ +import abc +import time +import shutil +import psutil +import datetime +import threading +import subprocess +from ..hands import * + + +class BaseService(object): + + def __init__(self, **kwargs): + self.name = kwargs['name'] + self._process = None + self.STOP_TIMEOUT = 10 + self.max_retry = 0 + self.retry = 3 + self.LOG_KEEP_DAYS = 7 + self.EXIT_EVENT = threading.Event() + + @property + @abc.abstractmethod + def cmd(self): + return [] + + @property + @abc.abstractmethod + def cwd(self): + return '' + + @property + def is_running(self): + if self.pid == 0: + return False + try: + os.kill(self.pid, 0) + except (OSError, ProcessLookupError): + return False + else: + return True + + def show_status(self): + if self.is_running: + msg = f'{self.name} is running: {self.pid}.' + else: + msg = f'{self.name} is stopped.' + if DEBUG: + msg = '\033[31m{} is stopped.\033[0m\nYou can manual start it to find the error: \n' \ + ' $ cd {}\n' \ + ' $ {}'.format(self.name, self.cwd, ' '.join(self.cmd)) + + print(msg) + + # -- log -- + @property + def log_filename(self): + return f'{self.name}.log' + + @property + def log_filepath(self): + return os.path.join(LOG_DIR, self.log_filename) + + @property + def log_file(self): + return open(self.log_filepath, 'a') + + @property + def log_dir(self): + return os.path.dirname(self.log_filepath) + # -- end log -- + + # -- pid -- + @property + def pid_filepath(self): + return os.path.join(TMP_DIR, f'{self.name}.pid') + + @property + def pid(self): + if not os.path.isfile(self.pid_filepath): + return 0 + with open(self.pid_filepath) as f: + try: + pid = int(f.read().strip()) + except ValueError: + pid = 0 + return pid + + def write_pid(self): + with open(self.pid_filepath, 'w') as f: + f.write(str(self.process.pid)) + + def remove_pid(self): + if os.path.isfile(self.pid_filepath): + os.unlink(self.pid_filepath) + # -- end pid -- + + # -- process -- + @property + def process(self): + if not self._process: + try: + self._process = psutil.Process(self.pid) + except: + pass + return self._process + + # -- end process -- + + # -- action -- + def open_subprocess(self): + kwargs = {'cwd': self.cwd, 'stderr': self.log_file, 'stdout': self.log_file} + self._process = subprocess.Popen(self.cmd, **kwargs) + + def start(self): + if self.is_running: + self.show_status() + return + self.remove_pid() + self.open_subprocess() + self.write_pid() + self.start_other() + + def start_other(self): + pass + + def stop(self, force=False): + if not self.is_running: + self.show_status() + # self.remove_pid() + return + + print(f'Stop service: {self.name}', end='') + sig = 9 if force else 15 + os.kill(self.pid, sig) + + if self.process is None: + print("\033[31m No process found\033[0m") + return + try: + self.process.wait(1) + except: + pass + + for i in range(self.STOP_TIMEOUT): + if i == self.STOP_TIMEOUT - 1: + print("\033[31m Error\033[0m") + if not self.is_running: + print("\033[32m Ok\033[0m") + self.remove_pid() + break + else: + continue + + def watch(self): + self._check() + if not self.is_running: + self._restart() + self._rotate_log() + + def _check(self): + now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') + print(f"{now} Check service status: {self.name} -> ", end='') + if self.process: + try: + self.process.wait(1) # 不wait,子进程可能无法回收 + except: + pass + + if self.is_running: + print(f'running at {self.pid}') + else: + print(f'stopped at {self.pid}') + + def _restart(self): + if self.retry > self.max_retry: + logging.info("Service start failed, exit: {}".format(self.name)) + self.EXIT_EVENT.set() + return + self.retry += 1 + logging.info(f'> Find {self.name} stopped, retry {self.retry}, {self.pid}') + self.start() + + def _rotate_log(self): + now = datetime.datetime.now() + _time = now.strftime('%H:%M') + if _time != '23:59': + return + + backup_date = now.strftime('%Y-%m-%d') + backup_log_dir = os.path.join(self.log_dir, backup_date) + if not os.path.exists(backup_log_dir): + os.mkdir(backup_log_dir) + + backup_log_path = os.path.join(backup_log_dir, self.log_filename) + if os.path.isfile(self.log_filepath) and not os.path.isfile(backup_log_path): + logging.info(f'Rotate log file: {self.log_filepath} => {backup_log_path}') + shutil.copy(self.log_filepath, backup_log_path) + with open(self.log_filepath, 'w') as f: + pass + + to_delete_date = now - datetime.timedelta(days=self.LOG_KEEP_DAYS) + to_delete_dir = os.path.join(LOG_DIR, to_delete_date.strftime('%Y-%m-%d')) + if os.path.exists(to_delete_dir): + logging.info(f'Remove old log: {to_delete_dir}') + shutil.rmtree(to_delete_dir, ignore_errors=True) + # -- end action -- diff --git a/apps/common/management/commands/services/services/celery_base.py b/apps/common/management/commands/services/services/celery_base.py new file mode 100644 index 000000000..0ae219bd5 --- /dev/null +++ b/apps/common/management/commands/services/services/celery_base.py @@ -0,0 +1,45 @@ +from django.conf import settings + +from .base import BaseService +from ..hands import * + + +class CeleryBaseService(BaseService): + + def __init__(self, queue, num=10, **kwargs): + super().__init__(**kwargs) + self.queue = queue + self.num = num + + @property + def cmd(self): + print('\n- Start Celery as Distributed Task Queue: {}'.format(self.queue.capitalize())) + + os.environ.setdefault('LC_ALL', 'C.UTF-8') + os.environ.setdefault('PYTHONOPTIMIZE', '1') + os.environ.setdefault('ANSIBLE_FORCE_COLOR', 'True') + os.environ.setdefault('PYTHONPATH', settings.APPS_DIR) + + if os.getuid() == 0: + os.environ.setdefault('C_FORCE_ROOT', '1') + server_hostname = os.environ.get("SERVER_HOSTNAME") + if not server_hostname: + server_hostname = '%h' + + cmd = [ + 'celery', + '-A', 'ops', + 'worker', + '-P', 'threads', + '-l', 'error', + '-c', str(self.num), + '-Q', self.queue, + '--heartbeat-interval', '10', + '-n', f'{self.queue}@{server_hostname}', + '--without-mingle', + ] + return cmd + + @property + def cwd(self): + return APPS_DIR diff --git a/apps/common/management/commands/services/services/celery_default.py b/apps/common/management/commands/services/services/celery_default.py new file mode 100644 index 000000000..5d3e6d7b8 --- /dev/null +++ b/apps/common/management/commands/services/services/celery_default.py @@ -0,0 +1,10 @@ +from .celery_base import CeleryBaseService + +__all__ = ['CeleryDefaultService'] + + +class CeleryDefaultService(CeleryBaseService): + + def __init__(self, **kwargs): + kwargs['queue'] = 'celery' + super().__init__(**kwargs) diff --git a/apps/common/management/commands/services/services/gunicorn.py b/apps/common/management/commands/services/services/gunicorn.py new file mode 100644 index 000000000..cc42c4f7c --- /dev/null +++ b/apps/common/management/commands/services/services/gunicorn.py @@ -0,0 +1,36 @@ +from .base import BaseService +from ..hands import * + +__all__ = ['GunicornService'] + + +class GunicornService(BaseService): + + def __init__(self, **kwargs): + self.worker = kwargs['worker_gunicorn'] + super().__init__(**kwargs) + + @property + def cmd(self): + print("\n- Start Gunicorn WSGI HTTP Server") + + log_format = '%(h)s %(t)s %(L)ss "%(r)s" %(s)s %(b)s ' + bind = f'{HTTP_HOST}:{HTTP_PORT}' + cmd = [ + 'gunicorn', 'smartdoc.wsgi:application', + '-b', bind, + '-k', 'gthread', + '--threads', '200', + '-w', str(self.worker), + '--max-requests', '10240', + '--max-requests-jitter', '2048', + '--access-logformat', log_format, + '--access-logfile', '-' + ] + if DEBUG: + cmd.append('--reload') + return cmd + + @property + def cwd(self): + return APPS_DIR diff --git a/apps/common/management/commands/services/services/local_model.py b/apps/common/management/commands/services/services/local_model.py new file mode 100644 index 000000000..7591a2544 --- /dev/null +++ b/apps/common/management/commands/services/services/local_model.py @@ -0,0 +1,46 @@ +# coding=utf-8 +""" + @project: MaxKB + @Author:虎 + @file: local_model.py + @date:2024/8/21 13:28 + @desc: +""" +from maxkb.const import CONFIG +from .base import BaseService +from ..hands import * + +__all__ = ['GunicornLocalModelService'] + + +class GunicornLocalModelService(BaseService): + + def __init__(self, **kwargs): + self.worker = kwargs['worker_gunicorn'] + super().__init__(**kwargs) + + @property + def cmd(self): + print("\n- Start Gunicorn Local Model WSGI HTTP Server") + os.environ.setdefault('SERVER_NAME', 'local_model') + log_format = '%(h)s %(t)s %(L)ss "%(r)s" %(s)s %(b)s ' + bind = f'{CONFIG.get("LOCAL_MODEL_HOST")}:{CONFIG.get("LOCAL_MODEL_PORT")}' + worker = CONFIG.get("LOCAL_MODEL_HOST_WORKER", 1) + cmd = [ + 'gunicorn', 'smartdoc.wsgi:application', + '-b', bind, + '-k', 'gthread', + '--threads', '200', + '-w', str(worker), + '--max-requests', '10240', + '--max-requests-jitter', '2048', + '--access-logformat', log_format, + '--access-logfile', '-' + ] + if DEBUG: + cmd.append('--reload') + return cmd + + @property + def cwd(self): + return APPS_DIR diff --git a/apps/common/management/commands/services/utils.py b/apps/common/management/commands/services/utils.py new file mode 100644 index 000000000..2426758b8 --- /dev/null +++ b/apps/common/management/commands/services/utils.py @@ -0,0 +1,140 @@ +import threading +import signal +import time +import daemon +from daemon import pidfile +from .hands import * +from .hands import __version__ +from .services.base import BaseService + + +class ServicesUtil(object): + + def __init__(self, services, run_daemon=False, force_stop=False, stop_daemon=False): + self._services = services + self.run_daemon = run_daemon + self.force_stop = force_stop + self.stop_daemon = stop_daemon + self.EXIT_EVENT = threading.Event() + self.check_interval = 30 + self.files_preserve_map = {} + + def restart(self): + self.stop() + time.sleep(5) + self.start_and_watch() + + def start_and_watch(self): + logging.info(time.ctime()) + logging.info(f'MaxKB version {__version__}, more see https://www.jumpserver.org') + self.start() + if self.run_daemon: + self.show_status() + with self.daemon_context: + self.watch() + else: + self.watch() + + def start(self): + for service in self._services: + service: BaseService + service.start() + self.files_preserve_map[service.name] = service.log_file + + time.sleep(1) + + def stop(self): + for service in self._services: + service: BaseService + service.stop(force=self.force_stop) + + if self.stop_daemon: + self._stop_daemon() + + # -- watch -- + def watch(self): + while not self.EXIT_EVENT.is_set(): + try: + _exit = self._watch() + if _exit: + break + time.sleep(self.check_interval) + except KeyboardInterrupt: + print('Start stop services') + break + self.clean_up() + + def _watch(self): + for service in self._services: + service: BaseService + service.watch() + if service.EXIT_EVENT.is_set(): + self.EXIT_EVENT.set() + return True + return False + # -- end watch -- + + def clean_up(self): + if not self.EXIT_EVENT.is_set(): + self.EXIT_EVENT.set() + self.stop() + + def show_status(self): + for service in self._services: + service: BaseService + service.show_status() + + # -- daemon -- + def _stop_daemon(self): + if self.daemon_pid and self.daemon_is_running: + os.kill(self.daemon_pid, 15) + self.remove_daemon_pid() + + def remove_daemon_pid(self): + if os.path.isfile(self.daemon_pid_filepath): + os.unlink(self.daemon_pid_filepath) + + @property + def daemon_pid(self): + if not os.path.isfile(self.daemon_pid_filepath): + return 0 + with open(self.daemon_pid_filepath) as f: + try: + pid = int(f.read().strip()) + except ValueError: + pid = 0 + return pid + + @property + def daemon_is_running(self): + try: + os.kill(self.daemon_pid, 0) + except (OSError, ProcessLookupError): + return False + else: + return True + + @property + def daemon_pid_filepath(self): + return os.path.join(TMP_DIR, 'mk.pid') + + @property + def daemon_log_filepath(self): + return os.path.join(LOG_DIR, 'mk.log') + + @property + def daemon_context(self): + daemon_log_file = open(self.daemon_log_filepath, 'a') + context = daemon.DaemonContext( + pidfile=pidfile.TimeoutPIDLockFile(self.daemon_pid_filepath), + signal_map={ + signal.SIGTERM: lambda x, y: self.clean_up(), + signal.SIGHUP: 'terminate', + }, + stdout=daemon_log_file, + stderr=daemon_log_file, + files_preserve=list(self.files_preserve_map.values()), + detach_process=True, + ) + return context + # -- end daemon -- diff --git a/apps/common/management/commands/start.py b/apps/common/management/commands/start.py new file mode 100644 index 000000000..4c078a876 --- /dev/null +++ b/apps/common/management/commands/start.py @@ -0,0 +1,6 @@ +from .services.command import BaseActionCommand, Action + + +class Command(BaseActionCommand): + help = 'Start services' + action = Action.start.value diff --git a/apps/common/management/commands/status.py b/apps/common/management/commands/status.py new file mode 100644 index 000000000..36f0d3608 --- /dev/null +++ b/apps/common/management/commands/status.py @@ -0,0 +1,6 @@ +from .services.command import BaseActionCommand, Action + + +class Command(BaseActionCommand): + help = 'Show services status' + action = Action.status.value diff --git a/apps/common/management/commands/stop.py b/apps/common/management/commands/stop.py new file mode 100644 index 000000000..a79a5335c --- /dev/null +++ b/apps/common/management/commands/stop.py @@ -0,0 +1,6 @@ +from .services.command import BaseActionCommand, Action + + +class Command(BaseActionCommand): + help = 'Stop services' + action = Action.stop.value diff --git a/pyproject.toml b/pyproject.toml index fac976ebe..1207cc947 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -52,6 +52,9 @@ xlrd = "2.0.1" xlwt = "1.3.0" pymupdf = "1.26.0" pypdf = "5.5.0" +gunicorn = "23.0.0" +python-daemon = "3.1.2" + [build-system] requires = ["poetry-core"] build-backend = "poetry.core.masonry.api"