pub-mirror: initial version that works [ci skip]

Signed-off-by: Shengqi Chen <harry-chen@outlook.com>
Shengqi Chen 2025-09-10 01:14:31 +08:00
parent 4a6bccd012
commit 8186bac8e2

pub-mirror.py Executable file

@@ -0,0 +1,216 @@
#!/usr/bin/env python3

import concurrent.futures
import hashlib
import json
import logging
import os
import tempfile
from pathlib import Path
from typing import Optional

import requests

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
formatter = logging.Formatter(
    "%(asctime)s - %(levelname)s - %(name)s - %(filename)s:%(lineno)d - %(message)s"
)
handler.setFormatter(formatter)
logger.addHandler(handler)
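
# Configuration comes from the environment; the TUNASYNC_* variables are
# normally provided by tunasync when this script runs as a mirror job, and
# the command-line flags parsed in main() below can override them.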
BASE_URL = os.getenv("TUNASYNC_UPSTREAM_URL", "https://pub.dev")
WORKING_DIR = os.getenv("TUNASYNC_WORKING_DIR")
MIRROR_URL = os.getenv("MIRROR_URL", "https://mirrors.tuna.tsinghua.edu.cn/dart-pub")
REPOS = []

# wrapper around requests.get that sends a bearer token if PUB_TOKEN is set
def get_with_token(*args, **kwargs):
    headers = kwargs.get("headers", {})
    if "PUB_TOKEN" in os.environ:
        headers["Authorization"] = "Bearer {}".format(os.environ["PUB_TOKEN"])
    kwargs["headers"] = headers
    return requests.get(*args, **kwargs)
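
# do_download() streams the response into a hidden temporary file in the
# destination directory, hashing each chunk as it arrives, and renames the
# temp file into place only after the sha256 matches, so a reader never
# observes a partially written archive.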
def do_download(remote_url: str, dst_file: Path, sha256: Optional[str] = None):
    # NOTE the stream=True parameter below
    with get_with_token(remote_url, stream=True) as r:
        r.raise_for_status()
        tmp_dst_file = None
        try:
            downloaded_sha256 = hashlib.sha256()
            with tempfile.NamedTemporaryFile(
                prefix="." + dst_file.name + ".",
                suffix=".tmp",
                dir=dst_file.parent,
                delete=False,
            ) as f:
                tmp_dst_file = Path(f.name)
                for chunk in r.iter_content(chunk_size=1024**2):
                    if chunk:  # filter out keep-alive new chunks
                        f.write(chunk)
                        downloaded_sha256.update(chunk)
                # f.flush()
            # check for downloaded sha256
            if sha256 and sha256 != downloaded_sha256.hexdigest():
                raise Exception(
                    f"File {dst_file.as_posix()} sha256 mismatch: downloaded {downloaded_sha256.hexdigest()}, expected {sha256}"
                )
            tmp_dst_file.chmod(0o644)
            tmp_dst_file.replace(dst_file)
        finally:
            if tmp_dst_file is not None and tmp_dst_file.is_file():
                tmp_dst_file.unlink()
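
# Fetch a single version archive; a file already present on disk is assumed
# complete and is not re-downloaded or re-verified.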
def download_pkg_ver(
    pkg_name: str, working_dir: Path, ver: str, url: str, sha256: str
) -> bool:
    # download archive file to /packages/<pkg>/versions/<version>.tar.gz
    dst_file = working_dir / "packages" / pkg_name / "versions" / f"{ver}.tar.gz"
    if not dst_file.is_file():
        dst_file.parent.mkdir(parents=True, exist_ok=True)
        logger.info(f"Downloading {url} to {dst_file.as_posix()}")
        try:
            do_download(url, dst_file, sha256=sha256)
            return True
        except Exception as e:
            logger.error(f"Failed to download {url} to {dst_file.as_posix()}: {e}")
            return False
    else:
        logger.info(f"File {dst_file.as_posix()} already exists, skipping download")
        return True

# https://github.com/dart-lang/pub/blob/master/doc/repository-spec-v2.md#list-all-versions-of-a-package
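# The upstream listing is roughly the following JSON (per the spec linked
# above; only the fields used here are shown):
#   {
#     "name": "<pkg>",
#     "latest": {<version object>},
#     "versions": [
#       {"version": "...", "archive_url": "...", "archive_sha256": "...",
#        "retracted": false},
#       ...
#     ]
#   }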
def handle_pkg(
    executor: concurrent.futures.ThreadPoolExecutor,
    base_url: str,
    pkg_name: str,
    working_dir: Path,
    mirror_url: str,
) -> bool:
    logger.info(f"Handling package {pkg_name}...")

    # fetch metadata from upstream
    pkg_url = base_url + "/api/packages/" + pkg_name
    req = get_with_token(pkg_url, headers={"Accept": "application/vnd.pub.v2+json"})
    req.raise_for_status()
    resp = req.json()

    download_tasks = []
    latest_ver = resp["latest"]["version"]
    for ver in resp["versions"]:
        logger.debug(f'Checking {pkg_name}=={ver["version"]}')
        if ver.get("retracted", False):
            logger.info(f'Skipping retracted version {pkg_name}=={ver["version"]}')
            continue
        download_tasks.append(
            (
                pkg_name,
                working_dir,
                ver["version"],
                ver["archive_url"],
                ver["archive_sha256"],
            )
        )
        # replace URL in metadata with our mirror URL
        cur_ver = ver["version"]
        serving_url = f"{mirror_url}/packages/{pkg_name}/versions/{cur_ver}.tar.gz"
        ver["archive_url"] = serving_url
        if cur_ver == latest_ver:
            resp["latest"] = ver

    # save modified metadata to api/packages/<pkg>/meta.json
    modified_meta_str = json.dumps(resp)
    meta_on_disk = working_dir / "api" / "packages" / pkg_name / "meta.json"
    # fast path: check if meta.json exists and has the same size
    if not meta_on_disk.is_file() or meta_on_disk.stat().st_size != len(
        modified_meta_str
    ):
        logger.info(
            f"Metadata for package {pkg_name} is outdated or missing, updating..."
        )
        # download all versions concurrently
        results = list(executor.map(lambda p: download_pkg_ver(*p), download_tasks))
        if not all(results):
            logger.error(
                f"Failed to download some versions of package {pkg_name}, skipping metadata update"
            )
            return False
        meta_on_disk.parent.mkdir(parents=True, exist_ok=True)
        with open(meta_on_disk, "w", encoding="utf-8") as f:
            f.write(modified_meta_str)
            f.flush()
    else:
        logger.info(
            f"Metadata for package {pkg_name} is up to date (latest {latest_ver}), skipping"
        )
    return True
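
# On-disk layout produced above, matching the rewritten archive_url values
# in meta.json:
#   <working_dir>/api/packages/<pkg>/meta.json
#   <working_dir>/packages/<pkg>/versions/<version>.tar.gz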
def main():
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("--base-url", default=BASE_URL)
    parser.add_argument("--mirror-url", default=MIRROR_URL)
    parser.add_argument("--working-dir", default=WORKING_DIR)
    parser.add_argument(
        "--workers", default=1, type=int, help="number of concurrent downloading jobs"
    )
    # parser.add_argument("--fast-skip", action='store_true',
    #                     help='do not verify sha256 of existing files')
    args = parser.parse_args()

    if args.working_dir is None:
        raise Exception("working directory is not set (use --working-dir or TUNASYNC_WORKING_DIR)")

    working_dir = Path(args.working_dir)
    base_url = args.base_url
    mirror_url = args.mirror_url
    logger.info(f"Using upstream URL: {base_url}")
    logger.info(f"Using mirror URL: {mirror_url}")
    logger.info(f"Using working directory: {working_dir.as_posix()}")
    logger.info(f"Using {args.workers} workers")

    pkg_executor = concurrent.futures.ThreadPoolExecutor(max_workers=args.workers)
    download_executor = concurrent.futures.ThreadPoolExecutor(max_workers=args.workers)
    # iterate through all packages
    pkgs_url = base_url + "/api/package-names"
    pkg_futures = []
    while True:
        req = get_with_token(pkgs_url, headers={"Accept-Encoding": "gzip"})
        req.raise_for_status()
        resp = req.json()
        for pkg in resp["packages"]:
            pkg_futures.append(
                pkg_executor.submit(
                    handle_pkg,
                    download_executor,
                    base_url,
                    pkg,
                    working_dir,
                    mirror_url,
                )
            )
        # null means no more pages
        if not (pkgs_url := resp["nextUrl"]):
            break

    # surface failures and propagate any exception raised inside handle_pkg
    failed = [f for f in pkg_futures if not f.result()]
    if failed:
        logger.error(f"{len(failed)} package(s) failed")

    pkg_executor.shutdown(wait=True)
    download_executor.shutdown(wait=True)
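
# Example invocation (the TUNASYNC_* variables are normally exported by
# tunasync itself; the path below is only illustrative):
#   TUNASYNC_WORKING_DIR=/data/dart-pub ./pub-mirror.py --workers 8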
if __name__ == "__main__":
    main()