Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-06-26 08:40:22

0001 #!/usr/bin/env python3
0002 """
0003 Client-API EVGEN submission kernel — our owned reproduction of the proven
0004 condor-side recipe (eic/job_submission_condor submit_panda_api.py, spec only).
0005 
0006 Given a PCS-built spec (``--spec spec.json``) and an already-assembled sandbox
0007 directory (``--workdir``), this builds the PanDA taskParamMap, uploads the
0008 sandbox to the PanDA cache, and submits the task with pandaclient under the
0009 caller's PanDA identity. It is run by scripts/submit-evgen-task.py inside a
0010 shell that has sourced the panda-client environment (so ``pandaclient`` and a
0011 valid OIDC token are present); it is not the credentialed orchestrator itself.
0012 
0013 The taskParamMap is noInput+noOutput: the containerized payload xrootd-streams
0014 the EVGEN input from JLab and self-registers RECO to JLab Rucio, so PanDA stays
0015 out of the science-data path (docs/JEDI_INTEGRATION.md § single-Rucio
0016 constraint). The %RNDM=0 in the exec becomes a per-job ${SEQNUMBER} that selects
0017 the manifest row.
0018 
0019 On success it prints a normalized ``jediTaskID=<N>`` line the orchestrator
0020 parses. Every failure path is surfaced (stderr + non-zero exit); nothing is
0021 swallowed.
0022 """
0023 import argparse
0024 import json
0025 import os
0026 import re
0027 import sys
0028 import tarfile
0029 import tempfile
0030 import uuid
0031 
0032 from pandaclient import panda_api
0033 from pandaclient import Client
0034 
# URL of the runGen transformation (TRF) that wraps the generic payload; kept
# identical to the value the proven recipe uses.
TRANS_PATH = 'https://pandaserver-doma.cern.ch/trf/user/runGen-00-00-02'
0037 
0038 
def _log(msg):
    """Write *msg* as one line to stderr and flush straight away, keeping
    diagnostics off stdout."""
    err_stream = sys.stderr
    print(msg, file=err_stream, flush=True)
0041 
0042 
def _source_url():
    """Value for the ${SURL} substitution, taken from the client's SSL base URL.

    Returns the first ``http(s)://host[:port]`` prefix found in
    ``Client.baseURLSSL`` that is followed by a ``/`` — the same extraction
    the proven recipe does (matches prun) — or ``None`` when the attribute is
    absent or contains no such prefix.
    """
    ssl_base = getattr(Client, 'baseURLSSL', '')
    match = re.search(r'(https?://[^/]+)/', ssl_base)
    if match is None:
        return None
    return match.group(1)
0049 
0050 
def _upload_sandbox(workdir):
    """Tar every file in the sandbox dir and upload it to the PanDA cache.

    Each regular file directly inside *workdir* is added, flat, to a
    ``jobO.<uuid>.tar.gz`` archive built in a temporary directory
    (subdirectories are skipped). The archive is handed to
    ``Client.putFile`` with sandbox reuse enabled. Mirrors prun --noBuild /
    submit_panda_api.py behaviour.

    Returns:
        The archive name the task must reference: the freshly uploaded name,
        or the name from a ``NewFileName:<name>`` reply when an existing
        sandbox is reused.

    Raises:
        RuntimeError: if ``putFile`` reports a non-zero status, or answers
            with anything other than ``True`` / ``NewFileName:<name>``. A
            status-0 reply carrying an error message is a failed upload and
            must not be followed by a task submission.
    """
    archive_name = f'jobO.{uuid.uuid4().hex}.tar.gz'
    with tempfile.TemporaryDirectory() as tmpdir:
        archive_path = os.path.join(tmpdir, archive_name)
        _log(f"packing sandbox {workdir} -> {archive_name}")
        with tarfile.open(archive_path, 'w:gz') as tar:
            for fname in sorted(os.listdir(workdir)):
                fpath = os.path.join(workdir, fname)
                if os.path.isfile(fpath):
                    tar.add(fpath, arcname=fname)   # flat: name only, no path
                    _log(f"  + {fname}")
        # putFile is given the bare archive name, so it is called from inside
        # the temp dir; the previous cwd is always restored.
        old_cwd = os.getcwd()
        os.chdir(tmpdir)
        try:
            status, out = Client.putFile(archive_name, False,
                                         useCacheSrv=False, reuseSandbox=True)
        finally:
            os.chdir(old_cwd)

    # Normalise the reply so a bytes payload or stray whitespace cannot turn
    # a success into a failure (or crash on .startswith).
    if isinstance(out, bytes):
        out = out.decode('utf-8', errors='replace')
    reply = out.strip() if isinstance(out, str) else ''

    if status != 0:
        _log(f"sandbox upload output: {out}")
        raise RuntimeError(f"sandbox upload failed (status {status})")
    if reply.startswith("NewFileName:"):
        archive_name = reply.split(":")[-1]
        _log(f"reusing existing sandbox: {archive_name}")
        return archive_name
    if reply == "True":
        _log(f"uploaded sandbox {archive_name}")
        return archive_name
    # Status 0 only says the request went through; any other reply body is
    # an error message, so the sandbox is not in the cache.
    _log(f"sandbox upload output: {out}")
    raise RuntimeError(f"sandbox upload rejected (status {status}): {out}")
0082 
0083 
def build_task_params(spec, archive_name):
    """Build the PanDA taskParamMap for one submission.

    Args:
        spec: PCS spec mapping. ``outDS`` and ``exec`` are read
            unconditionally; every other key falls back to the default
            coded below.
        archive_name: name of the uploaded sandbox archive, passed to the
            TRF via ``-a``.

    Returns:
        dict: the assembled taskParamMap.
    """
    # TRF arguments; the same list object is stored in the map and extended
    # further down.
    job_args = [
        {'type': 'constant', 'value': '-j "" --sourceURL ${SURL}'},
        {'type': 'constant', 'value': '-r .'},
    ]
    # Shell steps run inside the container: print the generated exec script,
    # then execute it.
    container_steps = (
        'echo "=== cat exec script ==="',
        'cat __run_main_exec.sh',
        'echo',
        'echo "=== exec script ==="',
        '/bin/sh __run_main_exec.sh',
    )
    owner = spec.get('userName') or os.environ.get('SWF_TASK_OWNER')

    params = {
        'vo': spec.get('vo', 'epic'),
        'site': spec.get('site', 'BNL_OSG_PanDA_1'),
        'workingGroup': spec.get('workingGroup', 'EIC'),
        'prodSourceLabel': spec.get('prodSourceLabel', 'test'),
        'processingType': spec.get('processingType', 'epicproduction'),
        'taskType': spec.get('taskType', 'prod'),
        # Producer envelope from commands.py (the proven 36439 path). A
        # prod-role submission receives no server-side defaults —
        # insertTaskParamsPanda fills userName/taskType/taskPriority only on
        # the non-prodRole branch — so the producer supplies them here. The
        # missing taskPriority is what broke the user-mode template under a
        # prod token.
        'taskPriority': int(spec.get('taskPriority', 900)),
        'cloud': spec.get('cloud', spec.get('workingGroup', 'EIC')),
        'campaign': spec.get('campaign', ''),
        'taskName': spec['outDS'],
        'userName': owner or None,
        'noInput': True,    # payload stages its own EVGEN input
        'noOutput': True,   # payload self-registers RECO to JLab
        'architecture': '',
        'transUses': '',
        'transHome': None,
        'transPath': TRANS_PATH,
        'sourceURL': _source_url(),
        'coreCount': int(spec.get('nCore', 1)),
        'ramCount': int(spec.get('memory', 4096)),
        'ramUnit': 'MBPerCore',   # producer envelope (commands.py)
        'nEvents': int(spec.get('nJobs', 1)),   # one job per manifest row
        'nEventsPerJob': int(spec.get('nEventsPerJob', 1)),
    }
    params['jobParameters'] = job_args
    params['multiStepExec'] = {
        'preprocess': {'command': '${TRF}', 'args': '--preprocess ${TRF_ARGS}'},
        'postprocess': {'command': '${TRF}', 'args': '--postprocess ${TRF_ARGS}'},
        'containerOptions': {
            'containerExec': '; '.join(container_steps),
            'containerImage': spec.get('containerImage', ''),
        },
    }
    out_ds = spec['outDS']
    params['log'] = {
        'type': 'template',
        'param_type': 'log',
        'value': f"{out_ds}.$JEDITASKID.${{SN}}.log.tgz",
        'dataset': out_ds + '_log/',
        'hidden': True,
    }

    # Optional scratch-disk request, in MB.
    disk_mb = spec.get('disk')
    if disk_mb is not None:
        params.update(workDiskCount=int(disk_mb), workDiskUnit='MB')

    # Scouts on -> HS06 per-event routing (avoids the noInput pseudo-input
    # 1MB-file walltime inflation); scouts off -> walltime used directly.
    if not spec.get('skipScout'):
        params['cpuTimeUnit'] = 'HS06sPerEvent'
    else:
        params['skipScout'] = True
    hours = float(spec.get('walltimeHours', 2.0))
    params['walltime'] = int(hours * 3600)

    image = spec.get('containerImage')
    if image:
        params['container_name'] = image

    # -a <sandbox>
    job_args.append({'type': 'constant', 'value': '-a {}'.format(archive_name)})

    # Every %RNDM=X / %RNDM:X becomes ${SEQNUMBER}; the first match supplies
    # the offset of the pseudo_input template (one job per row).
    command = spec['exec']
    first_rndm = re.search(r'%RNDM(:|=)(\d+)', command)
    if first_rndm is not None:
        command = re.sub(r'%RNDM(:|=)\d+', '${SEQNUMBER}', command)
        job_args.append({
            'type': 'template',
            'param_type': 'pseudo_input',
            'value': '${SEQNUMBER}',
            'dataset': 'seq_number',
            'offset': first_rndm.group(2),
            'hidden': True,
        })

    # -p "<exec with spaces encoded as %20>"
    job_args += [
        {'type': 'constant', 'value': '-p "', 'padding': False},
        {'type': 'constant', 'value': '%20'.join(command.split(' '))},
        {'type': 'constant', 'value': '"'},
    ]
    return params
0176 
0177 
def main():
    """Command-line entry point: validate inputs, upload the sandbox, submit.

    Reads ``--spec`` (JSON) and ``--workdir`` (sandbox directory), uploads
    the sandbox, builds the taskParamMap and submits it through the
    pandaclient API. Whenever the reply contains a jediTaskID, a normalized
    ``jediTaskID=<N>`` line is printed on stdout for the orchestrator.

    Returns:
        int: process exit status —
            0: the server accepted the task and a jediTaskID was parsed;
            1: communication failed, the server rejected the task, or no
               jediTaskID was found in the reply;
            2: spec is not a JSON object / lacks outDS or exec, or the
               sandbox directory does not exist;
            3: sandbox upload failed, or sourceURL / container image missing.
    """
    ap = argparse.ArgumentParser(description="Submit a client-API EVGEN task to PanDA.")
    ap.add_argument("--spec", required=True, help="PCS-built spec JSON file")
    ap.add_argument("--workdir", default="sandbox",
                    help="assembled sandbox dir to tar and upload")
    args = ap.parse_args()

    # JSON is UTF-8 by specification; do not depend on the locale default.
    with open(args.spec, encoding='utf-8') as f:
        spec = json.load(f)
    # A non-object spec (list, string, ...) has no .get(); report it as an
    # unusable spec instead of crashing with AttributeError.
    if not isinstance(spec, dict) or not spec.get('outDS') or not spec.get('exec'):
        _log("ERROR: spec missing outDS/exec")
        return 2
    if not os.path.isdir(args.workdir):
        _log(f"ERROR: sandbox dir not found: {args.workdir}")
        return 2

    try:
        archive_name = _upload_sandbox(args.workdir)
    except Exception as e:
        _log(f"ERROR: sandbox upload failed: {e}")
        return 3

    params = build_task_params(spec, archive_name)
    if not params.get('sourceURL'):
        _log("ERROR: could not derive sourceURL from Client.baseURLSSL "
             "(no ${SURL} for the payload) — refusing to submit")
        return 3
    if not params.get('container_name'):
        _log("ERROR: no container image in spec — refusing to submit")
        return 3
    client = panda_api.get_api()
    result = client.submit_task(params)
    _log(f"submit_task result: {result}")

    m = re.search(r'jediTaskID[=:\s]+(\d+)', str(result))
    # result[0] is only the transport status (0 = the server was reached).
    ok = bool(result) and result[0] == 0
    if ok and len(result) > 1 and isinstance(result[1], (tuple, list)) and result[1]:
        # result[1][0] is the server's own verdict. Like prun, accept only
        # 0 (processed) and 3 (accepted for incremental execution); 1/2 are
        # duplicates and 4 is a server error, even though their messages can
        # mention the jediTaskID of an already-existing task.
        code = result[1][0]
        # Legacy replies carry a plain bool instead of a numeric code.
        ok = code if isinstance(code, bool) else code in (0, 3)
    if m:
        # Normalized line the orchestrator parses.
        print(f"jediTaskID={m.group(1)}")
    if ok and m:
        return 0
    _log("ERROR: submission did not return a jediTaskID / non-zero status")
    return 1
0221 
0222 
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())