MaxKB/apps/common/db/search.py

125 lines
4.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# coding=utf-8
"""
@project: maxkb
@Author
@file search.py
@date2023/10/7 18:20
@desc:
"""
from django.db import DEFAULT_DB_ALIAS, models
from django.db.models import QuerySet
from common.db.compiler import AppSQLCompiler
from common.db.sql_execute import select_one, select_list
from common.response.result import Page
def get_dynamics_model(attr: dict, table_name='dynamics'):
"""
获取一个动态的django模型
:param attr: 模型字段
:param table_name: 表名
:return: django 模型
"""
attributes = {
"__module__": "dataset.models",
"Meta": type("Meta", (), {'db_table': table_name}),
**attr
}
return type('Dynamics', (models.Model,), attributes)
def native_search(queryset: QuerySet, select_string: str,
field_replace_dict=None,
with_search_one=False):
"""
复杂查询
:param queryset: 查询条件构造器
:param select_string: 查询前缀 不包括 where limit 等信息
:param field_replace_dict: 需要替换的字段
:param with_search_one: 查询
:return: 查询结果
"""
if field_replace_dict is None:
field_replace_dict = get_field_replace_dict(queryset)
q = queryset.query
compiler = q.get_compiler(DEFAULT_DB_ALIAS)
app_sql_compiler = AppSQLCompiler(q, using=DEFAULT_DB_ALIAS, connection=compiler.connection,
field_replace_dict=field_replace_dict)
sql, params = app_sql_compiler.get_query_str(with_table_name=False)
if with_search_one:
return select_one(select_string + " " +
sql, params)
else:
return select_list(select_string + " " +
sql, params)
def page_search(current_page: int, page_size: int, queryset: QuerySet, post_records_handler):
"""
分页查询
:param current_page: 当前页
:param page_size: 每页大小
:param queryset: 查询条件
:param post_records_handler: 数据处理器
:return: 分页结果
"""
total = QuerySet(query=queryset.query.clone(), model=queryset.model).count()
result = queryset.all()[((current_page - 1) * page_size):(current_page * page_size)]
return Page(total, list(map(post_records_handler, result)), current_page, page_size)
def native_page_search(current_page: int, page_size: int, queryset: QuerySet, select_string: str,
field_replace_dict=None,
post_records_handler=lambda r: r):
"""
复杂分页查询
:param current_page: 当前页
:param page_size: 每页大小
:param queryset: 查询条件
:param select_string: 查询
:param field_replace_dict: 特殊字段替换
:param post_records_handler: 数据row处理器
:return: 分页结果
"""
if field_replace_dict is None:
field_replace_dict = get_field_replace_dict(queryset)
q = queryset.query
compiler = q.get_compiler(DEFAULT_DB_ALIAS)
app_sql_compiler = AppSQLCompiler(q, using=DEFAULT_DB_ALIAS, connection=compiler.connection,
field_replace_dict=field_replace_dict)
page_sql, params = app_sql_compiler.get_query_str(with_table_name=False)
total_sql = "SELECT \"count\"(*) FROM (%s) temp" % (select_string + " " + page_sql)
total = select_one(total_sql, params)
q.set_limits(((current_page - 1) * page_size), (current_page * page_size))
app_sql_compiler = AppSQLCompiler(q, using=DEFAULT_DB_ALIAS, connection=compiler.connection,
field_replace_dict=field_replace_dict)
page_sql, params = app_sql_compiler.get_query_str(with_table_name=False)
result = select_list(select_string + " " + page_sql, params)
return Page(total.get("count"), list(map(post_records_handler, result)), current_page, page_size)
def get_field_replace_dict(queryset: QuerySet):
"""
获取需要替换的字段 默认 “xxx.xxx”需要被替换成 “xxx”."xxx"
:param queryset: 查询对象
:return: 需要替换的字典
"""
result = {}
for field in queryset.model._meta.local_fields:
if field.attname.__contains__("."):
replace_field = to_replace_field(field.attname)
result.__setitem__('"' + field.attname + '"', replace_field)
return result
def to_replace_field(field: str):
"""
将field 转换为 需要替换的field “xxx.xxx”需要被替换成 “xxx”."xxx" 只替换 field包含.的字段
:param field: django field字段
:return: 替换字段
"""
split_field = field.split(".")
return ".".join(list(map(lambda sf: '"' + sf + '"', split_field)))