tunasync-scripts/yum-sync.py
Shengqi Chen 4bcc59b962
yum-sync: use logger instead of plain print
Signed-off-by: Shengqi Chen <harry-chen@outlook.com>
2025-09-18 21:31:41 +08:00

336 lines
11 KiB
Python
Executable File

#!/usr/bin/env python3
import argparse
import bz2
import gzip
import logging
import os
import re
import shutil
import sqlite3
import subprocess as sp
import tempfile
import time
import traceback
import xml.etree.ElementTree as ET
from email.utils import parsedate_to_datetime
from pathlib import Path
from typing import Dict, List
import requests
# Module-level logger used by every function in this script.
# It logs at INFO and above through its own StreamHandler.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
# Record layout: timestamp with milliseconds, source file and line number,
# level name, then the message itself.
formatter = logging.Formatter(
"%(asctime)s.%(msecs)03d - %(filename)s:%(lineno)d [%(levelname)s] %(message)s",
datefmt="%Y-%m-%dT%H:%M:%S",
)
handler.setFormatter(formatter)
logger.addHandler(handler)
# Path of the file to which main() appends "+<total size>" after a run without
# failures; an empty string (the default) disables that step.
REPO_SIZE_FILE = os.getenv("REPO_SIZE_FILE", "")
# Time budget in seconds for one download in check_and_download(); it is
# compared against elapsed wall-clock time while chunks are being received.
DOWNLOAD_TIMEOUT = int(os.getenv("DOWNLOAD_TIMEOUT", "1800"))
# Filled by calc_repo_size(): str(repository path) -> (size, package count).
REPO_STAT = {}
# Named groups of OS versions that replace_os_template() expands.
OS_TEMPLATE = {
"rhel-current": ["9", "10"],
"fedora-current": ["41", "42"],
}
# Matches an "@{name}" reference and captures the name between the braces.
pattern_os_template = re.compile(r"@\{(.+)\}")
def replace_os_template(os_list: List[str]) -> List[str]:
    """Expand OS template references in a list of OS version strings.

    Three kinds of entries are recognised:

    * an entry containing ``@{name}`` yields one string per version listed in
      ``OS_TEMPLATE[name]``, with every ``@{...}`` occurrence replaced by that
      version;
    * an entry of the form ``@name`` is replaced by the versions listed in
      ``OS_TEMPLATE[name]``;
    * any other entry is kept unchanged.

    Raises:
        KeyError: if the referenced template name is not in ``OS_TEMPLATE``.
    """
    expanded: List[str] = []
    for entry in os_list:
        hit = pattern_os_template.search(entry)
        if hit is not None:
            # Embedded reference: substitute each version into the entry.
            expanded.extend(
                pattern_os_template.sub(version, entry)
                for version in OS_TEMPLATE[hit.group(1)]
            )
            continue
        if entry.startswith("@"):
            # Bare reference: the entry stands for the version list itself.
            expanded.extend(OS_TEMPLATE[entry[1:]])
            continue
        expanded.append(entry)
    return expanded
def calc_repo_size(path: Path):
    """Record the package count and total package size of a repository.

    The figures are read from the ``primary`` metadata found under
    ``path / "repodata"``. Supported files are ``.sqlite`` and ``.xml``
    databases, either plain or compressed with gzip/bzip2. The first supported
    file (in sorted order) is used; files in any other format are skipped.

    On success the result is logged and stored in ``REPO_STAT[str(path)]`` as
    ``(size, count)``. Any failure is logged and leaves ``REPO_STAT``
    untouched; nothing is raised for unreadable or malformed metadata.

    Args:
        path: root directory of the repository (the parent of ``repodata``).
    """
    decompressors = {".bz2": bz2.decompress, ".gz": gzip.decompress}
    # Materialise the glob so the candidates can still be reported on failure.
    candidates = sorted(path.glob("repodata/*primary.*"))

    # Pick the file, its decompressor and its inner format together, so the
    # file that is read is always the one the decompressor was chosen for.
    chosen = None
    for db in candidates:
        suffixes = db.suffixes
        dec = decompressors.get(suffixes[-1]) if suffixes else None
        if dec is not None:
            suffixes = suffixes[:-1]
        if suffixes and suffixes[-1] in (".sqlite", ".xml"):
            chosen = (db, dec, suffixes[-1])
            break
    if chosen is None:
        logger.error(f"Failed to read from {path}: {candidates}")
        return
    db, dec, fmt = chosen

    try:
        with db.open("rb") as f:
            data = f.read()
        if dec is not None:
            data = dec(data)

        if fmt == ".sqlite":
            # sqlite3 needs a real file, so spill the database to a temp file.
            with tempfile.NamedTemporaryFile() as tmp:
                tmp.write(data)
                tmp.flush()
                conn = sqlite3.connect(tmp.name)
                try:
                    c = conn.cursor()
                    c.execute("select sum(size_package),count(1) from packages")
                    size, cnt = c.fetchone()
                finally:
                    conn.close()
        else:
            root = ET.fromstring(data)
            if not root.tag.endswith("metadata"):
                raise ValueError(f"Unexpected root element {root.tag!r} in {db}")
            cnt, size = 0, 0
            for size_node in root.findall(
                "./{http://linux.duke.edu/metadata/common}package/{http://linux.duke.edu/metadata/common}size"
            ):
                size += int(size_node.attrib["package"])
                cnt += 1
    except Exception:
        logger.exception(f"Failed to calculate the size of {path}")
        return

    # SUM() over an empty table yields NULL, i.e. None.
    size = size or 0
    logger.info(f"Repository {path}:")
    logger.info(f" {cnt} packages, {size} bytes in total")
    REPO_STAT[str(path)] = (size, cnt) if cnt > 0 else (0, 0)
def check_and_download(url: str, dst_file: Path) -> int:
    """Download ``url`` into ``dst_file``.

    The response body is streamed in 1 MiB chunks. When the server sends a
    ``Last-Modified`` header, the access and modification times of the
    downloaded file are set to that timestamp. The transfer is aborted once
    more than ``DOWNLOAD_TIMEOUT`` seconds have elapsed since the request was
    started.

    Args:
        url: address of the file to fetch.
        dst_file: local path to write to; it is removed again if the download
            does not complete.

    Returns:
        0 on success, 1 if any error occurred (the error is logged).

    ``KeyboardInterrupt`` and ``SystemExit`` are not converted into a return
    code: the partial file is removed and the exception propagates.
    """
    completed = False
    try:
        start = time.time()
        with requests.get(url, stream=True, timeout=(5, 10)) as r:
            r.raise_for_status()
            if "last-modified" in r.headers:
                remote_ts = parsedate_to_datetime(
                    r.headers["last-modified"]
                ).timestamp()
            else:
                remote_ts = None
            with dst_file.open("wb") as f:
                for chunk in r.iter_content(chunk_size=1024**2):
                    if time.time() - start > DOWNLOAD_TIMEOUT:
                        raise TimeoutError("Download timeout")
                    if not chunk:
                        continue  # filter out keep-alive new chunks
                    f.write(chunk)
            if remote_ts is not None:
                os.utime(dst_file, (remote_ts, remote_ts))
        completed = True
        return 0
    except Exception as e:
        logger.error(f"Error occurred: {e}")
        return 1
    finally:
        # Runs for ordinary errors and for interrupts alike, so a truncated
        # file is never left behind.
        if not completed and dst_file.is_file():
            dst_file.unlink()
def download_repodata(url: str, path: Path) -> int:
    """Mirror the upstream ``repodata`` directory of one repository.

    ``repomd.xml`` is first fetched to a temporary ``.repomd.xml``. Every file
    it references is then downloaded into ``path / "repodata"``. Only after
    all of them succeed is the temporary file renamed to ``repomd.xml``, and
    files that were present before but are no longer referenced are deleted.

    Args:
        url: base URL of the repository (the parent of ``repodata/``).
        path: local root directory of the repository.

    Returns:
        0 on success, 1 on any failure (the failure is logged).
    """
    path = path / "repodata"
    path.mkdir(exist_ok=True)
    oldfiles = set(path.glob("*.*"))
    newfiles = set()
    tmp_repomd = path / ".repomd.xml"
    if check_and_download(url + "/repodata/repomd.xml", tmp_repomd) != 0:
        logger.error(f"Failed to download the repomd.xml of {url}")
        return 1
    prefix = "repodata/"
    try:
        root = ET.parse(tmp_repomd).getroot()
        if not root.tag.endswith("repomd"):
            raise ValueError(f"Unexpected root element {root.tag!r} in repomd.xml")
        for location in root.findall(
            "./{http://linux.duke.edu/metadata/repo}data/{http://linux.duke.edu/metadata/repo}location"
        ):
            href = location.attrib["href"]
            basename = href[len(prefix):]
            # The href comes from the remote server: accept only a plain file
            # name directly below repodata/ so nothing is written elsewhere.
            if (
                not href.startswith(prefix)
                or "/" in basename
                or basename in ("", ".", "..")
            ):
                raise ValueError(f"Unexpected location in repomd.xml: {href!r}")
            fn = path / basename
            newfiles.add(fn)
            if check_and_download(url + "/" + href, fn) != 0:
                logger.error(f"Failed to download the {href}")
                return 1
    except Exception:
        logger.exception(f"Failed to process the repomd.xml of {url}")
        return 1
    tmp_repomd.rename(path / "repomd.xml")  # update the repomd.xml
    newfiles.add(path / "repomd.xml")
    for i in oldfiles - newfiles:
        logger.info(f"Deleting old files: {i}")
        # A ".repomd.xml" left over from an earlier failed run is part of
        # oldfiles but has just been renamed away, so it may not exist.
        i.unlink(missing_ok=True)
    return 0
def check_args(prop: str, lst: List[str]):
    """Validate a list of command-line items.

    An item is invalid when it is empty or contains a space.

    Args:
        prop: name of the argument, used only in the error message.
        lst: items to validate.

    Raises:
        ValueError: for the first invalid item found.
    """
    s = next((item for item in lst if not item or " " in item), None)
    if s is not None:
        raise ValueError(f"Invalid item in {prop}: {repr(s)}")
def substitute_vars(s: str, vardict: Dict[str, str]) -> str:
    """Replace every ``@{key}`` placeholder in ``s`` with its value.

    Substitutions are applied one key at a time, in the iteration order of
    ``vardict``. Placeholders whose key is not in ``vardict`` are left as is.

    Args:
        s: template string.
        vardict: mapping from placeholder name to replacement text.

    Returns:
        The string with all known placeholders substituted.
    """
    result = s
    for name in vardict:
        placeholder = "".join(("@{", name, "}"))
        result = result.replace(placeholder, vardict[name])
    return result
def main():
    """Synchronise a set of YUM/DNF repositories and refresh their metadata.

    Every (OS version, component) pair is expanded into a repository name and
    URL for each architecture. Repositories whose ``repodata/repomd.xml``
    answers a HEAD probe are written to a temporary dnf configuration and
    synced with ``dnf reposync``. Metadata is then either downloaded from
    upstream (``--download-repodata``) or regenerated with ``createrepo_c``,
    and the repository size is collected via ``calc_repo_size``.

    Repositories that failed at any step are logged at the end. When nothing
    failed and ``REPO_SIZE_FILE`` is set, the total size is appended to it.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("base_url", type=str, help="base URL")
    parser.add_argument("os_version", type=str, help="e.g. 7-8,9")
    parser.add_argument(
        "component", type=str, help="e.g. mysql56-community,mysql57-community"
    )
    parser.add_argument("arch", type=str, help="e.g. x86_64,aarch64")
    parser.add_argument("repo_name", type=str, help="e.g. @{comp}-el@{os_ver}")
    parser.add_argument("working_dir", type=Path, help="working directory")
    parser.add_argument(
        "--download-repodata",
        action="store_true",
        help="download repodata files instead of generating them",
    )
    parser.add_argument(
        "--pass-arch-to-reposync",
        action="store_true",
        help="""pass --arch to reposync to further filter packages by 'arch' field in metadata (NOT recommended, prone to missing packages in some repositories, e.g. mysql)""",
    )
    args = parser.parse_args()

    raw_os_list = replace_os_template(args.os_version.split(","))
    os_list = []
    for os_version in raw_os_list:
        if "-" in os_version and "-stream" not in os_version:
            # "7-9" denotes the inclusive numeric range 7, 8, 9.
            dash = os_version.index("-")
            first = int(os_version[:dash])
            last = int(os_version[dash + 1 :])
            os_list.extend(str(i) for i in range(first, last + 1))
        else:
            os_list.append(os_version)
    check_args("os_version", os_list)
    component_list = args.component.split(",")
    check_args("component", component_list)
    arch_list = args.arch.split(",")
    check_args("arch", arch_list)
    logger.info(f"Configuration: {os_list=}, {component_list=}, {arch_list=}")

    failed = []
    args.working_dir.mkdir(parents=True, exist_ok=True)

    def combination_os_comp(arch: str):
        """Yield (name, url) for every repository of ``arch`` that responds."""
        for os_ver in os_list:
            for comp in component_list:
                vardict = {
                    "arch": arch,
                    "os_ver": os_ver,
                    "comp": comp,
                }
                name = substitute_vars(args.repo_name, vardict)
                url = substitute_vars(args.base_url, vardict)
                probe_url = (
                    url + ("" if url.endswith("/") else "/") + "repodata/repomd.xml"
                )
                # Only the probe is guarded; the yield stays outside the try
                # so closing the generator is never mistaken for a probe error.
                try:
                    r = requests.head(probe_url, timeout=(7, 7))
                except Exception:
                    logger.exception(f"Failed to probe {probe_url}")
                    continue
                if r.status_code < 400 or r.status_code == 403:
                    yield (name, url)
                else:
                    logger.warning(f"{probe_url} -> {r.status_code}")

    # The createrepo_c cache directory and the per-arch dnf configuration are
    # both temporary and removed automatically when their block exits.
    with tempfile.TemporaryDirectory() as cache_dir:
        for arch in arch_list:
            # One (name, url, destination) triple per repository, so later
            # steps use the URL that belongs to each destination directory.
            repos = []
            with tempfile.NamedTemporaryFile("w", suffix=".conf") as conf:
                conf.write("\n[main]\nkeepcache=0\n")
                for name, url in combination_os_comp(arch):
                    conf.write(
                        f"\n[{name}]\n"
                        f"name={name}\n"
                        f"baseurl={url}\n"
                        "repo_gpgcheck=0\n"
                        "gpgcheck=0\n"
                        "enabled=1\n"
                    )
                    dst = (args.working_dir / name).absolute()
                    dst.mkdir(parents=True, exist_ok=True)
                    repos.append((name, url, dst))
                conf.flush()

                if not repos:
                    logger.info("Nothing to sync")
                    failed.append(("", arch))
                    continue

                cmd_args = [
                    "dnf",
                    "reposync",
                    "-c",
                    conf.name,
                    "--delete",
                    "-p",
                    str(args.working_dir.absolute()),
                ]
                if args.pass_arch_to_reposync:
                    cmd_args += ["--arch", arch]
                logger.info(f"Launching dnf reposync with command: {cmd_args}")
                ret = sp.run(cmd_args)
                if ret.returncode != 0:
                    # A single reposync run covers every repository of this
                    # arch, so all of them are reported as failed.
                    failed.extend((name, arch) for name, _, _ in repos)
                    continue

                for name, url, path in repos:
                    path.mkdir(exist_ok=True)
                    if args.download_repodata:
                        # download_repodata returns a truthy code on failure.
                        if download_repodata(url, path):
                            failed.append((name, arch))
                    else:
                        # NOTE(review): this removes ".repodata" relative to
                        # the current working directory, not inside `path`;
                        # confirm whether path / ".repodata" was intended.
                        shutil.rmtree(".repodata", True)
                        cmd_args = [
                            "createrepo_c",
                            "--update",
                            "-v",
                            "-c",
                            cache_dir,
                            "-o",
                            str(path),
                            str(path),
                        ]
                        logger.info(f"Launching createrepo with command: {cmd_args}")
                        ret = sp.run(cmd_args)
                        if ret.returncode != 0:
                            failed.append((name, arch))
                    calc_repo_size(path)

    if failed:
        logger.error(f"Failed YUM repos: {failed}")
    elif REPO_SIZE_FILE:
        with open(REPO_SIZE_FILE, "a") as fd:
            total_size = sum(r[0] for r in REPO_STAT.values())
            fd.write(f"+{total_size}")


if __name__ == "__main__":
    main()