diff --git a/apps/knowledge/models/knowledge.py b/apps/knowledge/models/knowledge.py index 50b64b766..a32f3aa60 100644 --- a/apps/knowledge/models/knowledge.py +++ b/apps/knowledge/models/knowledge.py @@ -342,39 +342,85 @@ class File(AppModelMixin): db_table = "file" def save(self, bytea=None, force_insert=False, force_update=False, using=None, update_fields=None): + if bytea is None: + raise ValueError("bytea参数不能为空") + sha256_hash = get_sha256_hash(bytea) - # 创建压缩文件 - zip_buffer = io.BytesIO() - with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zip_file: - # 设置压缩级别为最高(9) + + existing_file = QuerySet(File).filter(sha256_hash=sha256_hash).first() + if existing_file: + self.loid = existing_file.loid + return super().save() + + compressed_data = self._compress_data(bytea) + + self.loid = self._create_large_object() + + self._write_compressed_data(compressed_data) + + # 调用父类保存 + return super().save() + + def _compress_data(self, data, compression_level=9): + """压缩数据到内存""" + buffer = io.BytesIO() + with zipfile.ZipFile(buffer, 'w', zipfile.ZIP_DEFLATED) as zip_file: zipinfo = zipfile.ZipInfo(self.file_name) zipinfo.compress_type = zipfile.ZIP_DEFLATED - zip_file.writestr(zipinfo, bytea, compresslevel=9) - # 获取压缩后的数据 - compressed_data = zip_buffer.getvalue() - f = QuerySet(File).filter(sha256_hash=sha256_hash).first() - if f is not None: - self.loid = f.loid - else: - result = select_one("SELECT lo_from_bytea(%s, %s::bytea) as loid", [0, bytea]) - self.loid = result['loid'] - self.file_size = len(compressed_data) - self.sha256_hash = sha256_hash - # 可以在元数据中记录原始大小 - if 'original_size' not in self.meta: - self.meta['original_size'] = len(bytea) - super().save() + zip_file.writestr(zipinfo, data, compresslevel=compression_level) + + return buffer.getvalue() + + def _create_large_object(self): + result = select_one("SELECT lo_creat(-1)::int8 as lo_id;", []) + return result['lo_id'] + + def _write_compressed_data(self, data, block_size=64 * 1024): + buffer = io.BytesIO(data) + offset = 0 + + while True: + chunk = buffer.read(block_size) + if not chunk: + break + + offset += len(chunk) + select_one( + "SELECT lo_put(%s::oid, %s::bigint, %s::bytea)::VARCHAR;", + [self.loid, offset - len(chunk), chunk] + ) def get_bytes(self): - result = select_one(f'SELECT lo_get({self.loid}) as "data"', []) - compressed_data = result['data'] + buffer = io.BytesIO() + for chunk in self.get_bytes_stream(): + buffer.write(chunk) try: # 解压数据 - with zipfile.ZipFile(io.BytesIO(compressed_data)) as zip_file: + with zipfile.ZipFile(buffer) as zip_file: return zip_file.read(self.file_name) except Exception as e: # 如果数据不是zip格式,直接返回原始数据 - return compressed_data + return buffer.getvalue() + + def get_bytes_stream(self, start=0, end=None, chunk_size=64 * 1024): + def _read_with_offset(): + offset = start + while True: + result = select_one( + "SELECT lo_get(%s::oid, %s, %s) as chunk", + [self.loid, offset, end - offset if end and (end - offset) < chunk_size else chunk_size] + ) + chunk = result['chunk'] if result else None + if not chunk: + break + yield chunk + offset += len(chunk) + if len(chunk) < chunk_size: + break + if end and offset > end: + break + + return _read_with_offset() @receiver(pre_delete, sender=File)