File indexing completed on 2025-07-09 07:53:33
0001 import io
0002 from pathlib import Path
0003 from zipfile import ZipFile
0004
0005 import requests
0006
0007
def download_artifact(workflow, artifact_name, token=None, click=None):
    """Download a named artifact of a workflow run to a local path.

    The target location is ``<created_at>_<head_sha>/<artifact_name>``
    (a relative path, with ``:`` in the timestamp replaced by ``-``).  If
    that path already exists it is returned immediately and nothing is
    downloaded.

    Args:
        workflow: Object exposing ``get_artifacts()``, ``created_at`` (with
            an ``isoformat()`` method) and ``head_sha``.
            NOTE(review): presumably a PyGithub ``WorkflowRun`` - confirm.
        artifact_name: Name of the artifact to fetch; also used as the
            name of the output file (or directory, see below).
        token: Optional token sent as ``Authorization: token <token>``.
        click: Optional object providing ``secho``/``echo`` (e.g. the
            ``click`` module) used for diagnostics; ``None`` keeps the
            function silent.

    Returns:
        The ``Path`` of the downloaded (or already present) artifact, or
        ``None`` when the workflow has no artifact called *artifact_name*.
        When the archive holds a member named *artifact_name*, or exactly
        one member, that member is written to the path as a file;
        otherwise the whole archive is extracted into a directory at that
        path.

    Raises:
        ValueError: If more than one artifact matches *artifact_name*.
        requests.HTTPError: If the download request returns an error status.
    """
    # List the artifacts once; every get_artifacts() call is a separate
    # listing request against the workflow object.
    available = list(workflow.get_artifacts())
    artifacts = [artifact for artifact in available if artifact.name == artifact_name]
    if not artifacts:
        if click is not None:
            click.secho(f"Can not obtain {artifact_name}", fg="red", err=True)
            if available:
                click.secho("Available artifacts:", fg="red", err=True)
                for artifact in available:
                    click.echo(artifact.name)
            else:
                click.secho("No artifacts available", fg="red", err=True)
        return None

    # ":" is replaced so the timestamp can be used as a directory name.
    outdir = Path(workflow.created_at.isoformat().replace(":", "-") + "_" + workflow.head_sha)
    outpath = outdir / artifact_name
    if outpath.exists():
        return outpath

    # Create the directory the artifact is actually written into; this may
    # be deeper than ``outdir`` when the artifact name has path separators.
    outpath.parent.mkdir(parents=True, exist_ok=True)

    # Deliberately strict: an ambiguous artifact name raises ValueError.
    (artifact,) = artifacts
    headers = {"Authorization": f"token {token}"} if token else {}
    response = requests.get(artifact.archive_download_url, headers=headers)
    # Fail with the real HTTP error instead of a confusing BadZipFile when
    # the server answers with an error document.
    response.raise_for_status()

    with ZipFile(io.BytesIO(response.content)) as zfp:
        members = zfp.namelist()

        zip_filename = None
        if artifact_name in members:
            zip_filename = artifact_name
        elif len(members) == 1:
            (zip_filename,) = members
            if click is not None:
                # "orange" is not a colour name click accepts (it raises
                # TypeError), so the warning is emitted in yellow.
                click.secho(
                    f"Can't locate {artifact_name} in the artifact ZIP archive, using {zip_filename} instead",
                    fg="yellow",
                    err=True,
                )

        if zip_filename is not None:
            # Single member: store its contents directly as the output file.
            with zfp.open(zip_filename) as fp_zip, open(outpath, "wb") as fp_out:
                fp_out.write(fp_zip.read())
        else:
            # No obvious single member: unpack everything into a directory
            # located at the output path.
            if click is not None:
                click.secho(
                    f"Can't locate {artifact_name} in the artifact ZIP archive, extracting all",
                    fg="green",
                    err=True,
                )
            zfp.extractall(path=outpath)

    return outpath