#!/usr/bin/env python3 import concurrent.futures import hashlib import json import logging import os import shutil import tempfile from pathlib import Path from typing import Optional from datetime import datetime import requests logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) handler = logging.StreamHandler() formatter = logging.Formatter( "%(asctime)s - %(levelname)s - %(name)s - %(filename)s:%(lineno)d - %(message)s" ) handler.setFormatter(formatter) logger.addHandler(handler) BASE_URL = os.getenv("TUNASYNC_UPSTREAM_URL", "https://pub.dev") WORKING_DIR = os.getenv("TUNASYNC_WORKING_DIR") MIRROR_URL = os.getenv("MIRROR_BASE_URL", "https://mirrors.tuna.tsinghua.edu.cn/dart-pub") REPOS = [] UA = 'tuna-pub-mirror/0.0 (+https://github.com/tuna/tunasync-scripts)' # wrap around requests.get to use token if available def get_with_token(*args, **kwargs): headers = kwargs["headers"] if "headers" in kwargs else {} if "PUB_TOKEN" in os.environ: headers["Authorization"] = "Bearer {}".format(os.environ["PUB_TOKEN"]) headers["User-Agent"] = UA kwargs["headers"] = headers return requests.get(*args, **kwargs) def do_download(remote_url: str, dst_file: Path, time: datetime, sha256: str): # NOTE the stream=True parameter below with get_with_token(remote_url, stream=True) as r: r.raise_for_status() tmp_dst_file = None try: downloaded_sha256 = hashlib.sha256() with tempfile.NamedTemporaryFile( prefix="." + dst_file.name + ".", suffix=".tmp", dir=dst_file.parent, delete=False, ) as f: tmp_dst_file = Path(f.name) for chunk in r.iter_content(chunk_size=1024**2): if chunk: # filter out keep-alive new chunks f.write(chunk) downloaded_sha256.update(chunk) # f.flush() # check for downloaded sha256 if sha256 != downloaded_sha256.hexdigest(): raise Exception( f"File {dst_file.as_posix()} sha256 mismatch: downloaded {downloaded_sha256.hexdigest()}, expected {sha256}" ) tmp_dst_file.chmod(0o644) tmp_dst_file.replace(dst_file) os.utime(dst_file, (time.timestamp(), time.timestamp())) # access and modified time finally: if tmp_dst_file is not None: if tmp_dst_file.is_file(): tmp_dst_file.unlink() def download_pkg_ver( pkg_name: str, working_dir: Path, ver: str, url: str, time: datetime, sha256: str ) -> bool: # download archive file to /packages//versions/.tar.gz dst_file = working_dir / "packages" / pkg_name / "versions" / f"{ver}.tar.gz" if not dst_file.is_file(): dst_file.parent.mkdir(parents=True, exist_ok=True) logger.info(f"Downloading {url} to {dst_file.as_posix()}") try: do_download(url, dst_file, time, sha256) return True except Exception as e: logger.error(f"Failed to download {url} to {dst_file.as_posix()}: {e}") return False else: logger.info(f"File {dst_file.as_posix()} already exists, skipping download") return True def _from_published_time(published: Optional[str]) -> datetime: if published: return datetime.fromisoformat(published.replace("Z", "+00:00")) else: return datetime.now() # https://github.com/dart-lang/pub/blob/master/doc/repository-spec-v2.md#list-all-versions-of-a-package def handle_pkg( executor: concurrent.futures.ThreadPoolExecutor, base_url: str, pkg_name: str, working_dir: Path, mirror_url: str, clean: bool, ) -> bool: logger.info(f"Handling package {pkg_name}...") # fetch metadata from upstream pkgUrl = base_url + "/api/packages/" + pkg_name req = get_with_token(pkgUrl, headers={"Accept": "application/vnd.pub.v2+json"}, timeout=5) req.raise_for_status() resp = req.json() download_tasks = [] latest_ver = resp["latest"]["version"] latest_time = _from_published_time(resp["latest"].get("published")) for ver in resp["versions"]: logger.debug(f'Checking {pkg_name}=={ver["version"]}') ver_time = _from_published_time(ver.get("published")) if "advisoriesUpdated" in ver: del ver["advisoriesUpdated"] # not supported if ver.get("retracted", False): logger.info(f'Skipping retracted version {pkg_name}=={ver["version"]}') dst_file = working_dir / "packages" / pkg_name / "versions" / f'{ver["version"]}.tar.gz' dst_file.unlink(missing_ok=True) continue download_tasks.append( ( pkg_name, working_dir, ver["version"], ver["archive_url"], ver_time, ver["archive_sha256"], ) ) # replace URL in metadata with our mirror URL cur_ver = ver["version"] serving_url = f"{mirror_url}/packages/{pkg_name}/versions/{cur_ver}.tar.gz" ver["archive_url"] = serving_url if cur_ver == latest_ver: resp["latest"] = ver # clean up obsolete versions if needed if clean: all_versions = [ver["version"] for ver in resp["versions"] if not ver.get("retracted", False)] versions_dir = working_dir / "packages" / pkg_name / "versions" if versions_dir.is_dir(): for f in versions_dir.iterdir(): if f.is_file() and f.suffix == ".gz": local_ver = f.name.removesuffix(".tar.gz") if local_ver not in all_versions: logger.info(f"Removing obsolete pkg file {f.as_posix()}") f.unlink(missing_ok=True) # save modified metadata to api/packages//meta.json modified_meta_str = json.dumps(resp) meta_on_disk = working_dir / "api" / "packages" / pkg_name / "meta.json" # fast path: check if meta.json exists and has the same size if not meta_on_disk.is_file() or meta_on_disk.stat().st_size != len( modified_meta_str ): logger.info( f"Metadata for package {pkg_name} is outdated or missing, updating..." ) # download all versions concurrently results = list(executor.map(lambda p: download_pkg_ver(*p), download_tasks)) if not all(results): logger.error( f"Failed to download some versions of package {pkg_name}, skipping metadata update" ) return False meta_on_disk.parent.mkdir(parents=True, exist_ok=True) with open(meta_on_disk, "w", encoding="utf-8") as f: f.write(modified_meta_str) f.flush() else: logger.info( f"Metadata for package {pkg_name} is up to date (latest {latest_ver}), skipping" ) return True def main(): import argparse parser = argparse.ArgumentParser() parser.add_argument("--base-url", default=BASE_URL) parser.add_argument("--mirror-url", default=MIRROR_URL) parser.add_argument("--working-dir", default=WORKING_DIR) parser.add_argument( "--workers", default=1, type=int, help="number of concurrent downloading jobs" ) parser.add_argument( "--clean", action='store_true', help="remove obsolete package versions that are no longer in upstream", ) # parser.add_argument("--fast-skip", action='store_true', # help='do not verify sha256 of existing files') args = parser.parse_args() if args.working_dir is None: raise Exception("Working Directory is None") working_dir = Path(args.working_dir) base_url = args.base_url mirror_url = MIRROR_URL clean = args.clean logger.info(f"Using upstream URL: {base_url}") logger.info(f"Using mirror URL: {mirror_url}") logger.info(f"Using working directory: {working_dir.as_posix()}") logger.info(f"Using {args.workers} workers") logger.info(f"Clean obsolete packages: {'Yes' if clean else 'No'}") pkg_executor = concurrent.futures.ThreadPoolExecutor(max_workers=args.workers) download_executor = concurrent.futures.ThreadPoolExecutor(max_workers=args.workers) # iterate through all packages pkgs_url = base_url + "/api/package-names" pkg_futures = [] all_pkgs = [] while True: req = get_with_token(pkgs_url, headers={"Accept-Encoding": "gzip"}, timeout=5) req.raise_for_status() resp = req.json() for pkg in resp["packages"]: pkg_futures.append( pkg_executor.submit( handle_pkg, download_executor, base_url, pkg, working_dir, mirror_url, clean, ) ) all_pkgs.append(pkg) # null means no more pages if not (pkgs_url := resp["nextUrl"]): break # wait for all packages to be handled for f in concurrent.futures.as_completed(pkg_futures): try: f.result() except Exception as e: logger.error(f"Error handling package: {e}") pkg_executor.shutdown(wait=True) download_executor.shutdown(wait=True) if clean: # clean up obsolete packages pkgs_dir = working_dir / "packages" if pkgs_dir.is_dir(): for p in pkgs_dir.iterdir(): if p.is_dir(): pkg_name = p.name if pkg_name not in all_pkgs: logger.info(f"Removing obsolete package {pkg_name}") shutil.rmtree(p, ignore_errors=True) if __name__ == "__main__": main()