File indexing completed on 2026-06-26 08:40:22
0001
0002 """
0003 Client-API EVGEN submission kernel — our owned reproduction of the proven
0004 condor-side recipe (eic/job_submission_condor submit_panda_api.py, spec only).
0005
0006 Given a PCS-built spec (``--spec spec.json``) and an already-assembled sandbox
0007 directory (``--workdir``), this builds the PanDA taskParamMap, uploads the
0008 sandbox to the PanDA cache, and submits the task with pandaclient under the
0009 caller's PanDA identity. It is run by scripts/submit-evgen-task.py inside a
0010 shell that has sourced the panda-client environment (so ``pandaclient`` and a
0011 valid OIDC token are present); it is not the credentialed orchestrator itself.
0012
0013 The taskParamMap is noInput+noOutput: the containerized payload xrootd-streams
0014 the EVGEN input from JLab and self-registers RECO to JLab Rucio, so PanDA stays
0015 out of the science-data path (docs/JEDI_INTEGRATION.md § single-Rucio
0016 constraint). The %RNDM=0 in the exec becomes a per-job ${SEQNUMBER} that selects
0017 the manifest row.
0018
0019 On success it prints a normalized ``jediTaskID=<N>`` line the orchestrator
0020 parses. Every failure path is surfaced (stderr + non-zero exit); nothing is
0021 swallowed.
0022 """
0023 import argparse
0024 import json
0025 import os
0026 import re
0027 import sys
0028 import tarfile
0029 import tempfile
0030 import uuid
0031
0032 from pandaclient import panda_api
0033 from pandaclient import Client
0034
0035
# Value sent to PanDA as the task's ``transPath`` (see build_task_params).
# NOTE(review): presumably the URL of the runGen transform the jobs run — confirm.
TRANS_PATH = "https://pandaserver-doma.cern.ch/trf/user/runGen-00-00-02"
0037
0038
def _log(msg):
    """Write one diagnostic line to stderr and flush it immediately.

    stdout is left alone so that it only carries the ``jediTaskID=<N>`` line
    printed by ``main``.
    """
    stream = sys.stderr
    print(msg, flush=True, file=stream)
0041
0042
def _source_url():
    """Derive the sourceURL used for the ``${SURL}`` substitution.

    Returns:
        The ``scheme://host[:port]`` prefix of ``Client.baseURLSSL``, or
        ``None`` when the attribute is missing/empty or holds no
        ``http(s)://host/`` prefix (a ``/`` after the host is required).
    """
    ssl_base = getattr(Client, 'baseURLSSL', '')
    match = re.search(r'(https?://[^/]+)/', ssl_base)
    if not match:
        return None
    return match.group(1)
0049
0050
def _resolve_archive_name(status, out, archive_name):
    """Interpret a ``Client.putFile`` result and return the archive name to use.

    Args:
        status: status code returned by ``putFile`` (0 means the exchange
            with the server completed).
        out: reply text returned by ``putFile``.
        archive_name: name of the archive that was just offered for upload.

    Returns:
        ``archive_name`` when the reply is ``True``, or the name carried by a
        ``NewFileName:<name>`` reply.

    Raises:
        RuntimeError: non-zero status, or any other reply. A server-side
            rejection arrives as reply text with status 0, so the reply has
            to be checked even when the status is clean.
    """
    if isinstance(out, bytes):
        reply = out.decode(errors='replace')
    else:
        reply = str(out)
    reply = reply.strip()
    if status != 0:
        raise RuntimeError(f"sandbox upload failed (status {status}): {reply}")
    if reply.startswith("NewFileName:"):
        return reply.split(":")[-1]
    if reply != "True":
        raise RuntimeError(f"sandbox upload rejected (status {status}): {reply}")
    return archive_name


def _upload_sandbox(workdir):
    """Tar the sandbox's files and upload the archive to the PanDA cache.

    Only regular files directly inside ``workdir`` are packed (anything that
    is not a regular file, e.g. a sub-directory, is skipped). Each is stored
    under its bare file name, in sorted order.

    Args:
        workdir: path of the assembled sandbox directory.

    Returns:
        The archive name the task must reference: the freshly generated
        ``jobO.<hex>.tar.gz`` name, or the name reported back in a
        ``NewFileName:<name>`` reply (sandbox reuse).

    Raises:
        RuntimeError: the upload returned a non-zero status or a reply other
            than ``True`` / ``NewFileName:<name>``.
        OSError: the sandbox could not be read or the archive not written.
    """
    archive_name = f'jobO.{uuid.uuid4().hex}.tar.gz'
    with tempfile.TemporaryDirectory() as tmpdir:
        archive_path = os.path.join(tmpdir, archive_name)
        _log(f"packing sandbox {workdir} -> {archive_name}")
        with tarfile.open(archive_path, 'w:gz') as tar:
            for fname in sorted(os.listdir(workdir)):
                fpath = os.path.join(workdir, fname)
                if os.path.isfile(fpath):
                    tar.add(fpath, arcname=fname)
                    _log(f" + {fname}")
        # putFile is given the bare archive name, so run it from the directory
        # that holds the archive and always restore the previous cwd.
        old_cwd = os.getcwd()
        os.chdir(tmpdir)
        try:
            status, out = Client.putFile(archive_name, False,
                                         useCacheSrv=False, reuseSandbox=True)
        finally:
            os.chdir(old_cwd)

    final_name = _resolve_archive_name(status, out, archive_name)
    if final_name != archive_name:
        _log(f"reusing existing sandbox: {final_name}")
    else:
        _log(f"uploaded sandbox {final_name}")
    return final_name
0082
0083
def build_task_params(spec, archive_name):
    """Build the PanDA taskParamMap for one client-API EVGEN task.

    Args:
        spec: mapping loaded from the PCS spec JSON. ``outDS`` and ``exec``
            are required; every other key read here falls back to a default.
        archive_name: name of the uploaded sandbox archive, handed to the
            transform through the ``-a`` job parameter.

    Returns:
        dict: the task parameters, declared ``noInput`` and ``noOutput``.

    Raises:
        KeyError: ``outDS`` or ``exec`` is missing from ``spec``.
        ValueError, TypeError: a numeric spec field cannot be converted.
    """
    def constant(value, **extra):
        # One fixed fragment of the job command line.
        return {'type': 'constant', 'value': value, **extra}

    lookup = spec.get

    # Task identity and brokerage settings.
    params = {
        'vo': lookup('vo', 'epic'),
        'site': lookup('site', 'BNL_OSG_PanDA_1'),
        'workingGroup': lookup('workingGroup', 'EIC'),
        'prodSourceLabel': lookup('prodSourceLabel', 'test'),
        'processingType': lookup('processingType', 'epicproduction'),
        'taskType': lookup('taskType', 'prod'),
        'taskPriority': int(lookup('taskPriority', 900)),
        # cloud defaults to the working group when the spec does not name one
        'cloud': lookup('cloud', lookup('workingGroup', 'EIC')),
        'campaign': lookup('campaign', ''),
    }

    out_ds = spec['outDS']
    owner = lookup('userName') or os.environ.get('SWF_TASK_OWNER') or None
    params.update({
        'taskName': out_ds,
        'userName': owner,
        'noInput': True,
        'noOutput': True,
        'architecture': '',
        'transUses': '',
        'transHome': None,
        'transPath': TRANS_PATH,
        'sourceURL': _source_url(),
        'coreCount': int(lookup('nCore', 1)),
        'ramCount': int(lookup('memory', 4096)),
        'ramUnit': 'MBPerCore',
        # the spec's nJobs is what gets sent as nEvents
        'nEvents': int(lookup('nJobs', 1)),
        'nEventsPerJob': int(lookup('nEventsPerJob', 1)),
    })

    # The same list object is extended further down.
    job_params = [
        constant('-j "" --sourceURL ${SURL}'),
        constant('-r .'),
    ]
    params['jobParameters'] = job_params

    container_exec = ('echo "=== cat exec script ==="; cat __run_main_exec.sh; '
                      'echo; echo "=== exec script ==="; /bin/sh __run_main_exec.sh')
    params['multiStepExec'] = {
        'preprocess': {'command': '${TRF}', 'args': '--preprocess ${TRF_ARGS}'},
        'postprocess': {'command': '${TRF}', 'args': '--postprocess ${TRF_ARGS}'},
        'containerOptions': {
            'containerExec': container_exec,
            'containerImage': lookup('containerImage', ''),
        },
    }
    params['log'] = {
        'type': 'template',
        'param_type': 'log',
        'value': f"{out_ds}.$JEDITASKID.${{SN}}.log.tgz",
        'dataset': out_ds + '_log/',
        'hidden': True,
    }

    # Optional work-disk request, sent in MB.
    disk_mb = lookup('disk')
    if disk_mb is not None:
        params.update({'workDiskCount': int(disk_mb), 'workDiskUnit': 'MB'})

    # NOTE(review): walltime is set together with cpuTimeUnit, only when
    # skipScout is not requested — confirm this matches the intended layout.
    if lookup('skipScout'):
        params['skipScout'] = True
    else:
        params['cpuTimeUnit'] = 'HS06sPerEvent'
        hours = float(lookup('walltimeHours', 2.0))
        params['walltime'] = int(hours * 3600)

    image = lookup('containerImage')
    if image:
        params['container_name'] = image

    job_params.append(constant(f'-a {archive_name}'))

    # A %RNDM:<n> / %RNDM=<n> marker in the exec string becomes ${SEQNUMBER},
    # backed by a hidden pseudo-input whose offset is <n> (first marker wins).
    command = spec['exec']
    marker = re.search(r'%RNDM(:|=)(\d+)', command)
    if marker is not None:
        command = re.sub(r'%RNDM(:|=)\d+', '${SEQNUMBER}', command)
        job_params.append({
            'type': 'template', 'param_type': 'pseudo_input',
            'value': '${SEQNUMBER}', 'dataset': 'seq_number',
            'offset': marker.group(2), 'hidden': True,
        })

    # Only spaces are encoded; the command is wrapped in -p "...".
    encoded = '%20'.join(command.split(' '))
    job_params.extend([
        constant('-p "', padding=False),
        constant(encoded),
        constant('"'),
    ])
    return params
0176
0177
def _submission_accepted(result):
    """Tell whether a ``submit_task`` result means the task was accepted.

    ``result`` is expected to be ``(status, reply)``: ``status`` 0 means the
    exchange with the server succeeded, and ``reply`` is normally a
    ``(return_code, message)`` pair from the server. Communication success
    alone is not enough — the server can still reject the task, and such a
    message may mention the jediTaskID of an existing task.

    Server return codes 0 (succeeded) and 3 (accepted for incremental
    execution) count as accepted. When the reply carries no inspectable
    code, the communication status decides.
    """
    if not result:
        return False
    try:
        status = result[0]
    except (TypeError, IndexError, KeyError):
        return False
    if status != 0:
        return False
    try:
        reply = result[1]
    except (TypeError, IndexError, KeyError):
        return True
    if isinstance(reply, (tuple, list)) and reply:
        code = reply[0]
        if isinstance(code, bool):
            return code
        if isinstance(code, int):
            return code in (0, 3)
    return True


def main():
    """Upload the sandbox, submit the task, and report the jediTaskID.

    Reads ``--spec`` (PCS-built spec JSON) and ``--workdir`` (assembled
    sandbox directory, default ``sandbox``) from the command line.

    Returns:
        int: process exit code —
        0  task accepted and a jediTaskID was found in the result;
        1  submission not accepted, or no jediTaskID in the result;
        2  spec lacks ``outDS``/``exec`` or the sandbox dir does not exist;
        3  sandbox upload failed, no sourceURL could be derived, or the spec
           names no container image.
    """
    ap = argparse.ArgumentParser(description="Submit a client-API EVGEN task to PanDA.")
    ap.add_argument("--spec", required=True, help="PCS-built spec JSON file")
    ap.add_argument("--workdir", default="sandbox",
                    help="assembled sandbox dir to tar and upload")
    args = ap.parse_args()

    with open(args.spec) as f:
        spec = json.load(f)
    # A spec that is not a JSON object cannot carry outDS/exec either.
    if not isinstance(spec, dict) or not spec.get('outDS') or not spec.get('exec'):
        _log("ERROR: spec missing outDS/exec")
        return 2
    if not os.path.isdir(args.workdir):
        _log(f"ERROR: sandbox dir not found: {args.workdir}")
        return 2

    try:
        archive_name = _upload_sandbox(args.workdir)
    except Exception as e:
        _log(f"ERROR: sandbox upload failed: {e}")
        return 3

    params = build_task_params(spec, archive_name)
    if not params.get('sourceURL'):
        _log("ERROR: could not derive sourceURL from Client.baseURLSSL "
             "(no ${SURL} for the payload) — refusing to submit")
        return 3
    if not params.get('container_name'):
        _log("ERROR: no container image in spec — refusing to submit")
        return 3
    client = panda_api.get_api()
    result = client.submit_task(params)
    _log(f"submit_task result: {result}")

    m = re.search(r'jediTaskID[=:\s]+(\d+)', str(result))
    ok = _submission_accepted(result)
    if m:
        # The ID is printed whenever the result carries one, even if the
        # submission is then reported as failed through the exit code.
        print(f"jediTaskID={m.group(1)}")
    if ok and m:
        return 0
    _log("ERROR: submission did not return a jediTaskID / non-zero status")
    return 1
0221
0222
# Script entry point: the process exit status is whatever main() returns.
if __name__ == "__main__":
    raise SystemExit(main())