EIC code displayed by LXR

#!/usr/bin/env python3
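"""Fetch a spack lockfile from a GitHub release of acts-project/ci-dependencies.

The release tag, target architecture and (optionally) a compiler are given on
the command line; matching release assets are selected and the chosen lockfile
is written to --output. Downloads go through a small size-limited on-disk cache.

Example invocation (script name and argument values are hypothetical):

    ./fetch_lockfile.py --tag v1.2.3 --arch ubuntu2404_x86_64 --output spack.lock
"""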

import os
import argparse
import json
import urllib.request
import urllib.error
import re
import subprocess
import hashlib
import tempfile
from pathlib import Path
from typing import Tuple, Dict, Optional, Callable, TypeVar, Any
import contextlib
import time
import functools
from contextvars import ContextVar

# Default on-disk cache size limit in bytes (override with --cache-limit or LOCKFILE_CACHE_LIMIT)
DEFAULT_CACHE_SIZE_LIMIT = 1 * 1024 * 1024  # 1MB

T = TypeVar("T")
remaining_retries: ContextVar[int] = ContextVar("remaining_retries", default=0)


def retry_on_http_error(max_retries: int = 3, base_delay: float = 1.0):
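    """Decorator factory: retry the wrapped call on HTTP/URL errors with exponential backoff.

    The number of retries still available is published through the
    ``remaining_retries`` context variable so that the wrapped function can
    adapt its behaviour on later attempts.
    """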
    def decorator(func: Callable[..., T]) -> Callable[..., T]:
        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> T:
            for attempt in range(max_retries):
                remaining_retries.set(max_retries - attempt - 1)
                try:
                    return func(*args, **kwargs)
                except urllib.error.HTTPError as e:
                    if attempt < max_retries - 1:
                        delay = base_delay * (2**attempt)
                        print(
                            f"Got HTTP error {e.code}, retrying in {delay} seconds..."
                        )
                        time.sleep(delay)
                        continue
                    raise
                except urllib.error.URLError as e:
                    if attempt < max_retries - 1:
                        delay = base_delay * (2**attempt)
                        print(f"Got URL error {e}, retrying in {delay} seconds...")
                        time.sleep(delay)
                        continue
                    raise
            return func(*args, **kwargs)  # Only reached if max_retries <= 0: single attempt, no retries

        return wrapper

    return decorator


def compute_cache_key(url: str) -> str:
    """Compute a cache key for a URL"""
    return hashlib.sha256(url.encode()).hexdigest()


def compute_cache_digest(cache_dir: Path) -> str:
    """Compute a digest of all cache files except digest.txt"""
    files = sorted(
        f
        for f in os.listdir(cache_dir)
        if (cache_dir / f).is_file() and f != "digest.txt"
    )

    digest = hashlib.sha256()
    for fname in files:
        fpath = cache_dir / fname
        digest.update(fname.encode())
        digest.update(str(fpath.stat().st_size).encode())
        digest.update(fpath.read_bytes())
    return digest.hexdigest()


def update_cache_digest(cache_dir: Path):
    """Update the cache digest file"""
    digest = compute_cache_digest(cache_dir)
    (cache_dir / "digest.txt").write_text(digest)


def prune_cache(cache_dir: Optional[Path], size_limit: int):
    """Prune the cache to keep it under the size limit"""
    if cache_dir is None or not cache_dir.exists():
        return

    # Get all cache files with their modification times
    cache_files = [
        (cache_dir / f, (cache_dir / f).stat().st_mtime)
        for f in os.listdir(cache_dir)
        if (cache_dir / f).is_file()
        and f != "digest.txt"  # Exclude digest from pruning
    ]
    total_size = sum(f.stat().st_size for f, _ in cache_files)

    if total_size <= size_limit:
        return

    # Sort by modification time (oldest first)
    cache_files.sort(key=lambda x: x[1])

    # Remove files until we're under the limit
    for file_path, _ in cache_files:
        if total_size <= size_limit:
            break
        total_size -= file_path.stat().st_size
        file_path.unlink()

    # Update digest after pruning
    update_cache_digest(cache_dir)


@retry_on_http_error()
def fetch_github(base_url: str, cache_dir: Optional[Path], cache_limit: int) -> bytes:
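    """Fetch ``base_url`` over HTTP, caching the raw response bytes on disk.

    Cache entries are keyed by the SHA-256 hash of the URL; the cache digest
    file is refreshed after every write and the cache is pruned back under
    ``cache_limit``. When ``cache_dir`` is None a temporary directory is used,
    so nothing persists between runs.
    """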
    headers = {}
    token = os.environ.get("GITHUB_TOKEN")

    # Only add auth header if we have retries left
    if token is not None and token != "" and remaining_retries.get() > 0:
        headers["Authorization"] = f"Bearer {token}"

    print(f"Remaining retries: {remaining_retries.get()} for {base_url}")
    print(headers)

    with contextlib.ExitStack() as stack:
        if cache_dir is not None:
            cache_dir.mkdir(parents=True, exist_ok=True)
        else:
            cache_dir = Path(stack.enter_context(tempfile.TemporaryDirectory()))

        # Check cache first
        cache_key = compute_cache_key(base_url)
        cache_file = cache_dir / cache_key

        if cache_file.exists():
            print("Cache hit on", base_url)
            return cache_file.read_bytes()
        else:
            print("Cache miss on", base_url)

        try:
            req = urllib.request.Request(base_url, headers=headers)
            with urllib.request.urlopen(req) as response:
                content = response.read()

                # Write to cache
                cache_file.write_bytes(content)

                # Update digest after adding new file
                update_cache_digest(cache_dir)

                # Prune cache if necessary (this will update digest again if pruning occurs)
                prune_cache(cache_dir, cache_limit)

                return content
        except urllib.error.URLError as e:
            print(f"Failed to fetch from {base_url}: {e}")
            raise e
        except json.JSONDecodeError as e:
            print(f"Failed to parse JSON response: {e}")
            raise e


def main():
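    """Parse the command line, pick the matching lockfile and optionally download it."""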
    parser = argparse.ArgumentParser()
    parser.add_argument("--tag", type=str, required=True, help="Tag to use")
    parser.add_argument("--arch", type=str, required=True, help="Architecture to use")
    parser.add_argument(
        "--compiler-binary",
        type=str,
        default=os.environ.get("CXX"),
        help="Compiler to use (defaults to CXX environment variable if set)",
    )
    parser.add_argument(
        "--compiler",
        type=str,
        default=None,
        help="Compiler to use, in the form name@x.y.z (only used if --compiler-binary is not given)",
    )
    parser.add_argument(
        "--output",
        type=str,
        default=None,
        help="Output file to write lockfile to",
    )
    parser.add_argument(
        "--cache-dir",
        type=lambda x: Path(x).expanduser() if x else None,
        default=os.environ.get("LOCKFILE_CACHE_DIR"),
        help="Directory to use for caching (defaults to LOCKFILE_CACHE_DIR env var)",
    )
    parser.add_argument(
        "--cache-limit",
        type=int,
        default=int(os.environ.get("LOCKFILE_CACHE_LIMIT", DEFAULT_CACHE_SIZE_LIMIT)),
        help="Cache size limit in bytes (defaults to LOCKFILE_CACHE_LIMIT env var)",
    )
    args = parser.parse_args()

    print("Fetching lockfiles for tag:", args.tag)
    print("Architecture:", args.arch)

    base_url = f"https://api.github.com/repos/acts-project/ci-dependencies/releases/tags/{args.tag}"

    data = json.loads(fetch_github(base_url, args.cache_dir, args.cache_limit))

    lockfiles = parse_assets(data)

    print("Available lockfiles:")
    for arch, compilers in lockfiles.items():
        print(f"> {arch}:")
        for c, (n, _) in compilers.items():
            print(f"  - {c}: {n}")

    if args.arch not in lockfiles:
        print(f"No lockfile found for architecture {args.arch}")
        exit(1)

    if args.compiler_binary is not None:
        compiler = determine_compiler_version(args.compiler_binary)
        print("Compiler:", args.compiler_binary, f"{compiler}")
    elif args.compiler is not None:
        if not re.match(r"^([\w-]+)@(\d+\.\d+\.\d+)$", args.compiler):
            print(f"Invalid compiler format: {args.compiler}")
            exit(1)
        compiler = args.compiler
        print("Compiler:", f"{compiler}")
    else:
        compiler = None

    lockfile = select_lockfile(lockfiles, args.arch, compiler)

    print("Selected lockfile:", lockfile)

    if args.output:
        with open(args.output, "wb") as f:
            f.write(fetch_github(lockfile, args.cache_dir, args.cache_limit))


def parse_assets(data: Dict) -> Dict[str, Dict[str, Tuple[str, str]]]:
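    """Map release assets to {arch: {compiler: (asset name, download URL)}}.

    Only assets named ``spack_<arch>[_<compiler>].lock`` are considered;
    assets without a compiler suffix are stored under the key "default".
    """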
    lockfiles: Dict[str, Dict[str, Tuple[str, str]]] = {}

    for asset in data["assets"]:
        url = asset["browser_download_url"]

        name = asset["name"]
        if not name.endswith(".lock") or not name.startswith("spack_"):
            continue

        m = re.match(r"spack_(.*(?:aarch64|x86_64))(?:_(.*))?\.lock", name)
        if m is None:
            continue

        arch, compiler = m.groups()
        compiler = compiler if compiler else "default"
        lockfiles.setdefault(arch, {})[compiler] = (name, url)

    return lockfiles


def select_lockfile(
    lockfiles: Dict[str, Dict[str, Tuple[str, str]]], arch: str, compiler: Optional[str]
):
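    """Pick the best lockfile URL for the requested architecture and compiler.

    Starts from the "default" lockfile; otherwise prefers an exact compiler
    match, then the highest available version of the same compiler family.
    """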
    # Default to the default lockfile
    _, lockfile = lockfiles[arch]["default"]

    if compiler is None:
        return lockfile

    # Extract compiler family and version
    compiler_family = compiler.split("@")[0]

    # Find all matching compiler families
    matching_compilers = {
        comp: ver
        for comp, ver in lockfiles[arch].items()
        if comp != "default" and comp.split("@")[0] == compiler_family
    }

    if matching_compilers:
        if compiler in matching_compilers:
            # Exact match found
            _, lockfile = matching_compilers[compiler]
        else:
            # Find highest version of same compiler family
            highest_version = max(
                matching_compilers.keys(),
                key=lambda x: [int(v) for v in x.split("@")[1].split(".")],
            )
            _, lockfile = matching_compilers[highest_version]

    return lockfile


def determine_compiler_version(binary: str):
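    """Run ``<binary> --version`` and return a ``family@x.y.z`` string.

    Recognises gcc, clang and Apple clang; exits with an error for anything
    else or if no version number can be extracted from the output.
    """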
    try:
        result = subprocess.run([binary, "--version"], capture_output=True, text=True)

        line = result.stdout.split("\n", 1)[0]
        print(line)
        if "clang" in line:
            compiler = "clang"
            if "Apple" in line:
                compiler = "apple-clang"
        elif "gcc" in line or "GCC" in line or "g++" in line:
            compiler = "gcc"
        else:
            print(f"Unknown compiler: {binary}")
            exit(1)

        m = re.search(r"(\d+\.\d+\.\d+)", line)
        if m is None:
            print(f"Failed to determine version for compiler: {binary}")
            exit(1)
        (version,) = m.groups()
        return f"{compiler}@{version}"

    except (subprocess.SubprocessError, FileNotFoundError):
        print(f"Failed to determine version for compiler: {binary}")
        exit(1)


if __name__ == "__main__":
    main()