diff --git a/pub-mirror.py b/pub-mirror.py
index d6e7483..9f0048c 100755
--- a/pub-mirror.py
+++ b/pub-mirror.py
@@ -17,16 +17,19 @@ logger = logging.getLogger(__name__)
 logger.setLevel(logging.INFO)
 handler = logging.StreamHandler()
 formatter = logging.Formatter(
-    "%(asctime)s - %(levelname)s - %(name)s - %(filename)s:%(lineno)d - %(message)s"
+    "%(asctime)s.%(msecs)03d - %(filename)s:%(lineno)d [%(levelname)s] %(message)s",
+    datefmt="%Y-%m-%dT%H:%M:%S",
 )
 handler.setFormatter(formatter)
 logger.addHandler(handler)
 
 BASE_URL = os.getenv("TUNASYNC_UPSTREAM_URL", "https://pub.dev")
 WORKING_DIR = os.getenv("TUNASYNC_WORKING_DIR")
-MIRROR_URL = os.getenv("MIRROR_BASE_URL", "https://mirrors.tuna.tsinghua.edu.cn/dart-pub")
+MIRROR_URL = os.getenv(
+    "MIRROR_BASE_URL", "https://mirrors.tuna.tsinghua.edu.cn/dart-pub"
+)
 REPOS = []
-UA = 'tuna-pub-mirror/0.0 (+https://github.com/tuna/tunasync-scripts)'
+UA = "tuna-pub-mirror/0.0 (+https://github.com/tuna/tunasync-scripts)"
 
 
 # wrap around requests.get to use token if available
@@ -39,7 +42,7 @@ def get_with_token(*args, **kwargs):
     return requests.get(*args, **kwargs)
 
 
-def do_download(remote_url: str, dst_file: Path, time: datetime, sha256: str):
+def download_file(remote_url: str, dst_file: Path, mtime: datetime, sha256: str):
     # NOTE the stream=True parameter below
     with get_with_token(remote_url, stream=True) as r:
         r.raise_for_status()
@@ -65,7 +68,9 @@ def do_download(remote_url: str, dst_file: Path, time: datetime, sha256: str):
             )
             tmp_dst_file.chmod(0o644)
             tmp_dst_file.replace(dst_file)
-            os.utime(dst_file, (time.timestamp(), time.timestamp()))  # access and modified time
+            os.utime(
+                dst_file, (mtime.timestamp(), mtime.timestamp())
+            )  # access and modified time
         finally:
             if tmp_dst_file is not None:
                 if tmp_dst_file.is_file():
@@ -73,7 +78,13 @@ def do_download(remote_url: str, dst_file: Path, time: datetime, sha256: str):
 
 
 def download_pkg_ver(
-    pkg_name: str, working_dir: Path, ver: str, url: str, time: datetime, sha256: str
+    pkg_name: str,
+    working_dir: Path,
+    ver: str,
+    url: str,
+    time: datetime,
+    sha256: str,
+    verify: bool,
 ) -> bool:
     # download archive file to /packages/<pkg_name>/versions/<ver>.tar.gz
     dst_file = working_dir / "packages" / pkg_name / "versions" / f"{ver}.tar.gz"
@@ -81,22 +92,41 @@ def download_pkg_ver(
         dst_file.parent.mkdir(parents=True, exist_ok=True)
         logger.info(f"Downloading {url} to {dst_file.as_posix()}")
         try:
-            do_download(url, dst_file, time, sha256)
+            download_file(url, dst_file, time, sha256)
             return True
         except Exception as e:
             logger.error(f"Failed to download {url} to {dst_file.as_posix()}: {e}")
             return False
     else:
-        os.utime(dst_file, (time.timestamp(), time.timestamp()))  # update access and modified time
+        if verify:
+            # first check sha256
+            with open(dst_file, "rb") as f:
+                existing_sha256 = hashlib.sha256(f.read()).hexdigest()
+            if existing_sha256 != sha256:
+                logger.warning(
+                    f"File {dst_file.as_posix()} sha256 mismatch: existing {existing_sha256}, expected {sha256}, re-downloading"
+                )
+                try:
+                    download_file(url, dst_file, time, sha256)
+                    return True
+                except Exception as e:
+                    logger.error(
+                        f"Failed to download {url} to {dst_file.as_posix()}: {e}"
+                    )
+                    return False
+        # update access and modified time anyway
+        os.utime(dst_file, (time.timestamp(), time.timestamp()))
         logger.info(f"File {dst_file.as_posix()} already exists, skipping download")
         return True
 
+
 def _from_published_time(published: Optional[str]) -> datetime:
     if published:
         return datetime.fromisoformat(published.replace("Z", "+00:00"))
     else:
         return datetime.now()
 
+
 # https://github.com/dart-lang/pub/blob/master/doc/repository-spec-v2.md#list-all-versions-of-a-package
 def handle_pkg(
     executor: concurrent.futures.ThreadPoolExecutor,
@@ -104,28 +134,39 @@ def handle_pkg(
     pkg_name: str,
     working_dir: Path,
     mirror_url: str,
-    clean: bool,
+    verify: bool,
 ) -> bool:
     logger.info(f"Handling package {pkg_name}...")
 
     # fetch metadata from upstream
     pkgUrl = base_url + "/api/packages/" + pkg_name
-    req = get_with_token(pkgUrl, headers={"Accept": "application/vnd.pub.v2+json"}, timeout=5)
+    req = get_with_token(
+        pkgUrl, headers={"Accept": "application/vnd.pub.v2+json"}, timeout=5
+    )
     req.raise_for_status()
-    resp = req.json()
+    pkg_meta = req.json()
 
     download_tasks = []
-    latest_ver = resp["latest"]["version"]
-    latest_time = _from_published_time(resp["latest"].get("published"))
+    latest_ver = pkg_meta["latest"]["version"]
+    latest_time = _from_published_time(pkg_meta["latest"].get("published"))
 
-    for ver in resp["versions"]:
+    # modify metadata and prepare download tasks
+    for ver in pkg_meta["versions"]:
         logger.debug(f'Checking {pkg_name}=={ver["version"]}')
         ver_time = _from_published_time(ver.get("published"))
         if "advisoriesUpdated" in ver:
-            del ver["advisoriesUpdated"] # not supported
+            del ver["advisoriesUpdated"]  # not supported
         if ver.get("retracted", False):
-            logger.info(f'Skipping retracted version {pkg_name}=={ver["version"]}')
-            dst_file = working_dir / "packages" / pkg_name / "versions" / f'{ver["version"]}.tar.gz'
+            logger.info(
+                f'Trying to delete retracted version {pkg_name}=={ver["version"]}'
+            )
+            dst_file = (
+                working_dir
+                / "packages"
+                / pkg_name
+                / "versions"
+                / f'{ver["version"]}.tar.gz'
+            )
             dst_file.unlink(missing_ok=True)
             continue
         download_tasks.append(
@@ -136,6 +177,7 @@ def handle_pkg(
                 ver["archive_url"],
                 ver_time,
                 ver["archive_sha256"],
+                verify,
             )
         )
         # replace URL in metadata with our mirror URL
@@ -143,11 +185,17 @@ def handle_pkg(
         serving_url = f"{mirror_url}/packages/{pkg_name}/versions/{cur_ver}.tar.gz"
         ver["archive_url"] = serving_url
         if cur_ver == latest_ver:
-            resp["latest"] = ver
+            pkg_meta["latest"] = ver
 
     # clean up obsolete versions if needed
-    if clean:
-        all_versions = [ver["version"] for ver in resp["versions"] if not ver.get("retracted", False)]
+    if verify:
+        all_versions = set(
+            [
+                ver["version"]
+                for ver in pkg_meta["versions"]
+                if not ver.get("retracted", False)
+            ]
+        )
         versions_dir = working_dir / "packages" / pkg_name / "versions"
         if versions_dir.is_dir():
             for f in versions_dir.iterdir():
@@ -158,7 +206,7 @@ def handle_pkg(
                     f.unlink(missing_ok=True)
 
     # save modified metadata to api/packages/<pkg_name>/meta.json
-    modified_meta_str = json.dumps(resp)
+    modified_meta_str = json.dumps(pkg_meta)
     meta_on_disk = working_dir / "api" / "packages" / pkg_name / "meta.json"
     # fast path: check if meta.json exists and has the same size
     if not meta_on_disk.is_file() or meta_on_disk.stat().st_size != len(
@@ -180,7 +228,7 @@ def handle_pkg(
             f.flush()
     else:
         logger.info(
-            f"Metadata for package {pkg_name} is up to date (latest {latest_ver}), skipping"
+            f"Metadata for package {pkg_name} looks up to date (latest {latest_ver})"
         )
     return True
 
@@ -198,12 +246,10 @@ def main():
         "--workers", default=1, type=int, help="number of concurrent downloading jobs"
     )
     parser.add_argument(
-        "--clean",
-        action='store_true',
-        help="remove obsolete package versions that are no longer in upstream",
+        "--verify",
+        action="store_true",
+        help="remove obsolete package versions, touch the mtime of package files",
     )
-    # parser.add_argument("--fast-skip", action='store_true',
parser.add_argument("--fast-skip", action='store_true', - # help='do not verify sha256 of existing files') args = parser.parse_args() if args.working_dir is None: @@ -212,13 +258,13 @@ def main(): working_dir = Path(args.working_dir) base_url = args.base_url mirror_url = MIRROR_URL - clean = args.clean + verify = args.verify logger.info(f"Using upstream URL: {base_url}") logger.info(f"Using mirror URL: {mirror_url}") logger.info(f"Using working directory: {working_dir.as_posix()}") logger.info(f"Using {args.workers} workers") - logger.info(f"Clean obsolete packages: {'Yes' if clean else 'No'}") + logger.info(f"Verify existing files: {'Yes' if verify else 'No'}") pkg_executor = concurrent.futures.ThreadPoolExecutor(max_workers=args.workers) download_executor = concurrent.futures.ThreadPoolExecutor(max_workers=args.workers) @@ -226,7 +272,7 @@ def main(): # iterate through all packages pkgs_url = base_url + "/api/package-names" pkg_futures = [] - all_pkgs = [] + remote_pkgs = set() while True: req = get_with_token(pkgs_url, headers={"Accept-Encoding": "gzip"}, timeout=5) req.raise_for_status() @@ -241,10 +287,10 @@ def main(): pkg, working_dir, mirror_url, - clean, + verify, ) ) - all_pkgs.append(pkg) + remote_pkgs.add(pkg) # null means no more pages if not (pkgs_url := resp["nextUrl"]): @@ -260,16 +306,22 @@ def main(): pkg_executor.shutdown(wait=True) download_executor.shutdown(wait=True) - if clean: - # clean up obsolete packages + # clean up packages that are no longer in upstream + if verify: + local_pkgs = set() pkgs_dir = working_dir / "packages" if pkgs_dir.is_dir(): for p in pkgs_dir.iterdir(): if p.is_dir(): pkg_name = p.name - if pkg_name not in all_pkgs: - logger.info(f"Removing obsolete package {pkg_name}") - shutil.rmtree(p, ignore_errors=True) + local_pkgs.add(pkg_name) + + pkgs_to_clean = local_pkgs - remote_pkgs + logging.info(f"{len(pkgs_to_clean)} packages not in upstream: {pkgs_to_clean}") + for pkg_name in pkgs_to_clean: + logger.info(f"Removing obsolete package dir {pkg_name}") + p = pkgs_dir / pkg_name + shutil.rmtree(p, ignore_errors=True) if __name__ == "__main__":