feat: add MySQL and PostgreSQL query templates with JSON serialization
Some checks are pending
sync2gitee / repo-sync (push) Waiting to run

This commit is contained in:
CaptainB 2025-07-21 17:45:20 +08:00
parent 02d6239a71
commit 0ba9b97752

View File

@ -0,0 +1,127 @@
# Generated by Django 4.2.15 on 2025-03-13 07:21
from django.db import migrations
from django.db.models import Q
mysql_template = """
def query_mysql(host,port, user, password, database, sql):
import pymysql
import json
from pymysql.cursors import DictCursor
from datetime import datetime, date
def default_serializer(obj):
from decimal import Decimal
if isinstance(obj, (datetime, date)):
return obj.isoformat() # 将 datetime/date 转换为 ISO 格式字符串
elif isinstance(obj, Decimal):
return float(obj) # 将 Decimal 转换为 float
raise TypeError(f"Type {type(obj)} not serializable")
try:
# 创建连接
db = pymysql.connect(
host=host,
port=int(port),
user=user,
password=password,
database=database,
cursorclass=DictCursor # 使用字典游标
)
# 使用 cursor() 方法创建一个游标对象 cursor
cursor = db.cursor()
# 使用 execute() 方法执行 SQL 查询
cursor.execute(sql)
# 使用 fetchall() 方法获取所有数据
data = cursor.fetchall()
# 处理 bytes 类型的数据
for row in data:
for key, value in row.items():
if isinstance(value, bytes):
row[key] = value.decode("utf-8") # 转换为字符串
# 将数据序列化为 JSON
json_data = json.dumps(data, default=default_serializer, ensure_ascii=False)
return json_data
# 关闭数据库连接
db.close()
except Exception as e:
print(f"Error while connecting to MySQL: {e}")
raise e
"""
pgsql_template = """
def queryPgSQL(database, user, password, host, port, query):
import psycopg2
import json
from datetime import datetime
# 自定义 JSON 序列化函数
def default_serializer(obj):
from decimal import Decimal
if isinstance(obj, datetime):
return obj.isoformat() # 将 datetime 转换为 ISO 格式字符串
elif isinstance(obj, Decimal):
return float(obj) # 将 Decimal 转换为 float
raise TypeError(f"Type {type(obj)} not serializable")
# 数据库连接信息
conn_params = {
"dbname": database,
"user": user,
"password": password,
"host": host,
"port": port
}
try:
# 建立连接
conn = psycopg2.connect(**conn_params)
print("连接成功!")
# 创建游标对象
cursor = conn.cursor()
# 执行查询语句
cursor.execute(query)
# 获取查询结果
rows = cursor.fetchall()
# 处理 bytes 类型的数据
columns = [desc[0] for desc in cursor.description]
result = [dict(zip(columns, row)) for row in rows]
# 转换为 JSON 格式
json_result = json.dumps(result, default=default_serializer, ensure_ascii=False)
return json_result
except Exception as e:
print(f"发生错误:{e}")
raise e
finally:
# 关闭游标和连接
if cursor:
cursor.close()
if conn:
conn.close()
"""
def fix_type(apps, schema_editor):
FunctionLib = apps.get_model('function_lib', 'FunctionLib')
FunctionLib.objects.filter(
Q(id='22c21b76-0308-11f0-9694-5618c4394482') | Q(template_id='22c21b76-0308-11f0-9694-5618c4394482')
).update(code=mysql_template)
FunctionLib.objects.filter(
Q(id='bd1e8b88-0302-11f0-87bb-5618c4394482') | Q(template_id='bd1e8b88-0302-11f0-87bb-5618c4394482')
).update(code=pgsql_template)
class Migration(migrations.Migration):
dependencies = [
('function_lib', '0003_functionlib_function_type_functionlib_icon_and_more'),
]
operations = [
migrations.RunPython(fix_type)
]