tunasync-scripts/pub-mirror.py

#!/usr/bin/env python3
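"""Mirror pub.dev (the Dart package repository) for tunasync.

Downloads every package archive and rewrites the archive_url fields in the
version metadata to point at the local mirror, storing both under the
working directory.

Example invocation (path and worker count are illustrative):
    TUNASYNC_WORKING_DIR=/data/dart-pub ./pub-mirror.py --workers 8 --clean
"""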
import concurrent.futures
import hashlib
import json
import logging
import os
import shutil
import tempfile
from pathlib import Path
from typing import Optional
from datetime import datetime
import requests
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
formatter = logging.Formatter(
"%(asctime)s - %(levelname)s - %(name)s - %(filename)s:%(lineno)d - %(message)s"
)
handler.setFormatter(formatter)
logger.addHandler(handler)
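# Configuration comes from environment variables (CLI flags in main() can
# override the first three):
#   TUNASYNC_UPSTREAM_URL  upstream pub API base URL (default: https://pub.dev)
#   TUNASYNC_WORKING_DIR   local directory to mirror into (required)
#   MIRROR_BASE_URL        public base URL written into the served metadata
#   PUB_TOKEN              optional bearer token sent with upstream requests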
BASE_URL = os.getenv("TUNASYNC_UPSTREAM_URL", "https://pub.dev")
WORKING_DIR = os.getenv("TUNASYNC_WORKING_DIR")
MIRROR_URL = os.getenv("MIRROR_BASE_URL", "https://mirrors.tuna.tsinghua.edu.cn/dart-pub")
REPOS = []
UA = 'tuna-pub-mirror/0.0 (+https://github.com/tuna/tunasync-scripts)'
# wrapper around requests.get that adds the bearer token (if configured) and our User-Agent
def get_with_token(*args, **kwargs):
    headers = kwargs.get("headers", {})
if "PUB_TOKEN" in os.environ:
headers["Authorization"] = "Bearer {}".format(os.environ["PUB_TOKEN"])
headers["User-Agent"] = UA
kwargs["headers"] = headers
return requests.get(*args, **kwargs)
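# Stream a single archive into a temporary file next to the destination,
# verify its sha256 against the value from the metadata, then atomically
# move it into place and set atime/mtime to the upstream publish time.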
def do_download(remote_url: str, dst_file: Path, time: datetime, sha256: str):
# NOTE the stream=True parameter below
with get_with_token(remote_url, stream=True) as r:
r.raise_for_status()
tmp_dst_file = None
try:
downloaded_sha256 = hashlib.sha256()
with tempfile.NamedTemporaryFile(
prefix="." + dst_file.name + ".",
suffix=".tmp",
dir=dst_file.parent,
delete=False,
) as f:
tmp_dst_file = Path(f.name)
for chunk in r.iter_content(chunk_size=1024**2):
if chunk: # filter out keep-alive new chunks
f.write(chunk)
downloaded_sha256.update(chunk)
# f.flush()
            # verify the downloaded data against the expected sha256
if sha256 != downloaded_sha256.hexdigest():
raise Exception(
f"File {dst_file.as_posix()} sha256 mismatch: downloaded {downloaded_sha256.hexdigest()}, expected {sha256}"
)
tmp_dst_file.chmod(0o644)
tmp_dst_file.replace(dst_file)
os.utime(dst_file, (time.timestamp(), time.timestamp())) # access and modified time
finally:
if tmp_dst_file is not None:
if tmp_dst_file.is_file():
tmp_dst_file.unlink()
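# Fetch one version of a package into packages/<pkg>/versions/<version>.tar.gz.
# Files that already exist are not re-downloaded; only their atime/mtime are
# refreshed. Returns True on success or skip, False if the download failed.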
def download_pkg_ver(
pkg_name: str, working_dir: Path, ver: str, url: str, time: datetime, sha256: str
) -> bool:
# download archive file to /packages/<pkg>/versions/<version>.tar.gz
dst_file = working_dir / "packages" / pkg_name / "versions" / f"{ver}.tar.gz"
if not dst_file.is_file():
dst_file.parent.mkdir(parents=True, exist_ok=True)
logger.info(f"Downloading {url} to {dst_file.as_posix()}")
try:
do_download(url, dst_file, time, sha256)
return True
except Exception as e:
logger.error(f"Failed to download {url} to {dst_file.as_posix()}: {e}")
return False
else:
os.utime(dst_file, (time.timestamp(), time.timestamp())) # update access and modified time
logger.info(f"File {dst_file.as_posix()} already exists, skipping download")
return True
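# Parse the ISO 8601 "published" timestamp from the metadata, falling back to
# the current time when the field is missing.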
def _from_published_time(published: Optional[str]) -> datetime:
if published:
return datetime.fromisoformat(published.replace("Z", "+00:00"))
else:
return datetime.now()
# https://github.com/dart-lang/pub/blob/master/doc/repository-spec-v2.md#list-all-versions-of-a-package
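# The version-listing response looks roughly like this (abridged to the fields
# accessed below):
# {
#   "latest":   {"version": ..., "archive_url": ..., "archive_sha256": ..., "published": ...},
#   "versions": [{"version": ..., "archive_url": ..., "archive_sha256": ...,
#                 "published": ..., "retracted": ..., "advisoriesUpdated": ...}, ...]
# }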
def handle_pkg(
executor: concurrent.futures.ThreadPoolExecutor,
base_url: str,
pkg_name: str,
working_dir: Path,
mirror_url: str,
clean: bool,
) -> bool:
logger.info(f"Handling package {pkg_name}...")
# fetch metadata from upstream
    pkg_url = base_url + "/api/packages/" + pkg_name
    req = get_with_token(pkg_url, headers={"Accept": "application/vnd.pub.v2+json"}, timeout=5)
req.raise_for_status()
resp = req.json()
download_tasks = []
latest_ver = resp["latest"]["version"]
latest_time = _from_published_time(resp["latest"].get("published"))
for ver in resp["versions"]:
logger.debug(f'Checking {pkg_name}=={ver["version"]}')
ver_time = _from_published_time(ver.get("published"))
if "advisoriesUpdated" in ver:
del ver["advisoriesUpdated"] # not supported
if ver.get("retracted", False):
logger.info(f'Skipping retracted version {pkg_name}=={ver["version"]}')
dst_file = working_dir / "packages" / pkg_name / "versions" / f'{ver["version"]}.tar.gz'
dst_file.unlink(missing_ok=True)
continue
download_tasks.append(
(
pkg_name,
working_dir,
ver["version"],
ver["archive_url"],
ver_time,
ver["archive_sha256"],
)
)
# replace URL in metadata with our mirror URL
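        # e.g. an upstream archive_url like https://pub.dev/api/archives/foo-1.0.0.tar.gz
        # (illustrative) becomes <mirror_url>/packages/foo/versions/1.0.0.tar.gz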
cur_ver = ver["version"]
serving_url = f"{mirror_url}/packages/{pkg_name}/versions/{cur_ver}.tar.gz"
ver["archive_url"] = serving_url
if cur_ver == latest_ver:
resp["latest"] = ver
# clean up obsolete versions if needed
if clean:
all_versions = [ver["version"] for ver in resp["versions"] if not ver.get("retracted", False)]
versions_dir = working_dir / "packages" / pkg_name / "versions"
if versions_dir.is_dir():
for f in versions_dir.iterdir():
if f.is_file() and f.suffix == ".gz":
local_ver = f.name.removesuffix(".tar.gz")
if local_ver not in all_versions:
logger.info(f"Removing obsolete pkg file {f.as_posix()}")
f.unlink(missing_ok=True)
# save modified metadata to api/packages/<pkg>/meta.json
modified_meta_str = json.dumps(resp)
meta_on_disk = working_dir / "api" / "packages" / pkg_name / "meta.json"
# fast path: check if meta.json exists and has the same size
if not meta_on_disk.is_file() or meta_on_disk.stat().st_size != len(
modified_meta_str
):
logger.info(
f"Metadata for package {pkg_name} is outdated or missing, updating..."
)
# download all versions concurrently
results = list(executor.map(lambda p: download_pkg_ver(*p), download_tasks))
if not all(results):
logger.error(
f"Failed to download some versions of package {pkg_name}, skipping metadata update"
)
return False
meta_on_disk.parent.mkdir(parents=True, exist_ok=True)
with open(meta_on_disk, "w", encoding="utf-8") as f:
f.write(modified_meta_str)
f.flush()
else:
logger.info(
f"Metadata for package {pkg_name} is up to date (latest {latest_ver}), skipping"
)
return True
def main():
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--base-url", default=BASE_URL)
parser.add_argument("--mirror-url", default=MIRROR_URL)
parser.add_argument("--working-dir", default=WORKING_DIR)
parser.add_argument(
"--workers", default=1, type=int, help="number of concurrent downloading jobs"
)
parser.add_argument(
"--clean",
action='store_true',
help="remove obsolete package versions that are no longer in upstream",
)
# parser.add_argument("--fast-skip", action='store_true',
# help='do not verify sha256 of existing files')
args = parser.parse_args()
    if args.working_dir is None:
        raise Exception("Working directory not specified: pass --working-dir or set TUNASYNC_WORKING_DIR")
working_dir = Path(args.working_dir)
base_url = args.base_url
    mirror_url = args.mirror_url
clean = args.clean
logger.info(f"Using upstream URL: {base_url}")
logger.info(f"Using mirror URL: {mirror_url}")
logger.info(f"Using working directory: {working_dir.as_posix()}")
logger.info(f"Using {args.workers} workers")
logger.info(f"Clean obsolete packages: {'Yes' if clean else 'No'}")
pkg_executor = concurrent.futures.ThreadPoolExecutor(max_workers=args.workers)
download_executor = concurrent.futures.ThreadPoolExecutor(max_workers=args.workers)
# iterate through all packages
pkgs_url = base_url + "/api/package-names"
pkg_futures = []
all_pkgs = []
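    # /api/package-names is paginated: each page is {"packages": [...], "nextUrl": ...},
    # and a null nextUrl means there are no more pages.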
while True:
req = get_with_token(pkgs_url, headers={"Accept-Encoding": "gzip"}, timeout=5)
req.raise_for_status()
resp = req.json()
for pkg in resp["packages"]:
pkg_futures.append(
pkg_executor.submit(
handle_pkg,
download_executor,
base_url,
pkg,
working_dir,
mirror_url,
clean,
)
)
all_pkgs.append(pkg)
# null means no more pages
if not (pkgs_url := resp["nextUrl"]):
break
# wait for all packages to be handled
for f in concurrent.futures.as_completed(pkg_futures):
try:
f.result()
except Exception as e:
logger.error(f"Error handling package: {e}")
pkg_executor.shutdown(wait=True)
download_executor.shutdown(wait=True)
if clean:
# clean up obsolete packages
pkgs_dir = working_dir / "packages"
if pkgs_dir.is_dir():
for p in pkgs_dir.iterdir():
if p.is_dir():
pkg_name = p.name
if pkg_name not in all_pkgs:
logger.info(f"Removing obsolete package {pkg_name}")
shutil.rmtree(p, ignore_errors=True)
if __name__ == "__main__":
main()