Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:58:57

0001 """
0002 This script will be executed at container startup
0003 - It will retrieve the proxy and panda queue from the environment
0004 - It will download the pilot wrapper from github and execute it
0005 - It will upload the pilot logs to panda cache at the end
0006 
0007 post-multipart code was taken from: https://github.com/haiwen/webapi-examples/blob/master/python/upload-file.py
0008 """
0009 
0010 import gzip
0011 import http.client as httplib
0012 import logging
0013 import os
0014 import shutil
0015 import ssl
0016 import subprocess
0017 import sys
0018 import traceback
0019 import urllib.parse as urlparse
0020 from dataclasses import dataclass
0021 from typing import Optional
0022 
# Default working directory; get_configuration() replaces it with $TMPDIR when that variable is set
WORK_DIR = "/scratch"
# Directory the job configuration files are copied from when the submit mode is PUSH
CONFIG_DIR = "/scratch/jobconfig"
# Names of the job configuration files
PJD = "pandaJobData.out"
PFC = "PoolFileCatalog_H.xml"
# Files that copy_files_in_dir() copies
CONFIG_FILES = [PJD, PFC]

# Send every log record (DEBUG and above) to stdout
logging.basicConfig(level=logging.DEBUG, format="%(asctime)s %(levelname)s %(message)s", stream=sys.stdout)
0030 
0031 
@dataclass
class Configuration:
    """
    Settings collected by get_configuration() from the environment at container startup.
    Optional fields are None when the corresponding environment variable is not set.
    """

    # path of the proxy copy in the work directory (source: proxySecretPath)
    proxy_path: Optional[str]
    # PANDA_AUTH_DIR joined with PANDA_AUTH_TOKEN
    full_token_path: Optional[str]
    # PANDA_AUTH_DIR joined with PANDA_AUTH_TOKEN_KEY
    full_token_key_path: Optional[str]
    # PANDA_AUTH_ORIGIN; sent as the Origin header when uploading with a token
    token_auth_origin: Optional[str]
    # computingSite
    panda_site: Optional[str]
    # pandaQueueName
    panda_queue: Optional[str]
    # resourceType
    resource_type: Optional[str]
    # prodSourceLabel
    prod_source_label: Optional[str]
    # jobType
    job_type: Optional[str]
    # pilotType, "" when unset
    pilot_type: str
    # pilotUrlOpt, "" when unset; inserted verbatim into the wrapper command line
    pilot_url_option: str
    # pythonOption, "" when unset; inserted verbatim into the wrapper command line
    python_option: str
    # pilotVersion, "" when unset
    pilot_version: str
    # HARVESTER_ID
    harvester_id: Optional[str]
    # workerID
    worker_id: Optional[str]
    # logs_frontend_w: URL the logs are uploaded to
    logs_frontend_w: Optional[str]
    # logs_frontend_r: URL prefix the logs can be downloaded from afterwards
    logs_frontend_r: Optional[str]
    # "<harvester_id>_<worker_id>_gz.out"
    stdout_name: str
    # submit_mode, "PULL" when unset
    submit_mode: str
0053 
0054 
def post_multipart(host, port, selector, files, proxy_cert, full_token_path, token_auth_origin):
    """
    Post files to an https host as multipart/form-data.

    A bearer token is used when both full_token_path and token_auth_origin are given;
    otherwise the proxy certificate is loaded as the TLS client certificate.

    :param host: server host name
    :param port: server port
    :param selector: path of the request
    :param files: sequence of (name, filename, value) elements for data to be uploaded as files
    :param proxy_cert: path of the proxy file, used as both certificate and key file
    :param full_token_path: path of the file holding the bearer token, or None
    :param token_auth_origin: value of the Origin header sent together with the token, or None
    :return: tuple (status, reason) of the server's response
    """
    content_type, body = encode_multipart_formdata(files)

    # NOTE(review): ssl.Purpose.CLIENT_AUTH is meant for server-side sockets. The resulting context does
    # not verify the server certificate, and on Python >= 3.10 it appears to be a server-only context
    # that cannot open client connections. Switching to the default purpose (SERVER_AUTH) enables
    # verification and needs the server CA in the trust store - confirm with the deployment before changing.
    context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)

    # Prepare the credentials before opening the connection, so that an unreadable token or proxy
    # file cannot leave a half-built request behind.
    token_content = None
    if full_token_path and token_auth_origin:
        # if token is provided, use it instead of the proxy certificate
        with open(full_token_path, "r") as token_file:
            token_content = token_file.read().strip()
    else:
        # if no token is provided, use the proxy certificate
        context.load_cert_chain(certfile=proxy_cert, keyfile=proxy_cert)

    connection = httplib.HTTPSConnection(host, port, context=context, timeout=180)
    try:
        connection.putrequest("POST", selector)
        connection.putheader("content-type", content_type)
        connection.putheader("content-length", str(len(body)))

        if token_content is not None:
            connection.putheader("Authorization", f"Bearer {token_content}")
            connection.putheader("Origin", token_auth_origin)

        # The new API expects JSON responses
        connection.putheader("Accept", "application/json")

        connection.endheaders()
        connection.send(body)
        response = connection.getresponse()
        return response.status, response.reason
    finally:
        # always release the socket, also when sending or reading the response fails
        connection.close()
0088 
0089 
def encode_multipart_formdata(files):
    """
    Build a multipart/form-data request body.

    files is a sequence of (name, filename, value) elements; str values are encoded,
    any other value is placed into the body as it is.
    Return (content_type, body), with body as bytes.
    """
    boundary = "----------ThIs_Is_tHe_bouNdaRY_$"
    delimiter = ("--" + boundary).encode()

    chunks = []
    for field_name, file_name, payload in files:
        disposition = f'Content-Disposition: form-data; name="{field_name}"; filename="{file_name}"'
        chunks.extend(
            (
                delimiter,
                disposition.encode(),
                b"Content-Type: application/octet-stream",
                b"",  # empty line separating the part headers from the content
                payload.encode() if isinstance(payload, str) else payload,
            )
        )

    # closing delimiter, followed by an empty element so that the body ends with CRLF
    chunks.append(delimiter + b"--")
    chunks.append(b"")

    return f"multipart/form-data; boundary={boundary}", b"\r\n".join(chunks)
0109 
0110 
def upload_logs(url, log_file_name, destination_name, proxy_cert, full_token_path, token_auth_origin):
    """
    Gzip a log file and upload it through the file server API.

    This is best effort: every failure is logged and reported through the return value, never raised.

    :param url: base URL to upload to; "/server/panda" in it is replaced by "/api/v1/file_server"
    :param log_file_name: path of the local log file
    :param destination_name: file name announced to the server
    :param proxy_cert: path of the proxy file, handed to post_multipart
    :param full_token_path: path of the token file, handed to post_multipart
    :param token_auth_origin: token origin, handed to post_multipart
    :return: True if the server answered with status 200, False otherwise
    """
    try:
        # modify the url to point to the new file server API
        url = url.replace("/server/panda", "/api/v1/file_server")
        full_url = url + "/upload_cache_file"
        url_parts = urlparse.urlsplit(full_url)

        logging.debug("[upload_logs] start")
        # Read the raw bytes: the log holds arbitrary job output, and decoding it as text would
        # abort the upload on the first byte that is not valid in the locale encoding.
        with open(log_file_name, "rb") as log_file:
            log_payload = gzip.compress(log_file.read())
        files = [("file", destination_name, log_payload)]
        status, reason = post_multipart(url_parts.hostname, url_parts.port, url_parts.path, files, proxy_cert, full_token_path, token_auth_origin)
        logging.debug(f"[upload_logs] finished with code={status} msg={reason}")
        return status == 200
    except Exception as exc:
        err_message = f"failed to put with {type(exc)}:{exc} "
        err_message += traceback.format_exc()
        logging.debug(f"[upload_logs] excepted with:\n {err_message}")

    return False
0133 
0134 
def copy_files_in_dir(src_dir, dst_dir):
    """
    Copy every file listed in CONFIG_FILES from src_dir to dst_dir
    """
    source_paths = (os.path.join(src_dir, config_name) for config_name in CONFIG_FILES)
    for source_path in source_paths:
        shutil.copy(source_path, dst_dir)
0139 
0140 
def copy_proxy(source_file, destination_dir):
    """
    Copy the proxy file and change the permission to user read only

    :param source_file: path of the file to copy; None or "" means there is nothing to copy
    :param destination_dir: directory receiving the copy, which keeps the base name of the source
    :return: full path of the copy, or "" if nothing was copied
    """
    # Check the source before using it: os.path.basename(None) would raise a TypeError
    if not source_file:
        return ""

    # Construct the file name with the full destination path
    destination_file = os.path.join(destination_dir, os.path.basename(source_file))
    if not destination_file:
        return ""

    # Copy the proxy file and change the permission to user read only
    shutil.copy(source_file, destination_file)
    os.chmod(destination_file, 0o400)
    return destination_file
0157 
0158 
def get_configuration():
    """
    Collect the settings of this worker from the environment.

    Side effects:
    - WORK_DIR is replaced by $TMPDIR when that variable is set
    - the proxy and the token files are copied into WORK_DIR
    - X509_USER_PROXY is set to the copied proxy, if there is one
    - PANDA_AUTH_TOKEN is removed from the environment when the token setup is incomplete

    :return: Configuration object
    :raises Exception: if neither a proxy nor a complete token setup is specified, or the proxy cannot be copied
    """
    global WORK_DIR

    # see if there is a work directory specified
    tmpdir = os.environ.get("TMPDIR")
    if tmpdir:
        WORK_DIR = tmpdir

    proxy_path_secret = os.environ.get("proxySecretPath")
    token_path = os.environ.get("PANDA_AUTH_DIR")
    token_filename = os.environ.get("PANDA_AUTH_TOKEN")
    token_key_filename = os.environ.get("PANDA_AUTH_TOKEN_KEY")
    token_auth_origin = os.environ.get("PANDA_AUTH_ORIGIN")

    # check that either the proxy or the tokens are configured. If not, raise an exception
    if not proxy_path_secret and (not token_path or not token_filename or not token_key_filename or not token_auth_origin):
        logging.debug("[main] there is no proxy or token specified")
        raise Exception("Found no voms proxy or token configuration specified")

    # get the proxy certificate and copy it to the work directory
    proxy_path = None
    if proxy_path_secret:
        proxy_path = copy_proxy(proxy_path_secret, WORK_DIR)
        if not proxy_path:
            logging.debug("[main] failed to copy proxies")
            raise Exception("Failed to copy proxies")

        logging.debug("[main] initialized proxy")

        # Only export the variable when there is a proxy: in a token-only setup proxy_path is None
        # and assigning None to os.environ raises a TypeError.
        os.environ["X509_USER_PROXY"] = proxy_path

    # copy the pilot-panda token and token key to the work directory
    logging.debug(f"[main] token info {token_path} {token_filename} {token_key_filename}")
    full_token_path = None
    full_token_key_path = None
    if token_path and token_filename and token_key_filename:
        full_token_path = os.path.join(token_path, token_filename)
        copy_proxy(full_token_path, WORK_DIR)

        full_token_key_path = os.path.join(token_path, token_key_filename)
        copy_proxy(full_token_key_path, WORK_DIR)
        logging.debug("[main] initialized pilot-panda tokens")
    else:
        # Removing the key from os.environ also unsets it for child processes and, unlike
        # os.unsetenv, keeps the os.environ mapping in sync.
        os.environ.pop("PANDA_AUTH_TOKEN", None)

    # get the panda site name
    panda_site = os.environ.get("computingSite")
    logging.debug(f"[main] got panda site: {panda_site}")

    # get the panda queue name
    panda_queue = os.environ.get("pandaQueueName")
    logging.debug(f"[main] got panda queue: {panda_queue}")

    # get the resource type of the worker
    resource_type = os.environ.get("resourceType")
    logging.debug(f"[main] got resource type: {resource_type}")

    prod_source_label = os.environ.get("prodSourceLabel")
    logging.debug(f"[main] got prodSourceLabel: {prod_source_label}")

    job_type = os.environ.get("jobType")
    logging.debug(f"[main] got job type: {job_type}")

    pilot_type = os.environ.get("pilotType", "")
    logging.debug(f"[main] got pilotType: {pilot_type}")

    pilot_url_option = os.environ.get("pilotUrlOpt", "")
    logging.debug(f"[main] got pilotUrlOpt: {pilot_url_option}")

    python_option = os.environ.get("pythonOption", "")
    logging.debug(f"[main] got pythonOption: {python_option}")

    pilot_version = os.environ.get("pilotVersion", "")
    logging.debug(f"[main] got pilotVersion: {pilot_version}")

    # get the Harvester ID
    harvester_id = os.environ.get("HARVESTER_ID")
    logging.debug(f"[main] got Harvester ID: {harvester_id}")

    # get the worker id
    worker_id = os.environ.get("workerID")
    logging.debug(f"[main] got worker ID: {worker_id}")

    # get the URL (e.g. panda cache) to upload logs
    logs_frontend_w = os.environ.get("logs_frontend_w")
    logging.debug("[main] got url to upload logs")

    # get the URL (e.g. panda cache) where the logs can be downloaded afterwards
    logs_frontend_r = os.environ.get("logs_frontend_r")
    logging.debug("[main] got url to download logs")

    # build the filename to use for the stdout log
    stdout_name = f"{harvester_id}_{worker_id}_gz.out"

    logging.debug("[main] got filename for the stdout log")

    # get the submission mode (push/pull) for the pilot, PULL being the default
    submit_mode = os.environ.get("submit_mode") or "PULL"

    return Configuration(
        proxy_path=proxy_path,
        full_token_path=full_token_path,
        full_token_key_path=full_token_key_path,
        token_auth_origin=token_auth_origin,
        panda_site=panda_site,
        panda_queue=panda_queue,
        resource_type=resource_type,
        prod_source_label=prod_source_label,
        job_type=job_type,
        pilot_type=pilot_type,
        pilot_url_option=pilot_url_option,
        python_option=python_option,
        pilot_version=pilot_version,
        harvester_id=harvester_id,
        worker_id=worker_id,
        logs_frontend_w=logs_frontend_w,
        logs_frontend_r=logs_frontend_r,
        stdout_name=stdout_name,
        submit_mode=submit_mode,
    )
0281 
0282 
if __name__ == "__main__":
    import shlex

    # get all the configuration from environment
    config = get_configuration()
    destination_name = config.stdout_name

    # The wrapper output is written to and uploaded from the same file. WORK_DIR already follows
    # $TMPDIR (see get_configuration), so writer and reader agree also when TMPDIR is not set.
    wrapper_log = os.path.join(WORK_DIR, "wrapper-wid.log")

    # the pilot should propagate the download link via the pilotId field in the job table
    log_download_url = f"{config.logs_frontend_r}/{destination_name}"
    os.environ["GTAG"] = log_download_url  # GTAG env variable is read by pilot

    # execute the pilot wrapper
    logging.debug("[main] starting pilot wrapper...")
    resource_type_option = ""
    if config.resource_type:
        resource_type_option = f"--resource-type {config.resource_type}"

    if config.prod_source_label:
        psl_option = f"-j {config.prod_source_label}"
    else:
        psl_option = "-j managed"

    job_type_option = ""
    if config.job_type:
        job_type_option = f"--job-type {config.job_type}"

    pilot_type_option = "-i PR"
    if config.pilot_type:
        pilot_type_option = f"-i {config.pilot_type}"

    pilot_version_option = "--pilotversion 2"
    if config.pilot_version:
        pilot_version_option = f"--pilotversion {config.pilot_version}"

    # NOTE(review): the values below come from the environment and are inserted unquoted into a shell
    # command. pilot_url_option and python_option look like they may carry several words, so they
    # cannot simply be quoted - confirm that the environment is trusted.
    wrapper_params = "-q {0} -r {1} -s {2} -a {3} {4} {5} {6} {7} {8} {9} {10}".format(
        config.panda_queue,
        config.panda_queue,
        config.panda_site,
        WORK_DIR,
        resource_type_option,
        psl_option,
        pilot_type_option,
        job_type_option,
        config.pilot_url_option,
        config.python_option,
        pilot_version_option,
    )

    if config.submit_mode == "PUSH":
        # job configuration files need to be copied, because k8s configmap mounts as read-only file system
        # and therefore the pilot cannot execute in the same directory
        copy_files_in_dir(CONFIG_DIR, WORK_DIR)

    wrapper_executable = "/cvmfs/atlas.cern.ch/repo/sw/PandaPilotWrapper/latest/runpilot2-wrapper.sh"
    command = "/bin/bash {0} {1} -w generic --pilot-user=ATLAS --url=https://pandaserver.cern.ch -d --harvester-submit-mode={2} --allow-same-user=False".format(
        wrapper_executable, wrapper_params, config.submit_mode
    )

    # extend command to tee the stdout and stderr to a file. We need to return the wrapper exit code, not the tee exit code
    command += " 2>&1 | tee " + shlex.quote(wrapper_log) + "; exit ${PIPESTATUS[0]}"

    try:
        # PIPESTATUS only exists in bash, while shell=True runs /bin/sh unless told otherwise
        return_code = subprocess.call(command, shell=True, executable="/bin/bash")
    except BaseException:
        # deliberately broad: the logs are still uploaded if the wrapper run is interrupted
        logging.error(traceback.format_exc())
        return_code = 1

    logging.debug(f"[main] pilot wrapper done with return code {return_code} ...")

    # upload logs to e.g. panda cache or similar
    upload_logs(
        config.logs_frontend_w,
        wrapper_log,
        destination_name,
        config.proxy_path,
        config.full_token_path,
        config.token_auth_origin,
    )
    logging.debug("[main] FINISHED")

    # Exit with the same exit code as the pilot wrapper
    sys.exit(return_code)