mirror of
https://github.com/1Panel-dev/MaxKB.git
synced 2025-12-26 18:32:48 +00:00
128 lines
3.8 KiB
Python
128 lines
3.8 KiB
Python
# Generated by Django 4.2.15 on 2025-03-13 07:21
|
|
|
|
from django.db import migrations
|
|
from django.db.models import Q
|
|
|
|
# Source text of the `query_mysql` function. `fix_type` below writes this
# string verbatim into the `code` column of the matching FunctionLib rows, so
# everything between the triple quotes (including its inline comments) is
# runtime data, not code executed by this migration.
#
# The stored function opens a PyMySQL connection with a dict cursor, runs
# `sql`, decodes any `bytes` values as UTF-8 and returns the rows as a JSON
# string. The connection is closed in a `finally` clause so it is released on
# both the success path and the error path; a close placed after the `return`
# statement would never run.
mysql_template = """
def query_mysql(host,port, user, password, database, sql):
    import pymysql
    import json
    from pymysql.cursors import DictCursor
    from datetime import datetime, date

    def default_serializer(obj):
        from decimal import Decimal
        if isinstance(obj, (datetime, date)):
            return obj.isoformat() # 将 datetime/date 转换为 ISO 格式字符串
        elif isinstance(obj, Decimal):
            return float(obj) # 将 Decimal 转换为 float
        raise TypeError(f"Type {type(obj)} not serializable")

    db = None
    try:
        # 创建连接
        db = pymysql.connect(
            host=host,
            port=int(port),
            user=user,
            password=password,
            database=database,
            cursorclass=DictCursor # 使用字典游标
        )

        # 使用 cursor() 方法创建一个游标对象 cursor
        cursor = db.cursor()

        # 使用 execute() 方法执行 SQL 查询
        cursor.execute(sql)

        # 使用 fetchall() 方法获取所有数据
        data = cursor.fetchall()

        # 处理 bytes 类型的数据
        for row in data:
            for key, value in row.items():
                if isinstance(value, bytes):
                    row[key] = value.decode("utf-8") # 转换为字符串

        # 将数据序列化为 JSON
        json_data = json.dumps(data, default=default_serializer, ensure_ascii=False)
        return json_data

    except Exception as e:
        print(f"Error while connecting to MySQL: {e}")
        raise e
    finally:
        # 关闭数据库连接
        if db is not None:
            db.close()
"""
|
|
|
|
# Source text of the `queryPgSQL` function. `fix_type` below writes this
# string verbatim into the `code` column of the matching FunctionLib rows, so
# everything between the triple quotes (including its inline comments) is
# runtime data, not code executed by this migration.
#
# The stored function connects with psycopg2, runs `query`, pairs each row
# with the column names from `cursor.description` and returns the result as a
# JSON string. `conn` and `cursor` are initialised to None before the `try`
# so the `finally` clause can test them even when connecting fails; otherwise
# an UnboundLocalError would replace the real connection error. `date` values
# are serialized the same way as `datetime` values.
pgsql_template = """
def queryPgSQL(database, user, password, host, port, query):
    import psycopg2
    import json
    from datetime import datetime, date

    # 自定义 JSON 序列化函数
    def default_serializer(obj):
        from decimal import Decimal
        if isinstance(obj, (datetime, date)):
            return obj.isoformat() # 将 datetime/date 转换为 ISO 格式字符串
        elif isinstance(obj, Decimal):
            return float(obj) # 将 Decimal 转换为 float
        raise TypeError(f"Type {type(obj)} not serializable")

    # 数据库连接信息
    conn_params = {
        "dbname": database,
        "user": user,
        "password": password,
        "host": host,
        "port": port
    }
    conn = None
    cursor = None
    try:
        # 建立连接
        conn = psycopg2.connect(**conn_params)
        print("连接成功!")
        # 创建游标对象
        cursor = conn.cursor()
        # 执行查询语句
        cursor.execute(query)
        # 获取查询结果
        rows = cursor.fetchall()
        # 处理 bytes 类型的数据
        columns = [desc[0] for desc in cursor.description]
        result = [dict(zip(columns, row)) for row in rows]
        # 转换为 JSON 格式
        json_result = json.dumps(result, default=default_serializer, ensure_ascii=False)
        return json_result
    except Exception as e:
        print(f"发生错误:{e}")
        raise e
    finally:
        # 关闭游标和连接
        if cursor:
            cursor.close()
        if conn:
            conn.close()
"""
|
|
|
|
|
|
def fix_type(apps, schema_editor):
    """Replace the stored code of the MySQL and PostgreSQL query functions.

    For each of the two hard-coded UUIDs, every FunctionLib row whose ``id``
    or ``template_id`` equals that UUID has its ``code`` column overwritten
    with the corresponding template string defined in this module.

    Args:
        apps: Historical app registry supplied by ``RunPython``; used to look
            up the ``FunctionLib`` model.
        schema_editor: Supplied by ``RunPython``; not used here.
    """
    function_lib_model = apps.get_model('function_lib', 'FunctionLib')
    # (UUID, replacement source) pairs, applied in the listed order.
    replacements = (
        ('22c21b76-0308-11f0-9694-5618c4394482', mysql_template),
        ('bd1e8b88-0302-11f0-87bb-5618c4394482', pgsql_template),
    )
    for template_uuid, source in replacements:
        # Match the row itself as well as rows that reference it via
        # template_id (presumably copies created from it; verify).
        is_target = Q(id=template_uuid) | Q(template_id=template_uuid)
        function_lib_model.objects.filter(is_target).update(code=source)
|
|
|
|
|
|
class Migration(migrations.Migration):
    """Data migration that rewrites stored function code via ``fix_type``.

    It makes no schema changes. The reverse operation is a no-op so the
    migration can be unapplied: the previous ``code`` values are not kept
    anywhere, so rolling back leaves the updated code in place instead of
    failing as an irreversible migration.
    """

    dependencies = [
        ('function_lib', '0003_functionlib_function_type_functionlib_icon_and_more'),
    ]

    operations = [
        migrations.RunPython(fix_type, reverse_code=migrations.RunPython.noop),
    ]
|