From 8186bac8e28fcfcee03353449329a2e6a72fd0c8 Mon Sep 17 00:00:00 2001
From: Shengqi Chen
Date: Wed, 10 Sep 2025 01:14:31 +0800
Subject: [PATCH] pub-mirror: initial version that works

[ci skip]

Signed-off-by: Shengqi Chen
---
 pub-mirror.py | 216 ++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 216 insertions(+)
 create mode 100755 pub-mirror.py

diff --git a/pub-mirror.py b/pub-mirror.py
new file mode 100755
index 0000000..3e26168
--- /dev/null
+++ b/pub-mirror.py
@@ -0,0 +1,216 @@
+#!/usr/bin/env python3
+
+import argparse
+import concurrent.futures
+import hashlib
+import json
+import logging
+import os
+import tempfile
+from pathlib import Path
+from typing import Optional
+
+import requests
+
+logger = logging.getLogger(__name__)
+logger.setLevel(logging.INFO)
+handler = logging.StreamHandler()
+formatter = logging.Formatter(
+    "%(asctime)s - %(levelname)s - %(name)s - %(filename)s:%(lineno)d - %(message)s"
+)
+handler.setFormatter(formatter)
+logger.addHandler(handler)
+
+BASE_URL = os.getenv("TUNASYNC_UPSTREAM_URL", "https://pub.dev")
+WORKING_DIR = os.getenv("TUNASYNC_WORKING_DIR")
+MIRROR_URL = os.getenv("MIRROR_URL", "https://mirrors.tuna.tsinghua.edu.cn/dart-pub")
+
+
+# wrap requests.get so that a bearer token is attached when PUB_TOKEN is set
+def get_with_token(*args, **kwargs):
+    headers = kwargs.get("headers", {})
+    if "PUB_TOKEN" in os.environ:
+        headers["Authorization"] = "Bearer {}".format(os.environ["PUB_TOKEN"])
+    kwargs["headers"] = headers
+    return requests.get(*args, **kwargs)
+
+
+def do_download(remote_url: str, dst_file: Path, sha256: Optional[str] = None):
+    # NOTE the stream=True parameter below
+    with get_with_token(remote_url, stream=True) as r:
+        r.raise_for_status()
+        tmp_dst_file = None
+        try:
+            downloaded_sha256 = hashlib.sha256()
+            with tempfile.NamedTemporaryFile(
+                prefix="." + dst_file.name + ".",
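+                # the hidden temp file lives in dst_file's own directory, so
+                # the replace() below is an atomic rename on one filesystem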
+                suffix=".tmp",
+                dir=dst_file.parent,
+                delete=False,
+            ) as f:
+                tmp_dst_file = Path(f.name)
+                for chunk in r.iter_content(chunk_size=1024**2):
+                    if chunk:  # filter out keep-alive new chunks
+                        f.write(chunk)
+                        downloaded_sha256.update(chunk)
+            # the with block has closed (and flushed) the temp file here;
+            # verify the sha256 of the download before publishing it
+            if sha256 and sha256 != downloaded_sha256.hexdigest():
+                raise Exception(
+                    f"File {dst_file.as_posix()} sha256 mismatch: downloaded {downloaded_sha256.hexdigest()}, expected {sha256}"
+                )
+            tmp_dst_file.chmod(0o644)
+            tmp_dst_file.replace(dst_file)
+        finally:
+            if tmp_dst_file is not None:
+                if tmp_dst_file.is_file():
+                    tmp_dst_file.unlink()
+
+
+def download_pkg_ver(
+    pkg_name: str, working_dir: Path, ver: str, url: str, sha256: str
+) -> bool:
+    # download archive file to <working_dir>/packages/<pkg_name>/versions/<ver>.tar.gz
+    dst_file = working_dir / "packages" / pkg_name / "versions" / f"{ver}.tar.gz"
+    if not dst_file.is_file():
+        dst_file.parent.mkdir(parents=True, exist_ok=True)
+        logger.info(f"Downloading {url} to {dst_file.as_posix()}")
+        try:
+            do_download(url, dst_file, sha256=sha256)
+            return True
+        except Exception as e:
+            logger.error(f"Failed to download {url} to {dst_file.as_posix()}: {e}")
+            return False
+    else:
+        logger.info(f"File {dst_file.as_posix()} already exists, skipping download")
+        return True
+
+
+# https://github.com/dart-lang/pub/blob/master/doc/repository-spec-v2.md#list-all-versions-of-a-package
+def handle_pkg(
+    executor: concurrent.futures.ThreadPoolExecutor,
+    base_url: str,
+    pkg_name: str,
+    working_dir: Path,
+    mirror_url: str,
+) -> bool:
+    logger.info(f"Handling package {pkg_name}...")
+    # fetch metadata from upstream
+    pkg_url = base_url + "/api/packages/" + pkg_name
+    req = get_with_token(pkg_url, headers={"Accept": "application/vnd.pub.v2+json"})
+    req.raise_for_status()
+    resp = req.json()
+
+    download_tasks = []
+    latest_ver = resp["latest"]["version"]
+
+    for ver in resp["versions"]:
+        logger.debug(f'Checking {pkg_name}=={ver["version"]}')
+        if ver.get("retracted", False):
+            logger.info(f'Skipping retracted version {pkg_name}=={ver["version"]}')
+            continue
+        download_tasks.append(
+            (
+                pkg_name,
+                working_dir,
+                ver["version"],
+                ver["archive_url"],
+                ver["archive_sha256"],
+            )
+        )
+        # replace the upstream URL in metadata with our mirror URL
+        cur_ver = ver["version"]
+        serving_url = f"{mirror_url}/packages/{pkg_name}/versions/{cur_ver}.tar.gz"
+        ver["archive_url"] = serving_url
+        if cur_ver == latest_ver:
+            resp["latest"] = ver
+
+    # save the modified metadata (the whole package document, not just the
+    # last version) to api/packages/<pkg_name>/meta.json
+    modified_meta_str = json.dumps(resp)
+    meta_on_disk = working_dir / "api" / "packages" / pkg_name / "meta.json"
+    # fast path: check if meta.json exists and has the same size
+    if not meta_on_disk.is_file() or meta_on_disk.stat().st_size != len(
+        modified_meta_str
+    ):
+        logger.info(
+            f"Metadata for package {pkg_name} is outdated or missing, updating..."
+        )
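+        # archives are fetched before meta.json is (re)written, so an
+        # interrupted sync never leaves metadata pointing at missing tarballs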
+ ) + # download all versions concurrently + results = list(executor.map(lambda p: download_pkg_ver(*p), download_tasks)) + if not all(results): + logger.error( + f"Failed to download some versions of package {pkg_name}, skipping metadata update" + ) + return False + meta_on_disk.parent.mkdir(parents=True, exist_ok=True) + with open(meta_on_disk, "w", encoding="utf-8") as f: + f.write(modified_meta_str) + f.flush() + else: + logger.info( + f"Metadata for package {pkg_name} is up to date (latest {latest_ver}), skipping" + ) + + return True + + +def main(): + + import argparse + + parser = argparse.ArgumentParser() + parser.add_argument("--base-url", default=BASE_URL) + parser.add_argument("--mirror-url", default=MIRROR_URL) + parser.add_argument("--working-dir", default=WORKING_DIR) + parser.add_argument( + "--workers", default=1, type=int, help="number of concurrent downloading jobs" + ) + # parser.add_argument("--fast-skip", action='store_true', + # help='do not verify sha256 of existing files') + args = parser.parse_args() + + if args.working_dir is None: + raise Exception("Working Directory is None") + + working_dir = Path(args.working_dir) + base_url = args.base_url + mirror_url = MIRROR_URL + + logger.info(f"Using upstream URL: {base_url}") + logger.info(f"Using mirror URL: {mirror_url}") + logger.info(f"Using working directory: {working_dir.as_posix()}") + logger.info(f"Using {args.workers} workers") + + pkg_executor = concurrent.futures.ThreadPoolExecutor(max_workers=args.workers) + download_executor = concurrent.futures.ThreadPoolExecutor(max_workers=args.workers) + + # iterate through all packages + pkgs_url = base_url + "/api/package-names" + pkg_futures = [] + while True: + req = get_with_token(pkgs_url, headers={"Accept-Encoding": "gzip"}) + req.raise_for_status() + resp = req.json() + + for pkg in resp["packages"][:10]: + pkg_futures.append( + pkg_executor.submit( + handle_pkg, + download_executor, + base_url, + pkg, + working_dir, + mirror_url, + ) + ) + + # null means no more pages + if not (pkgs_url := resp["nextUrl"]): + break + + pkg_executor.shutdown(wait=True) + + +if __name__ == "__main__": + main()