File indexing completed on 2026-04-20 07:58:57
"""
This script is executed at container startup:
- it retrieves the proxy and/or token configuration, the PanDA queue and the other pilot settings from the environment
- it runs the pilot wrapper (runpilot2-wrapper.sh from CVMFS)
- at the end it uploads the gzipped wrapper log to the PanDA cache

The post_multipart code was adapted from: https://github.com/haiwen/webapi-examples/blob/master/python/upload-file.py
"""
0009
0010 import gzip
0011 import http.client as httplib
0012 import logging
0013 import os
0014 import shutil
0015 import ssl
0016 import subprocess
0017 import sys
0018 import traceback
0019 import urllib.parse as urlparse
0020 from dataclasses import dataclass
0021 from typing import Optional
0022
# Default working directory; get_configuration() replaces it with $TMPDIR when that is set.
WORK_DIR = "/scratch"
# Directory holding the job description files in PUSH submit mode.
CONFIG_DIR = "/scratch/jobconfig"

# Job description files copied from CONFIG_DIR into the working directory in PUSH mode.
PJD = "pandaJobData.out"
PFC = "PoolFileCatalog_H.xml"
CONFIG_FILES = [PJD, PFC]

# Verbose, timestamped logging to stdout.
logging.basicConfig(
    stream=sys.stdout,
    format="%(asctime)s %(levelname)s %(message)s",
    level=logging.DEBUG,
)
0030
0031
@dataclass
class Configuration:
    """
    Settings for one pilot container, as assembled by get_configuration().

    Optional fields receive None when the corresponding environment variable is unset;
    plain str fields always receive a string (get_configuration() supplies defaults).
    """

    proxy_path: Optional[str]  # read-only copy of the proxy in WORK_DIR, or None without a proxy
    full_token_path: Optional[str]  # PANDA_AUTH_DIR/PANDA_AUTH_TOKEN (original location), or None
    full_token_key_path: Optional[str]  # PANDA_AUTH_DIR/PANDA_AUTH_TOKEN_KEY, or None
    token_auth_origin: Optional[str]  # PANDA_AUTH_ORIGIN, sent as the Origin header with the token
    panda_site: Optional[str]  # computingSite, passed to the wrapper as -s
    panda_queue: Optional[str]  # pandaQueueName, passed to the wrapper as -q and -r
    resource_type: Optional[str]  # resourceType, --resource-type when set
    prod_source_label: Optional[str]  # prodSourceLabel, -j ("managed" when unset)
    job_type: Optional[str]  # jobType, --job-type when set
    pilot_type: str  # pilotType ("" when unset), -i ("PR" when empty)
    pilot_url_option: str  # pilotUrlOpt ("" when unset), appended verbatim to the wrapper arguments
    python_option: str  # pythonOption ("" when unset), appended verbatim to the wrapper arguments
    pilot_version: str  # pilotVersion ("" when unset), --pilotversion ("2" when empty)
    harvester_id: Optional[str]  # HARVESTER_ID
    worker_id: Optional[str]  # workerID
    logs_frontend_w: Optional[str]  # logs_frontend_w, base URL the wrapper log is uploaded to
    logs_frontend_r: Optional[str]  # logs_frontend_r, base URL the uploaded log is read from (exported as GTAG)
    stdout_name: str  # "<harvester_id>_<worker_id>_gz.out", remote name of the uploaded log
    submit_mode: str  # submit_mode ("PULL" when unset), passed as --harvester-submit-mode
0053
0054
def post_multipart(host, port, selector, files, proxy_cert, full_token_path, token_auth_origin):
    """
    Post files to an HTTPS host as multipart/form-data.

    Authentication uses a bearer token when both full_token_path and token_auth_origin
    are given, otherwise the client certificate in proxy_cert.

    :param host: server host name
    :param port: server port; None lets http.client use the HTTPS default
    :param selector: request path on the server
    :param files: sequence of (name, filename, value) elements to upload as files
    :param proxy_cert: PEM file holding both the client certificate and its key
    :param full_token_path: file containing the bearer token
    :param token_auth_origin: value of the Origin header sent with the token
    :return: (status, reason) of the server's response
    """
    content_type, body = encode_multipart_formdata(files)
    use_token = bool(full_token_path and token_auth_origin)

    # The default purpose (Purpose.SERVER_AUTH) builds a *client-side* context.
    # Purpose.CLIENT_AUTH builds a server-side one: since Python 3.10 that is a
    # PROTOCOL_TLS_SERVER context and wrapping this outgoing socket fails, and on older
    # versions it silently skipped verification of the server certificate.
    # NOTE(review): verification now uses the default trust store; if the server's CA is
    # not in it, pass cafile/capath to create_default_context.
    context = ssl.create_default_context()
    if not use_token:
        context.load_cert_chain(certfile=proxy_cert, keyfile=proxy_cert)

    headers = {
        "content-type": content_type,
        "content-length": str(len(body)),
    }
    if use_token:
        with open(full_token_path, "r") as token_file:
            token_content = token_file.read().strip()
        headers["Authorization"] = f"Bearer {token_content}"
        headers["Origin"] = token_auth_origin
    headers["Accept"] = "application/json"

    connection = httplib.HTTPSConnection(host, port, context=context, timeout=180)
    try:
        connection.request("POST", selector, body=body, headers=headers)
        response = connection.getresponse()
        return response.status, response.reason
    finally:
        # Always release the socket, including when the request or response fails.
        connection.close()
0088
0089
def encode_multipart_formdata(files, *, boundary="----------ThIs_Is_tHe_bouNdaRY_"):
    """
    Encode files as a multipart/form-data request body.

    :param files: sequence of (name, filename, value) elements to upload as files;
        str values are UTF-8 encoded, bytes values are sent as-is
    :param boundary: multipart boundary; the default uses only characters allowed by
        RFC 2046 (the former trailing "$" is not one of them)
    :return: (content_type, body) ready to send with http.client
    """
    crlf = b"\r\n"
    # Keep name/filename from breaking out of their quoted header parameters, using the
    # same replacements browsers apply when building multipart/form-data.
    escapes = {ord('"'): "%22", ord("\r"): "%0D", ord("\n"): "%0A"}
    parts = []
    for key, filename, value in files:
        safe_key = str(key).translate(escapes)
        safe_filename = str(filename).translate(escapes)
        parts.append(f"--{boundary}")
        parts.append(f'Content-Disposition: form-data; name="{safe_key}"; filename="{safe_filename}"')
        parts.append("Content-Type: application/octet-stream")
        parts.append("")
        parts.append(value)
    parts.append(f"--{boundary}--")
    parts.append("")
    body = crlf.join(part.encode() if isinstance(part, str) else part for part in parts)
    content_type = f"multipart/form-data; boundary={boundary}"
    return content_type, body
0109
0110
def upload_logs(url, log_file_name, destination_name, proxy_cert, full_token_path, token_auth_origin):
    """
    Gzip a log file and upload it to the PanDA file server cache.

    Best effort: every failure is logged at debug level and reported as False.

    :param url: PanDA server base URL; "/server/panda" is rewritten to the file-server API
    :param log_file_name: local file to upload
    :param destination_name: file name to store on the server
    :param proxy_cert: client certificate/key PEM, used when no token is configured
    :param full_token_path: bearer token file, used together with token_auth_origin
    :param token_auth_origin: Origin header sent with the token
    :return: True if the server answered HTTP 200, False otherwise
    """
    if not url:
        logging.debug("[upload_logs] no upload url configured, skipping")
        return False
    try:
        url = url.replace("/server/panda", "/api/v1/file_server")
        url_parts = urlparse.urlsplit(url + "/upload_cache_file")

        logging.debug("[upload_logs] start")
        # Read raw bytes: the wrapper log is arbitrary process output and may contain bytes
        # that are invalid in the locale encoding. Text mode would also rewrite \r and \r\n.
        with open(log_file_name, "rb") as log_file:
            log_payload = gzip.compress(log_file.read())
        files = [("file", destination_name, log_payload)]
        status, reason = post_multipart(
            url_parts.hostname, url_parts.port, url_parts.path, files, proxy_cert, full_token_path, token_auth_origin
        )
        logging.debug(f"[upload_logs] finished with code={status} msg={reason}")
        return status == 200
    except Exception:
        err_type, err_value = sys.exc_info()[:2]
        err_message = f"failed to put with {err_type}:{err_value} " + traceback.format_exc()
        logging.debug(f"[upload_logs] excepted with:\n {err_message}")
        return False
0133
0134
def copy_files_in_dir(src_dir, dst_dir):
    """
    Copy every file listed in CONFIG_FILES from src_dir into dst_dir.

    Errors from shutil (e.g. a missing file) propagate to the caller.
    """
    sources = [os.path.join(src_dir, name) for name in CONFIG_FILES]
    for source in sources:
        shutil.copy(source, dst_dir)
0139
0140
def copy_proxy(source_file, destination_dir):
    """
    Copy a credential file into destination_dir and make the copy user read-only (0o400).

    :param source_file: file to copy; a falsy value means there is nothing to copy
    :param destination_dir: directory receiving a file with the same basename
    :return: path of the copy, or "" when source_file is empty/None
    """
    # Checked up front: os.path.basename(None) would raise TypeError.
    if not source_file:
        return ""

    destination_file = os.path.join(destination_dir, os.path.basename(source_file))

    # A copy left behind by an earlier run (e.g. a reused scratch directory) is already
    # 0o400, and shutil.copy cannot open it for writing as a non-root user. Make it
    # writable first, unless it is the source file itself; shutil.copy then raises
    # SameFileError as before.
    if os.path.exists(destination_file) and not os.path.samefile(source_file, destination_file):
        os.chmod(destination_file, 0o600)

    shutil.copy(source_file, destination_file)
    os.chmod(destination_file, 0o400)
    return destination_file
0157
0158
def get_configuration():
    """
    Build the Configuration for this container from its environment.

    Side effects:
      - the module-level WORK_DIR is replaced by $TMPDIR when that is set and non-empty;
      - the proxy (proxySecretPath) and the token/key files (PANDA_AUTH_DIR +
        PANDA_AUTH_TOKEN / PANDA_AUTH_TOKEN_KEY) are copied read-only into WORK_DIR;
      - X509_USER_PROXY is exported when a proxy is configured;
      - PANDA_AUTH_TOKEN is removed from the environment when PANDA_AUTH_DIR,
        PANDA_AUTH_TOKEN or PANDA_AUTH_TOKEN_KEY is missing.

    :raises Exception: when neither a proxy nor a complete token configuration
        (dir, token, key and origin) is given, or when the proxy cannot be copied
    :return: Configuration instance
    """
    global WORK_DIR

    def _from_env(name, label, default=None):
        # Read one variable and log it with the label used in the startup log.
        value = os.environ.get(name, default)
        logging.debug(f"[main] got {label}: {value}")
        return value

    tmpdir = os.environ.get("TMPDIR")
    if tmpdir:
        WORK_DIR = tmpdir

    proxy_path_secret = os.environ.get("proxySecretPath")
    token_path = os.environ.get("PANDA_AUTH_DIR")
    token_filename = os.environ.get("PANDA_AUTH_TOKEN")
    token_key_filename = os.environ.get("PANDA_AUTH_TOKEN_KEY")
    token_auth_origin = os.environ.get("PANDA_AUTH_ORIGIN")

    token_complete = token_path and token_filename and token_key_filename and token_auth_origin
    if not proxy_path_secret and not token_complete:
        logging.debug("[main] there is no proxy or token specified")
        raise Exception("Found no voms proxy or token configuration specified")

    proxy_path = None
    if proxy_path_secret:
        proxy_path = copy_proxy(proxy_path_secret, WORK_DIR)
        if not proxy_path:
            logging.debug("[main] failed to copy proxies")
            raise Exception("Failed to copy proxies")
        logging.debug("[main] initialized proxy")
        os.environ["X509_USER_PROXY"] = proxy_path

    logging.debug(f"[main] token info {token_path} {token_filename} {token_key_filename}")
    full_token_path = None
    full_token_key_path = None
    if token_path and token_filename and token_key_filename:
        # NOTE(review): the copies' paths are discarded; full_token_path/full_token_key_path
        # keep pointing at the original location in PANDA_AUTH_DIR. Confirm this is intended.
        full_token_path = os.path.join(token_path, token_filename)
        copy_proxy(full_token_path, WORK_DIR)

        full_token_key_path = os.path.join(token_path, token_key_filename)
        copy_proxy(full_token_key_path, WORK_DIR)
        logging.debug("[main] initialized pilot-panda tokens")
    else:
        # os.unsetenv() does not update os.environ; deleting the mapping entry updates both
        # os.environ and the environment inherited by the wrapper subprocess.
        os.environ.pop("PANDA_AUTH_TOKEN", None)

    panda_site = _from_env("computingSite", "panda site")
    panda_queue = _from_env("pandaQueueName", "panda queue")
    resource_type = _from_env("resourceType", "resource type")
    prod_source_label = _from_env("prodSourceLabel", "prodSourceLabel")
    job_type = _from_env("jobType", "job type")
    pilot_type = _from_env("pilotType", "pilotType", "")
    pilot_url_option = _from_env("pilotUrlOpt", "pilotUrlOpt", "")
    python_option = _from_env("pythonOption", "pythonOption", "")
    pilot_version = _from_env("pilotVersion", "pilotVersion", "")
    harvester_id = _from_env("HARVESTER_ID", "Harvester ID")
    worker_id = _from_env("workerID", "worker ID")

    # The log URLs themselves are not written to the log.
    logs_frontend_w = os.environ.get("logs_frontend_w")
    logging.debug("[main] got url to upload logs")
    logs_frontend_r = os.environ.get("logs_frontend_r")
    logging.debug("[main] got url to download logs")

    stdout_name = f"{harvester_id}_{worker_id}_gz.out"
    logging.debug("[main] got filename for the stdout log")

    submit_mode = os.environ.get("submit_mode") or "PULL"

    return Configuration(
        proxy_path=proxy_path,
        full_token_path=full_token_path,
        full_token_key_path=full_token_key_path,
        token_auth_origin=token_auth_origin,
        panda_site=panda_site,
        panda_queue=panda_queue,
        resource_type=resource_type,
        prod_source_label=prod_source_label,
        job_type=job_type,
        pilot_type=pilot_type,
        pilot_url_option=pilot_url_option,
        python_option=python_option,
        pilot_version=pilot_version,
        harvester_id=harvester_id,
        worker_id=worker_id,
        logs_frontend_w=logs_frontend_w,
        logs_frontend_r=logs_frontend_r,
        stdout_name=stdout_name,
        submit_mode=submit_mode,
    )
0281
0282
if __name__ == "__main__":
    import shlex

    config = get_configuration()
    destination_name = config.stdout_name

    # URL under which the uploaded wrapper log can be read; exported as GTAG so the
    # wrapper subprocess inherits it.
    log_download_url = f"{config.logs_frontend_r}/{destination_name}"
    os.environ["GTAG"] = log_download_url

    logging.debug("[main] starting pilot wrapper...")

    # Optional wrapper arguments: omitted when unset, or falling back to a default
    # (-j managed, -i PR, --pilotversion 2).
    resource_type_option = f"--resource-type {config.resource_type}" if config.resource_type else ""
    psl_option = f"-j {config.prod_source_label or 'managed'}"
    job_type_option = f"--job-type {config.job_type}" if config.job_type else ""
    pilot_type_option = f"-i {config.pilot_type or 'PR'}"
    pilot_version_option = f"--pilotversion {config.pilot_version or '2'}"

    wrapper_params = "-q {0} -r {1} -s {2} -a {3} {4} {5} {6} {7} {8} {9} {10}".format(
        config.panda_queue,
        config.panda_queue,
        config.panda_site,
        WORK_DIR,
        resource_type_option,
        psl_option,
        pilot_type_option,
        job_type_option,
        config.pilot_url_option,
        config.python_option,
        pilot_version_option,
    )

    if config.submit_mode == "PUSH":
        # In PUSH mode the job description files are supplied in CONFIG_DIR; copy them into
        # the work directory handed to the wrapper with -a.
        copy_files_in_dir(CONFIG_DIR, WORK_DIR)

    wrapper_executable = "/cvmfs/atlas.cern.ch/repo/sw/PandaPilotWrapper/latest/runpilot2-wrapper.sh"
    command = "/bin/bash {0} {1} -w generic --pilot-user=ATLAS --url=https://pandaserver.cern.ch -d --harvester-submit-mode={2} --allow-same-user=False".format(
        wrapper_executable, wrapper_params, config.submit_mode
    )

    # Tee the wrapper output to the same file upload_logs() reads below. The former target,
    # ${TMPDIR:-/tmp}/wrapper-wid.log, only matched WORK_DIR/wrapper-wid.log when TMPDIR was
    # set and non-empty; otherwise WORK_DIR stays /scratch and the upload found no log.
    wrapper_log = os.path.join(WORK_DIR, "wrapper-wid.log")
    command += f" 2>&1 | tee {shlex.quote(wrapper_log)}; exit ${{PIPESTATUS[0]}}"

    try:
        # PIPESTATUS (returns the wrapper's exit code rather than tee's) is a bash feature,
        # while shell=True on its own runs /bin/sh.
        return_code = subprocess.call(command, shell=True, executable="/bin/bash")
    except BaseException:
        # Deliberately broad (includes KeyboardInterrupt) so the log upload below still runs.
        logging.error(traceback.format_exc())
        return_code = 1

    logging.debug(f"[main] pilot wrapper done with return code {return_code} ...")

    upload_logs(
        config.logs_frontend_w,
        wrapper_log,
        destination_name,
        config.proxy_path,
        config.full_token_path,
        config.token_auth_origin,
    )
    logging.debug("[main] FINISHED")

    sys.exit(return_code)