MaxKB/apps/common/db/compiler.py

218 lines
9.3 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# coding=utf-8
"""
@project: maxkb
@Author
@file compiler.py
@date2023/10/7 10:53
@desc:
"""
from django.core.exceptions import EmptyResultSet, FullResultSet
from django.db import NotSupportedError
from django.db.models.sql.compiler import SQLCompiler
from django.db.transaction import TransactionManagementError
class AppSQLCompiler(SQLCompiler):
def __init__(self, query, connection, using, elide_empty=True, field_replace_dict=None):
super().__init__(query, connection, using, elide_empty)
if field_replace_dict is None:
field_replace_dict = {}
self.field_replace_dict = field_replace_dict
def get_query_str(self, with_limits=True, with_table_name=False, with_col_aliases=False):
refcounts_before = self.query.alias_refcount.copy()
try:
combinator = self.query.combinator
extra_select, order_by, group_by = self.pre_sql_setup(
with_col_aliases=with_col_aliases or bool(combinator),
)
for_update_part = None
# Is a LIMIT/OFFSET clause needed?
with_limit_offset = with_limits and self.query.is_sliced
combinator = self.query.combinator
features = self.connection.features
if combinator:
if not getattr(features, "supports_select_{}".format(combinator)):
raise NotSupportedError(
"{} is not supported on this database backend.".format(
combinator
)
)
result, params = self.get_combinator_sql(
combinator, self.query.combinator_all
)
elif self.qualify:
result, params = self.get_qualify_sql()
order_by = None
else:
distinct_fields, distinct_params = self.get_distinct()
# This must come after 'select', 'ordering', and 'distinct'
# (see docstring of get_from_clause() for details).
from_, f_params = self.get_from_clause()
try:
where, w_params = (
self.compile(self.where) if self.where is not None else ("", [])
)
except EmptyResultSet:
if self.elide_empty:
raise
# Use a predicate that's always False.
where, w_params = "0 = 1", []
except FullResultSet:
where, w_params = "", []
try:
having, h_params = (
self.compile(self.having)
if self.having is not None
else ("", [])
)
except FullResultSet:
having, h_params = "", []
result = []
params = []
if self.query.distinct:
distinct_result, distinct_params = self.connection.ops.distinct_sql(
distinct_fields,
distinct_params,
)
result += distinct_result
params += distinct_params
out_cols = []
for _, (s_sql, s_params), alias in self.select + extra_select:
if alias:
s_sql = "%s AS %s" % (
s_sql,
self.connection.ops.quote_name(alias),
)
params.extend(s_params)
out_cols.append(s_sql)
params.extend(f_params)
if self.query.select_for_update and features.has_select_for_update:
if (
self.connection.get_autocommit()
# Don't raise an exception when database doesn't
# support transactions, as it's a noop.
and features.supports_transactions
):
raise TransactionManagementError(
"select_for_update cannot be used outside of a transaction."
)
if (
with_limit_offset
and not features.supports_select_for_update_with_limit
):
raise NotSupportedError(
"LIMIT/OFFSET is not supported with "
"select_for_update on this database backend."
)
nowait = self.query.select_for_update_nowait
skip_locked = self.query.select_for_update_skip_locked
of = self.query.select_for_update_of
no_key = self.query.select_for_no_key_update
# If it's a NOWAIT/SKIP LOCKED/OF/NO KEY query but the
# backend doesn't support it, raise NotSupportedError to
# prevent a possible deadlock.
if nowait and not features.has_select_for_update_nowait:
raise NotSupportedError(
"NOWAIT is not supported on this database backend."
)
elif skip_locked and not features.has_select_for_update_skip_locked:
raise NotSupportedError(
"SKIP LOCKED is not supported on this database backend."
)
elif of and not features.has_select_for_update_of:
raise NotSupportedError(
"FOR UPDATE OF is not supported on this database backend."
)
elif no_key and not features.has_select_for_no_key_update:
raise NotSupportedError(
"FOR NO KEY UPDATE is not supported on this "
"database backend."
)
for_update_part = self.connection.ops.for_update_sql(
nowait=nowait,
skip_locked=skip_locked,
of=self.get_select_for_update_of_arguments(),
no_key=no_key,
)
if for_update_part and features.for_update_after_from:
result.append(for_update_part)
if where:
result.append("WHERE %s" % where)
params.extend(w_params)
grouping = []
for g_sql, g_params in group_by:
grouping.append(g_sql)
params.extend(g_params)
if grouping:
if distinct_fields:
raise NotImplementedError(
"annotate() + distinct(fields) is not implemented."
)
order_by = order_by or self.connection.ops.force_no_ordering()
result.append("GROUP BY %s" % ", ".join(grouping))
if self._meta_ordering:
order_by = None
if having:
result.append("HAVING %s" % having)
params.extend(h_params)
if self.query.explain_info:
result.insert(
0,
self.connection.ops.explain_query_prefix(
self.query.explain_info.format,
**self.query.explain_info.options,
),
)
if order_by:
ordering = []
for _, (o_sql, o_params, _) in order_by:
ordering.append(o_sql)
params.extend(o_params)
order_by_sql = "ORDER BY %s" % ", ".join(ordering)
if combinator and features.requires_compound_order_by_subquery:
result = ["SELECT * FROM (", *result, ")", order_by_sql]
else:
result.append(order_by_sql)
if with_limit_offset:
result.append(
self.connection.ops.limit_offset_sql(
self.query.low_mark, self.query.high_mark
)
)
if for_update_part and not features.for_update_after_from:
result.append(for_update_part)
from_, f_params = self.get_from_clause()
sql = " ".join(result)
if not with_table_name:
for table_name in from_:
sql = sql.replace(table_name + ".", "")
for key in self.field_replace_dict.keys():
value = self.field_replace_dict.get(key)
sql = sql.replace(key, value)
return sql, tuple(params)
finally:
# Finally do cleanup - get rid of the joins we created above.
self.query.reset_refcounts(refcounts_before)
def as_sql(self, with_limits=True, with_col_aliases=False, select_string=None):
if select_string is None:
return super().as_sql(with_limits, with_col_aliases)
else:
sql, params = self.get_query_str(with_table_name=False)
return (select_string + " " + sql), params