diff --git a/apps/function_lib/migrations/0004_functionlib_decimal_date.py b/apps/function_lib/migrations/0004_functionlib_decimal_date.py new file mode 100644 index 000000000..82e4a6d02 --- /dev/null +++ b/apps/function_lib/migrations/0004_functionlib_decimal_date.py @@ -0,0 +1,127 @@ +# Generated by Django 4.2.15 on 2025-03-13 07:21 + +from django.db import migrations +from django.db.models import Q + +mysql_template = """ +def query_mysql(host,port, user, password, database, sql): + import pymysql + import json + from pymysql.cursors import DictCursor + from datetime import datetime, date + + def default_serializer(obj): + from decimal import Decimal + if isinstance(obj, (datetime, date)): + return obj.isoformat() # 将 datetime/date 转换为 ISO 格式字符串 + elif isinstance(obj, Decimal): + return float(obj) # 将 Decimal 转换为 float + raise TypeError(f"Type {type(obj)} not serializable") + + try: + # 创建连接 + db = pymysql.connect( + host=host, + port=int(port), + user=user, + password=password, + database=database, + cursorclass=DictCursor # 使用字典游标 + ) + + # 使用 cursor() 方法创建一个游标对象 cursor + cursor = db.cursor() + + # 使用 execute() 方法执行 SQL 查询 + cursor.execute(sql) + + # 使用 fetchall() 方法获取所有数据 + data = cursor.fetchall() + + # 处理 bytes 类型的数据 + for row in data: + for key, value in row.items(): + if isinstance(value, bytes): + row[key] = value.decode("utf-8") # 转换为字符串 + + # 将数据序列化为 JSON + json_data = json.dumps(data, default=default_serializer, ensure_ascii=False) + return json_data + + # 关闭数据库连接 + db.close() + + except Exception as e: + print(f"Error while connecting to MySQL: {e}") + raise e +""" + +pgsql_template = """ +def queryPgSQL(database, user, password, host, port, query): + import psycopg2 + import json + from datetime import datetime + + # 自定义 JSON 序列化函数 + def default_serializer(obj): + from decimal import Decimal + if isinstance(obj, datetime): + return obj.isoformat() # 将 datetime 转换为 ISO 格式字符串 + elif isinstance(obj, Decimal): + return float(obj) # 将 Decimal 转换为 float + raise TypeError(f"Type {type(obj)} not serializable") + + # 数据库连接信息 + conn_params = { + "dbname": database, + "user": user, + "password": password, + "host": host, + "port": port + } + try: + # 建立连接 + conn = psycopg2.connect(**conn_params) + print("连接成功!") + # 创建游标对象 + cursor = conn.cursor() + # 执行查询语句 + cursor.execute(query) + # 获取查询结果 + rows = cursor.fetchall() + # 处理 bytes 类型的数据 + columns = [desc[0] for desc in cursor.description] + result = [dict(zip(columns, row)) for row in rows] + # 转换为 JSON 格式 + json_result = json.dumps(result, default=default_serializer, ensure_ascii=False) + return json_result + except Exception as e: + print(f"发生错误:{e}") + raise e + finally: + # 关闭游标和连接 + if cursor: + cursor.close() + if conn: + conn.close() +""" + + +def fix_type(apps, schema_editor): + FunctionLib = apps.get_model('function_lib', 'FunctionLib') + FunctionLib.objects.filter( + Q(id='22c21b76-0308-11f0-9694-5618c4394482') | Q(template_id='22c21b76-0308-11f0-9694-5618c4394482') + ).update(code=mysql_template) + FunctionLib.objects.filter( + Q(id='bd1e8b88-0302-11f0-87bb-5618c4394482') | Q(template_id='bd1e8b88-0302-11f0-87bb-5618c4394482') + ).update(code=pgsql_template) + + +class Migration(migrations.Migration): + dependencies = [ + ('function_lib', '0003_functionlib_function_type_functionlib_icon_and_more'), + ] + + operations = [ + migrations.RunPython(fix_type) + ]