MaxKB/apps/common/util/common.py
2024-12-13 14:42:14 +08:00

139 lines
3.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# coding=utf-8
"""
@project: maxkb
@Author
@file common.py
@date2023/10/16 16:42
@desc:
"""
import hashlib
import importlib
import mimetypes
import io
from functools import reduce
from typing import Dict, List
from django.core.files.uploadedfile import InMemoryUploadedFile
from django.db.models import QuerySet
from ..exception.app_exception import AppApiException
from ..models.db_model_manage import DBModelManage
def sub_array(array: List, item_num=10):
result = []
temp = []
for item in array:
temp.append(item)
if len(temp) >= item_num:
result.append(temp)
temp = []
if len(temp) > 0:
result.append(temp)
return result
def query_params_to_single_dict(query_params: Dict):
return reduce(lambda x, y: {**x, **y}, list(
filter(lambda item: item is not None, [({key: value} if value is not None and len(value) > 0 else None) for
key, value in
query_params.items()])), {})
def get_exec_method(clazz_: str, method_: str):
"""
根据 class 和method函数 获取执行函数
:param clazz_: class 字符串
:param method_: 执行函数
:return: 执行函数
"""
clazz_split = clazz_.split('.')
clazz_name = clazz_split[-1]
package = ".".join([clazz_split[index] for index in range(len(clazz_split) - 1)])
package_model = importlib.import_module(package)
return getattr(getattr(package_model, clazz_name), method_)
def flat_map(array: List[List]):
"""
将二位数组转为一维数组
:param array: 二维数组
:return: 一维数组
"""
result = []
for e in array:
result += e
return result
def password_encrypt(raw_password):
"""
密码 md5加密
:param raw_password: 密码
:return: 加密后密码
"""
md5 = hashlib.md5() # 2实例化md5() 方法
md5.update(raw_password.encode()) # 3对字符串的字节类型加密
result = md5.hexdigest() # 4加密
return result
def post(post_function):
def inner(func):
def run(*args, **kwargs):
result = func(*args, **kwargs)
return post_function(*result)
return run
return inner
def valid_license(model=None, count=None, message=None):
def inner(func):
def run(*args, **kwargs):
xpack_cache = DBModelManage.get_model('xpack_cache')
is_license_valid = xpack_cache.get('XPACK_LICENSE_IS_VALID', False) if xpack_cache is not None else False
record_count = QuerySet(model).count()
if not is_license_valid and record_count >= count:
error_message = message or f'超出限制{count}, 请联系我们https://fit2cloud.com/)。'
raise AppApiException(400, error_message)
return func(*args, **kwargs)
return run
return inner
def bulk_create_in_batches(model, data, batch_size=1000):
if len(data) == 0:
return
for i in range(0, len(data), batch_size):
batch = data[i:i + batch_size]
model.objects.bulk_create(batch)
def bytes_to_uploaded_file(file_bytes, file_name="file.txt"):
content_type, _ = mimetypes.guess_type(file_name)
if content_type is None:
# 如果未能识别,设置为默认的二进制文件类型
content_type = "application/octet-stream"
# 创建一个内存中的字节流对象
file_stream = io.BytesIO(file_bytes)
# 获取文件大小
file_size = len(file_bytes)
# 创建 InMemoryUploadedFile 对象
uploaded_file = InMemoryUploadedFile(
file=file_stream,
field_name=None,
name=file_name,
content_type=content_type,
size=file_size,
charset=None,
)
return uploaded_file