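"""Fetch Spack lockfiles attached to a GitHub release of acts-project/ci-dependencies.

The script queries the release matching ``--tag``, lists the ``spack_*.lock`` assets,
and selects the lockfile matching the requested architecture and (optionally) compiler.
Downloads are cached on disk, and the cache is pruned to a configurable size limit.
"""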
import os
import argparse
import json
import urllib.request
import urllib.error
import re
import subprocess
import hashlib
import tempfile
from pathlib import Path
from typing import Tuple, Dict, Optional, Callable, TypeVar, Any
import contextlib
import time
import functools
from contextvars import ContextVar


# Default cache size limit in bytes (1 MiB)
DEFAULT_CACHE_SIZE_LIMIT = 1 * 1024 * 1024

T = TypeVar("T")

# Retries left for the current call; set by retry_on_http_error and read by
# fetch_github to decide whether to keep sending the GitHub token.
remaining_retries: ContextVar[int] = ContextVar("remaining_retries", default=0)


def retry_on_http_error(max_retries: int = 3, base_delay: float = 1.0):
    """Retry a callable on urllib HTTP/URL errors with exponential backoff."""

    def decorator(func: Callable[..., T]) -> Callable[..., T]:
        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> T:
            for attempt in range(max_retries):
                # Record how many retries remain so the wrapped function can adapt.
                remaining_retries.set(max_retries - attempt - 1)
                try:
                    return func(*args, **kwargs)
                except urllib.error.HTTPError as e:
                    if attempt < max_retries - 1:
                        delay = base_delay * (2**attempt)
                        print(
                            f"Got HTTP error {e.code}, retrying in {delay} seconds..."
                        )
                        time.sleep(delay)
                        continue
                    raise
                except urllib.error.URLError as e:
                    if attempt < max_retries - 1:
                        delay = base_delay * (2**attempt)
                        print(f"Got URL error {e}, retrying in {delay} seconds...")
                        time.sleep(delay)
                        continue
                    raise
            # Fallback for max_retries <= 0: call the function once without retrying.
            return func(*args, **kwargs)

        return wrapper

    return decorator
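
# Usage sketch for retry_on_http_error (the `fetch` helper below is hypothetical and
# only illustrates how the decorator is applied; fetch_github further down is the
# real user in this module):
#
#     @retry_on_http_error(max_retries=5, base_delay=0.5)
#     def fetch(url: str) -> bytes:
#         with urllib.request.urlopen(url) as response:
#             return response.read()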


def compute_cache_key(url: str) -> str:
    """Compute a cache key for a URL"""
    return hashlib.sha256(url.encode()).hexdigest()


def compute_cache_digest(cache_dir: Path) -> str:
    """Compute a digest of all cache files except digest.txt"""
    files = sorted(
        f
        for f in os.listdir(cache_dir)
        if (cache_dir / f).is_file() and f != "digest.txt"
    )

    digest = hashlib.sha256()
    for fname in files:
        fpath = cache_dir / fname
        digest.update(fname.encode())
        digest.update(str(fpath.stat().st_size).encode())
        digest.update(fpath.read_bytes())
    return digest.hexdigest()


def update_cache_digest(cache_dir: Path):
    """Update the cache digest file"""
    digest = compute_cache_digest(cache_dir)
    (cache_dir / "digest.txt").write_text(digest)


def prune_cache(cache_dir: Optional[Path], size_limit: int):
    """Prune the cache to keep it under the size limit"""
    if cache_dir is None or not cache_dir.exists():
        return

    # Collect cache files with their modification times
    cache_files = [
        (cache_dir / f, (cache_dir / f).stat().st_mtime)
        for f in os.listdir(cache_dir)
        if (cache_dir / f).is_file() and f != "digest.txt"
    ]
    total_size = sum(f.stat().st_size for f, _ in cache_files)

    if total_size <= size_limit:
        return

    # Sort by modification time, oldest first
    cache_files.sort(key=lambda x: x[1])

    # Delete the oldest files until the cache fits within the limit
    for file_path, _ in cache_files:
        if total_size <= size_limit:
            break
        total_size -= file_path.stat().st_size
        file_path.unlink()

    update_cache_digest(cache_dir)


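# Cache layout used by fetch_github: each response body is stored as
# <cache_dir>/<sha256 hex digest of the URL>, and digest.txt records a checksum over
# all cached entries (refreshed after every write and prune).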
@retry_on_http_error()
def fetch_github(base_url: str, cache_dir: Optional[Path], cache_limit: int) -> bytes:
    headers = {}
    token = os.environ.get("GITHUB_TOKEN")

    # Only send the token while retries remain: the final attempt is made
    # without authentication.
    if token is not None and token != "" and remaining_retries.get() > 0:
        headers["Authorization"] = f"Bearer {token}"

    print(f"Remaining retries: {remaining_retries.get()} for {base_url}")
    print("Authorization header set:", "Authorization" in headers)

    with contextlib.ExitStack() as stack:
        if cache_dir is not None:
            cache_dir.mkdir(parents=True, exist_ok=True)
        else:
            # Without a configured cache directory, fall back to a throwaway temporary one.
            cache_dir = Path(stack.enter_context(tempfile.TemporaryDirectory()))

        cache_key = compute_cache_key(base_url)
        cache_file = cache_dir / cache_key

        if cache_file.exists():
            print("Cache hit on", base_url)
            return cache_file.read_bytes()
        else:
            print("Cache miss on", base_url)

        try:
            req = urllib.request.Request(base_url, headers=headers)
            with urllib.request.urlopen(req) as response:
                content = response.read()

            cache_file.write_bytes(content)

            update_cache_digest(cache_dir)

            prune_cache(cache_dir, cache_limit)

            return content
        except urllib.error.URLError as e:
            print(f"Failed to fetch from {base_url}: {e}")
            raise e
        except json.JSONDecodeError as e:
            print(f"Failed to parse JSON response: {e}")
            raise e


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--tag", type=str, required=True, help="Tag to use")
    parser.add_argument("--arch", type=str, required=True, help="Architecture to use")
    parser.add_argument(
        "--compiler-binary",
        type=str,
        default=os.environ.get("CXX"),
        help="Compiler to use (defaults to CXX environment variable if set)",
    )
    parser.add_argument(
        "--compiler",
        type=str,
        default=None,
        help="Compiler to use (defaults to compiler binary if set)",
    )
    parser.add_argument(
        "--output",
        type=str,
        default=None,
        help="Output file to write lockfile to",
    )
    parser.add_argument(
        "--cache-dir",
        type=lambda x: Path(x).expanduser() if x else None,
        default=os.environ.get("LOCKFILE_CACHE_DIR"),
        help="Directory to use for caching (defaults to LOCKFILE_CACHE_DIR env var)",
    )
    parser.add_argument(
        "--cache-limit",
        type=int,
        default=int(os.environ.get("LOCKFILE_CACHE_LIMIT", DEFAULT_CACHE_SIZE_LIMIT)),
        help="Cache size limit in bytes (defaults to LOCKFILE_CACHE_LIMIT env var)",
    )
    args = parser.parse_args()

    print("Fetching lockfiles for tag:", args.tag)
    print("Architecture:", args.arch)

    base_url = f"https://api.github.com/repos/acts-project/ci-dependencies/releases/tags/{args.tag}"

    data = json.loads(fetch_github(base_url, args.cache_dir, args.cache_limit))

    lockfiles = parse_assets(data)

    print("Available lockfiles:")
    for arch, compilers in lockfiles.items():
        print(f"> {arch}:")
        for c, (n, _) in compilers.items():
            print(f"  - {c}: {n}")

    if args.arch not in lockfiles:
        print(f"No lockfile found for architecture {args.arch}")
        exit(1)

    if args.compiler_binary is not None:
        compiler = determine_compiler_version(args.compiler_binary)
        print("Compiler:", args.compiler_binary, f"{compiler}")
    elif args.compiler is not None:
        if not re.match(r"^([\w-]+)@(\d+\.\d+\.\d+)$", args.compiler):
            print(f"Invalid compiler format: {args.compiler}")
            exit(1)
        compiler = args.compiler
        print("Compiler:", f"{compiler}")
    else:
        compiler = None

    lockfile = select_lockfile(lockfiles, args.arch, compiler)

    print("Selected lockfile:", lockfile)

    if args.output:
        with open(args.output, "wb") as f:
            f.write(fetch_github(lockfile, args.cache_dir, args.cache_limit))


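# parse_assets extracts (architecture, compiler) pairs from release asset names of the
# form spack_<arch>.lock or spack_<arch>_<compiler>.lock. For example (illustrative
# names): "spack_ubuntu2404_x86_64_gcc@13.2.0.lock" maps to arch "ubuntu2404_x86_64"
# with compiler "gcc@13.2.0", while "spack_ubuntu2404_x86_64.lock" maps to the
# "default" compiler entry for that arch.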
def parse_assets(data: Dict) -> Dict[str, Dict[str, Tuple[str, str]]]:
    lockfiles: Dict[str, Dict[str, Tuple[str, str]]] = {}

    for asset in data["assets"]:
        url = asset["browser_download_url"]

        name = asset["name"]
        if not name.endswith(".lock") or not name.startswith("spack_"):
            continue

        m = re.match(r"spack_(.*(?:aarch64|x86_64))(?:_(.*))?\.lock", name)
        if m is None:
            continue

        arch, compiler = m.groups()
        compiler = compiler if compiler else "default"
        lockfiles.setdefault(arch, {})[compiler] = (name, url)

    return lockfiles


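# Selection order implemented by select_lockfile (names below are illustrative): for
# arch "ubuntu2404_x86_64" and compiler "gcc@13.1.0", an exact "gcc@13.1.0" entry wins;
# failing that, the highest available version in the same family (e.g. "gcc@14.1.0") is
# used; with no matching family, or no requested compiler, the "default" lockfile is
# returned.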
def select_lockfile(
    lockfiles: Dict[str, Dict[str, Tuple[str, str]]], arch: str, compiler: Optional[str]
):
    # Start from the architecture's default lockfile
    _, lockfile = lockfiles[arch]["default"]

    if compiler is None:
        return lockfile

    compiler_family = compiler.split("@")[0]

    # Lockfiles built with the same compiler family (e.g. gcc, clang)
    matching_compilers = {
        comp: ver
        for comp, ver in lockfiles[arch].items()
        if comp != "default" and comp.split("@")[0] == compiler_family
    }

    if matching_compilers:
        if compiler in matching_compilers:
            # Exact match for the requested compiler version
            _, lockfile = matching_compilers[compiler]
        else:
            # Otherwise pick the highest available version in the family
            highest_version = max(
                matching_compilers.keys(),
                key=lambda x: [int(v) for v in x.split("@")[1].split(".")],
            )
            _, lockfile = matching_compilers[highest_version]

    return lockfile


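# determine_compiler_version maps the first line of `<binary> --version` output to a
# "name@version" string. Typical mappings (version strings shown for illustration):
#   "g++ (GCC) 13.2.0"                                -> "gcc@13.2.0"
#   "Apple clang version 15.0.0 (clang-1500.1.0.2.5)" -> "apple-clang@15.0.0"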
def determine_compiler_version(binary: str):
    try:
        result = subprocess.run([binary, "--version"], capture_output=True, text=True)

        line = result.stdout.split("\n", 1)[0]
        print(line)
        if "clang" in line:
            compiler = "clang"
            if "Apple" in line:
                compiler = "apple-clang"
        elif "gcc" in line or "GCC" in line or "g++" in line:
            compiler = "gcc"
        else:
            print(f"Unknown compiler: {binary}")
            exit(1)

        m = re.search(r"(\d+\.\d+\.\d+)", line)
        if m is None:
            print(f"Failed to determine version for compiler: {binary}")
            exit(1)
        (version,) = m.groups()
        return f"{compiler}@{version}"

    except (subprocess.SubprocessError, FileNotFoundError):
        print(f"Failed to determine version for compiler: {binary}")
        exit(1)


if __name__ == "__main__":
    main()
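
# Example invocation (tag/arch values are placeholders, not taken from this repository):
#   GITHUB_TOKEN=... python path/to/this/script.py --tag <release-tag> \
#       --arch ubuntu2404_x86_64 --compiler gcc@13.2.0 --output spack.lock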