perf: Optimize file writing and writing (#4512)
Some checks failed
sync2gitee / repo-sync (push) Has been cancelled
Typos Check / Spell Check with Typos (push) Has been cancelled

This commit is contained in:
shaohuzhang1 2025-12-12 18:52:27 +08:00 committed by GitHub
parent d393e28273
commit 585302a2c9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -342,39 +342,85 @@ class File(AppModelMixin):
db_table = "file"
def save(self, bytea=None, force_insert=False, force_update=False, using=None, update_fields=None):
if bytea is None:
raise ValueError("bytea参数不能为空")
sha256_hash = get_sha256_hash(bytea)
# 创建压缩文件
zip_buffer = io.BytesIO()
with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zip_file:
# 设置压缩级别为最高(9)
existing_file = QuerySet(File).filter(sha256_hash=sha256_hash).first()
if existing_file:
self.loid = existing_file.loid
return super().save()
compressed_data = self._compress_data(bytea)
self.loid = self._create_large_object()
self._write_compressed_data(compressed_data)
# 调用父类保存
return super().save()
def _compress_data(self, data, compression_level=9):
"""压缩数据到内存"""
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, 'w', zipfile.ZIP_DEFLATED) as zip_file:
zipinfo = zipfile.ZipInfo(self.file_name)
zipinfo.compress_type = zipfile.ZIP_DEFLATED
zip_file.writestr(zipinfo, bytea, compresslevel=9)
# 获取压缩后的数据
compressed_data = zip_buffer.getvalue()
f = QuerySet(File).filter(sha256_hash=sha256_hash).first()
if f is not None:
self.loid = f.loid
else:
result = select_one("SELECT lo_from_bytea(%s, %s::bytea) as loid", [0, bytea])
self.loid = result['loid']
self.file_size = len(compressed_data)
self.sha256_hash = sha256_hash
# 可以在元数据中记录原始大小
if 'original_size' not in self.meta:
self.meta['original_size'] = len(bytea)
super().save()
zip_file.writestr(zipinfo, data, compresslevel=compression_level)
return buffer.getvalue()
def _create_large_object(self):
result = select_one("SELECT lo_creat(-1)::int8 as lo_id;", [])
return result['lo_id']
def _write_compressed_data(self, data, block_size=64 * 1024):
buffer = io.BytesIO(data)
offset = 0
while True:
chunk = buffer.read(block_size)
if not chunk:
break
offset += len(chunk)
select_one(
"SELECT lo_put(%s::oid, %s::bigint, %s::bytea)::VARCHAR;",
[self.loid, offset - len(chunk), chunk]
)
def get_bytes(self):
result = select_one(f'SELECT lo_get({self.loid}) as "data"', [])
compressed_data = result['data']
buffer = io.BytesIO()
for chunk in self.get_bytes_stream():
buffer.write(chunk)
try:
# 解压数据
with zipfile.ZipFile(io.BytesIO(compressed_data)) as zip_file:
with zipfile.ZipFile(buffer) as zip_file:
return zip_file.read(self.file_name)
except Exception as e:
# 如果数据不是zip格式直接返回原始数据
return compressed_data
return buffer.getvalue()
def get_bytes_stream(self, start=0, end=None, chunk_size=64 * 1024):
def _read_with_offset():
offset = start
while True:
result = select_one(
"SELECT lo_get(%s::oid, %s, %s) as chunk",
[self.loid, offset, end - offset if end and (end - offset) < chunk_size else chunk_size]
)
chunk = result['chunk'] if result else None
if not chunk:
break
yield chunk
offset += len(chunk)
if len(chunk) < chunk_size:
break
if end and offset > end:
break
return _read_with_offset()
@receiver(pre_delete, sender=File)