mirror of
https://github.com/1Panel-dev/MaxKB.git
synced 2025-12-26 01:33:05 +00:00
84 lines
2.3 KiB
Python
84 lines
2.3 KiB
Python
# coding=utf-8
|
||
"""
|
||
@project: maxkb
|
||
@Author:虎
|
||
@file: common.py
|
||
@date:2023/10/16 16:42
|
||
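@desc: Common helpers: embed identity cookie, list chunking, query-param filtering, dynamic method lookup and a post-processing decorator.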
|
||
"""
|
||
import datetime
|
||
import importlib
|
||
import uuid
|
||
from functools import reduce
|
||
from typing import Dict, List
|
||
from django.core import cache
|
||
|
||
from .rsa_util import encrypt
|
||
|
||
# Cache handle looked up under the 'chat_cache' alias of Django's cache registry;
# set_embed_identity_cookie() stores newly issued embed identities in it.
chat_cache = cache.caches['chat_cache']
|
||
|
||
|
||
def set_embed_identity_cookie(request, response):
    """
    Make sure the response carries an ``embed_identity`` cookie.

    If the request already has the cookie, its value is reused unchanged.
    Otherwise a new uuid1 string is generated, registered in ``chat_cache``
    with an initial value of 0 (expiring after ``getRestSeconds()`` seconds),
    and its encrypted form becomes the cookie value.

    :param request: incoming request exposing a ``COOKIES`` mapping
    :param response: outgoing response exposing ``set_cookie``
    :return: the same response object, with the cookie set
    """
    cookies = request.COOKIES
    if 'embed_identity' not in cookies:
        # The raw identity is the cache key; only the encrypted form goes to the client.
        raw_identity = str(uuid.uuid1())
        identity = encrypt(raw_identity)
        chat_cache.set(raw_identity, 0, timeout=getRestSeconds())
    else:
        identity = cookies['embed_identity']
    # The cookie is (re)written in both cases; max_age is 100 days expressed in seconds.
    response.set_cookie("embed_identity", identity, max_age=3600 * 24 * 100, samesite='None',
                        secure=True)
    return response
|
||
|
||
|
||
def getRestSeconds():
    """
    Return the number of whole seconds left until the next midnight.

    The calculation uses the naive ``datetime.datetime.now()`` value, so
    "midnight" is 00:00:00 of the following day in that same clock.

    :return: int, whole seconds between now and the start of tomorrow
    """
    now = datetime.datetime.now()
    today_begin = datetime.datetime(now.year, now.month, now.day)
    tomorrow_begin = today_begin + datetime.timedelta(days=1)
    # Floor-divide by one second rather than reading ``.seconds``: ``.seconds``
    # ignores the ``days`` component, so at exactly midnight (a full one-day
    # difference) it would report 0 instead of 86400.
    return (tomorrow_begin - now) // datetime.timedelta(seconds=1)
|
||
|
||
|
||
def sub_array(array: List, item_num=10):
    """
    Split ``array`` into consecutive chunks of at most ``item_num`` items.

    Items keep their original order; the final chunk holds whatever is left
    over and may be shorter. An empty input yields an empty list.

    :param array: items to split
    :param item_num: chunk size at which the current chunk is closed
    :return: list of chunks (each a list)
    """
    chunks = []
    current = []
    for element in array:
        current.append(element)
        if len(current) < item_num:
            continue
        # Chunk is full: store it and start a fresh one.
        chunks.append(current)
        current = []
    if current:
        chunks.append(current)
    return chunks
|
||
|
||
|
||
def query_params_to_single_dict(query_params: Dict):
    """
    Copy ``query_params`` into a plain dict, dropping empty entries.

    An entry is kept only when its value is not ``None`` and has a length
    greater than zero; key order follows ``query_params.items()``.

    :param query_params: mapping of parameter names to sized values (or None)
    :return: new dict containing only the non-empty entries
    """
    # A single comprehension replaces the reduce/filter chain, which re-copied
    # the accumulated dict for every key.
    return {
        key: value
        for key, value in query_params.items()
        if value is not None and len(value) > 0
    }
|
||
|
||
|
||
def get_exec_method(clazz_: str, method_: str):
    """
    Resolve a callable from a dotted class path and a method name.

    Everything before the last dot of ``clazz_`` is imported as a module; the
    part after it is looked up on that module, and ``method_`` is then looked
    up on the resulting object.

    :param clazz_: dotted path of the class, e.g. ``"package.module.ClassName"``
    :param method_: name of the attribute to fetch from the class
    :return: the attribute found under ``method_``
    :raises ValueError: if ``clazz_`` contains no dot (the module name is empty)
    :raises ImportError: if the module part cannot be imported
    :raises AttributeError: if the class or the method does not exist
    """
    # rpartition splits on the last dot only: ("package.module", ".", "ClassName").
    package, _, clazz_name = clazz_.rpartition('.')
    package_model = importlib.import_module(package)
    return getattr(getattr(package_model, clazz_name), method_)
|
||
|
||
|
||
def post(post_function):
    """
    Decorator factory that passes a function's result on to ``post_function``.

    The decorated function is called with the original arguments; its return
    value is unpacked as positional arguments into ``post_function``, so it
    must be an iterable. The caller receives whatever ``post_function`` returns.

    :param post_function: callable invoked as ``post_function(*result)``
    :return: decorator to apply to the function whose result is post-processed
    """
    # Function-scope import: only ``reduce`` is imported from functools at file level.
    from functools import wraps

    def inner(func):
        # wraps keeps the decorated function's __name__, __doc__ and similar metadata.
        @wraps(func)
        def run(*args, **kwargs):
            result = func(*args, **kwargs)
            return post_function(*result)

        return run

    return inner
|