apt-sync: use logger instead of plain print

Signed-off-by: Shengqi Chen <harry-chen@outlook.com>
This commit is contained in:
Shengqi Chen 2025-09-18 21:35:49 +08:00
parent 4bcc59b962
commit 0f9f33cbf8
No known key found for this signature in database

View File

@ -1,21 +1,31 @@
#!/usr/bin/env python3
import hashlib
import traceback
import os
import re
import shutil
import argparse
import bz2
import gzip
import hashlib
import logging
import lzma
import os
import re
import shutil
import socket
import time
import traceback
from email.utils import parsedate_to_datetime
from pathlib import Path
from typing import List, Dict, Tuple
from typing import Dict, List, Tuple
import requests
import socket
# Module-level logger used in place of plain print() calls in this script.
# The level is set on the logger itself, so INFO and above are emitted.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
# StreamHandler() with no stream argument writes to sys.stderr.
handler = logging.StreamHandler()
# Record layout: timestamp with milliseconds, source file and line number,
# level name, then the message.
formatter = logging.Formatter(
"%(asctime)s.%(msecs)03d - %(filename)s:%(lineno)d [%(levelname)s] %(message)s",
datefmt="%Y-%m-%dT%H:%M:%S",
)
handler.setFormatter(formatter)
logger.addHandler(handler)
# User-Agent string; can be overridden through the APT_SYNC_USER_AGENT
# environment variable, otherwise falls back to "APT-Mirror-Tool/1.0".
APT_SYNC_USER_AGENT = os.getenv("APT_SYNC_USER_AGENT", "APT-Mirror-Tool/1.0")
# Replace requests' default User-Agent factory so it returns the string above.
requests.utils.default_user_agent = lambda: APT_SYNC_USER_AGENT
@ -75,7 +85,7 @@ def check_and_download(url: str, dst_file: Path, caching=False) -> int:
try:
if caching:
if url in download_cache:
print(f"Using cached content: {url}", flush=True)
logger.info(f"Using cached content: {url}")
with dst_file.open("wb") as f:
f.write(download_cache[url])
return 0
@ -104,7 +114,7 @@ def check_and_download(url: str, dst_file: Path, caching=False) -> int:
os.utime(dst_file, (remote_ts, remote_ts))
return 0
except BaseException as e:
print(e, flush=True)
logger.error(f"Error occurred: {e}")
if dst_file.is_file():
dst_file.unlink()
if url in download_cache:
@ -124,7 +134,7 @@ def move_files_in(src: Path, dst: Path):
empty = True
for file in src.glob("*"):
empty = False
print(f"moving {file} to {dst}")
logger.info(f"moving {file} to {dst}")
# shutil.move(str(file), str(dst))
if file.is_dir():
(dst / file.name).mkdir(parents=True, exist_ok=True)
@ -133,7 +143,7 @@ def move_files_in(src: Path, dst: Path):
else:
file.rename(dst / file.name) # Overwrite files
if empty:
print(f"{src} is empty")
logger.info(f"{src} is empty")
def apt_mirror(
@ -145,9 +155,9 @@ def apt_mirror(
deb_set: Dict[str, int],
) -> int:
if not dest_base_dir.is_dir():
print("Destination directory is empty, cannot continue")
logger.error("Destination directory is empty, cannot continue")
return 1
print(f"Started mirroring {base_url} {dist}, {repo}, {arch}!", flush=True)
logger.info(f"Started mirroring {base_url} {dist}, {repo}, {arch}!")
# download Release files
dist_dir, dist_tmp_dir = mkdir_with_dot_tmp(dest_base_dir / "dists" / dist)
@ -160,9 +170,9 @@ def apt_mirror(
)
!= 0
):
print("Invalid Repository")
logger.error("Invalid Repository")
if not (dist_dir / "Release").is_file():
print(
logger.warning(
f"{dist_dir/'Release'} never existed, upstream may not provide packages for {dist}, ignore this error"
)
return 0
@ -214,32 +224,25 @@ def apt_mirror(
deep_tmp_dir.mkdir(parents=True, exist_ok=True)
pkgidx_file = deep_tmp_dir / fn.name
else:
print(f"Ignore the file {filename}")
logger.warning(f"Ignore the file {filename}")
continue
pkglist_url = f"{base_url}/dists/{dist}/{filename}"
if check_and_download(pkglist_url, pkgidx_file) != 0:
print("Failed to download:", pkglist_url)
logger.error(f"Failed to download: {pkglist_url}")
continue
with pkgidx_file.open("rb") as t:
content = t.read()
if len(content) != int(filesize):
print(
f"Invalid size of {pkgidx_file}, expected {filesize}, skipped"
)
logger.error(f"Invalid size of {pkgidx_file}, expected {filesize}, skipped")
pkgidx_file.unlink()
continue
if hashlib.sha256(content).hexdigest() != checksum:
print(
f"Invalid checksum of {pkgidx_file}, expected {checksum}, skipped"
)
logger.error(f"Invalid checksum of {pkgidx_file}, expected {checksum}, skipped")
pkgidx_file.unlink()
continue
if pkgidx_content is None and pkgidx_file.stem == "Packages":
print(
f"getting packages index content from {pkgidx_file.name}",
flush=True,
)
logger.info(f"getting packages index content from {pkgidx_file.name}")
suffix = pkgidx_file.suffix
if suffix == ".xz":
pkgidx_content = lzma.decompress(content).decode("utf-8")
@ -250,7 +253,7 @@ def apt_mirror(
elif suffix == "":
pkgidx_content = content.decode("utf-8")
else:
print("unsupported format")
logger.error("unsupported format")
# Currently only support SHA-256 checksum, because
# "Clients may not use the MD5Sum and SHA1 fields for security purposes, and must require a SHA256 or a SHA512 field."
@ -258,7 +261,7 @@ def apt_mirror(
if line.startswith("SHA256:"):
cnt_start = True
if not cnt_start:
print("Cannot find SHA-256 checksum")
logger.error("Cannot find SHA-256 checksum")
return 1
def collect_tmp_dir():
@ -278,13 +281,13 @@ def apt_mirror(
if arch in ARCH_NO_PKGIDX:
if collect_tmp_dir() == 1:
return 1
print(f"Mirroring {base_url} {dist}, {repo}, {arch} done!")
logger.info(f"Mirroring {base_url} {dist}, {repo}, {arch} done!")
return 0
if pkgidx_content is None:
print("index is empty, failed")
logger.error("index is empty, failed")
if len(list(pkgidx_dir.glob("Packages*"))) == 0:
print(
logger.warning(
f"{pkgidx_dir/'Packages'} never existed, upstream may not provide {dist}/{repo}/{arch}, ignore this error"
)
return 0
@ -302,7 +305,7 @@ def apt_mirror(
pkg_size = int(pattern_package_size.search(pkg).group(1))
pkg_checksum = pattern_package_sha256.search(pkg).group(1)
except:
print("Failed to parse one package description", flush=True)
logger.error("Failed to parse one package description")
traceback.print_exc()
err = 1
continue
@ -316,13 +319,13 @@ def apt_mirror(
if dest_filename.suffix == ".deb":
deb_set[str(dest_filename.relative_to(dest_base_dir))] = pkg_size
if dest_filename.is_file() and dest_filename.stat().st_size == pkg_size:
print(f"Skipping {pkg_filename}, size {pkg_size}")
logger.info(f"Skipping {pkg_filename}, size {pkg_size}")
continue
pkg_url = f"{base_url}/{pkg_filename}"
dest_tmp_filename = dest_filename.with_name("._syncing_." + dest_filename.name)
for retry in range(MAX_RETRY):
print(f"downloading {pkg_url} to {dest_filename}", flush=True)
logger.info(f"downloading {pkg_url} to {dest_filename}")
# break # dry run
if check_and_download(pkg_url, dest_tmp_filename) != 0:
continue
@ -332,19 +335,19 @@ def apt_mirror(
for block in iter(lambda: f.read(1024**2), b""):
sha.update(block)
if sha.hexdigest() != pkg_checksum:
print(f"Invalid checksum of {dest_filename}, expected {pkg_checksum}")
logger.error(f"Invalid checksum of {dest_filename}, expected {pkg_checksum}")
dest_tmp_filename.unlink()
continue
dest_tmp_filename.rename(dest_filename)
break
else:
print(f"Failed to download {dest_filename}")
logger.error(f"Failed to download {dest_filename}")
err = 1
if collect_tmp_dir() == 1:
return 1
print(f"Mirroring {base_url} {dist}, {repo}, {arch} done!")
print(f"{deb_count} packages, {deb_size} bytes in total", flush=True)
logger.info(f"Mirroring {base_url} {dist}, {repo}, {arch} done!")
logger.info(f"{deb_count} packages, {deb_size} bytes in total")
return err
@ -355,15 +358,12 @@ def apt_delete_old_debs(dest_base_dir: Path, remote_set: Dict[str, int], dry_run
deleting = on_disk - remote_set.keys()
# print(on_disk)
# print(remote_set)
print(
f"Deleting {len(deleting)} packages not in the index{' (dry run)' if dry_run else ''}",
flush=True,
)
logger.info(f"Deleting {len(deleting)} packages not in the index{' (dry run)' if dry_run else ''}")
for i in deleting:
if dry_run:
print("Will delete", i)
logger.info(f"Will delete {i}")
else:
print("Deleting", i)
logger.info(f"Deleting {i}")
(dest_base_dir / i).unlink()
@ -411,6 +411,8 @@ def main():
component_lists = generate_list_for_oses(args.component, "component")
arch_lists = generate_list_for_oses(args.arch, "arch")
logger.info(f"Configuration: {os_list=}, {component_lists=}, {arch_lists=}")
args.working_dir.mkdir(parents=True, exist_ok=True)
failed = []
deb_set = {}
@ -426,7 +428,7 @@ def main():
):
failed.append((os, comp, arch))
if len(failed) > 0:
print(f"Failed APT repos of {args.base_url}: ", failed)
logger.error(f"Failed APT repos of {args.base_url}: {failed}")
return
if args.delete or args.delete_dry_run:
apt_delete_old_debs(args.working_dir, deb_set, args.delete_dry_run)