# File indexing completed on 2026-04-25 08:29:11
"""
Command generation for PCS production tasks.

Generates Condor (submit_csv.sh) and PanDA (prun) submission commands
from a fully specified ProdTask (Dataset + ProdConfig + overrides).

Reference repos:
- eic/job_submission_condor — Condor submission framework
- eic/simulation_campaign_hepmc3 — in-container execution pipeline
- eic/simulation_campaign_datasets — CSV input files
"""
0012
0013
def build_condor_command(task):
    """
    Build the Condor submit_csv.sh command from a ProdTask.

    Produces the env-var-prefixed command used in the Colab notebook:
        EBEAM=... PBEAM=... scripts/submit_csv.sh osg_csv hepmc3 {csv} {hours}

    Pure string construction — no database writes, no network.

    Args:
        task: ProdTask exposing ``dataset``, ``csv_file`` and
            ``get_effective_config()``.

    Returns:
        str: multi-line shell command (env assignments, then the
        submit_csv.sh invocation), continued with backslashes.
    """
    ds = task.dataset
    cfg = task.get_effective_config()
    physics = ds.physics_tag.parameters

    env = {}

    # Beam energies come from the physics tag parameters.
    env['EBEAM'] = str(physics.get('beam_energy_electron', ''))
    env['PBEAM'] = str(physics.get('beam_energy_hadron', ''))

    env['DETECTOR_VERSION'] = ds.detector_version
    env['DETECTOR_CONFIG'] = ds.detector_config

    env['JUG_XL_TAG'] = cfg.get('jug_xl_tag') or ''

    # Output copy flags are always emitted explicitly as true/false.
    env['COPYRECO'] = 'true' if cfg.get('copy_reco') else 'false'
    env['COPYFULL'] = 'true' if cfg.get('copy_full') else 'false'
    env['COPYLOG'] = 'true' if cfg.get('copy_log') else 'false'

    if cfg.get('use_rucio'):
        env['USERUCIO'] = 'true'
        env['X509_USER_PROXY'] = 'secrets/x509_user_proxy'

    if cfg.get('rucio_rse'):
        env['OUT_RSE'] = cfg['rucio_rse']

    # Background-mixing settings come from the evgen tag parameters.
    # NOTE(review): CSV_FILE is exported only in the bg_mixing branch even
    # though the csv file is always passed positionally below — confirm
    # this is intentional against scripts/submit_csv.sh.
    if cfg.get('bg_mixing'):
        evgen = ds.evgen_tag.parameters
        env['SIGNAL_FREQ'] = str(evgen.get('signal_freq', '0'))
        env['SIGNAL_STATUS'] = str(evgen.get('signal_status', '0'))
        if evgen.get('bg_tag_prefix'):
            env['TAG_PREFIX'] = evgen['bg_tag_prefix']
        if task.csv_file:
            env['CSV_FILE'] = task.csv_file
        if evgen.get('bg_files'):
            env['BG_FILES'] = evgen['bg_files']

    # Drop entries with empty values so the command stays readable.
    env_str = ' \\\n '.join(f'{k}={v}' for k, v in env.items() if v)

    target_hours = cfg.get('target_hours_per_job') or 2

    csv = task.csv_file or '<csv_file>'
    cmd = f'scripts/submit_csv.sh osg_csv hepmc3 {csv} {target_hours}'

    return f'{env_str} \\\n {cmd}'
0073
0074
def build_panda_command(task):
    """
    Build the PanDA prun command from a ProdTask.

    Produces prun arguments for PanDA submission. The actual submission
    uses PrunScript.main() from pandaclient, but this generates the
    equivalent CLI command for reference/execution.
    """
    dataset = task.dataset
    effective = task.get_effective_config()
    extra = effective.get('data') or {}

    args = ['prun']

    # In-container payload command, quoted for the shell.
    command = extra.get('exec_command', '')
    if command:
        args.append(f'--exec "{command}"')

    args.append(f'--outDS {dataset.did}')

    image = effective.get('container_image') or ''
    if image:
        args.append(f'--containerImage {image}')

    # Optional PanDA routing settings from the effective config.
    for cfg_key, flag in (
        ('panda_site', '--site'),
        ('panda_working_group', '--workingGroup'),
        ('panda_resource_type', '--resourceType'),
    ):
        value = effective.get(cfg_key)
        if value:
            args.append(f'{flag} {value}')

    args.append(f'--prodSourceLabel {extra.get("prod_source_label", "test")}')
    args.append(f'--vo {extra.get("vo", "wlcg")}')

    if extra.get('processing_type'):
        args.append(f'--processingType {extra["processing_type"]}')

    # Job-count / event-count shape.
    if extra.get('n_jobs'):
        args.append(f'--nJobs {extra["n_jobs"]}')
    if extra.get('events_per_job'):
        args.append(f'--nEventsPerJob {extra["events_per_job"]}')

    if extra.get('corecount'):
        args.append(f'--nCore {extra["corecount"]}')

    # Boolean switches take no value.
    if extra.get('no_build'):
        args.append('--noBuild')
    if extra.get('skip_scout'):
        args.append('--expertOnly_skipScout')

    return ' \\\n '.join(args)
0143
0144
0145 def _build_env_string(task):
0146 """Shared env-var string for Condor command and JEDI jobParameters."""
0147 ds = task.dataset
0148 cfg = task.get_effective_config()
0149 physics = ds.physics_tag.parameters
0150 evgen = ds.evgen_tag.parameters
0151
0152 env = {
0153 'EBEAM': str(physics.get('beam_energy_electron', '')),
0154 'PBEAM': str(physics.get('beam_energy_hadron', '')),
0155 'DETECTOR_VERSION': ds.detector_version,
0156 'DETECTOR_CONFIG': ds.detector_config,
0157 'JUG_XL_TAG': cfg.get('jug_xl_tag') or '',
0158 'COPYRECO': 'true' if cfg.get('copy_reco') else 'false',
0159 'COPYFULL': 'true' if cfg.get('copy_full') else 'false',
0160 'COPYLOG': 'true' if cfg.get('copy_log') else 'false',
0161 }
0162 if cfg.get('bg_mixing'):
0163 if evgen.get('signal_freq') is not None:
0164 env['SIGNAL_FREQ'] = str(evgen['signal_freq'])
0165 if evgen.get('signal_status') is not None:
0166 env['SIGNAL_STATUS'] = str(evgen['signal_status'])
0167 if evgen.get('bg_tag_prefix'):
0168 env['TAG_PREFIX'] = evgen['bg_tag_prefix']
0169 if evgen.get('bg_files'):
0170 env['BG_FILES'] = evgen['bg_files']
0171 return ' '.join(f'{k}={v}' for k, v in env.items() if v)
0172
0173
def build_task_dump(task):
    """
    Build a fully-resolved dict describing a ProdTask and everything it
    references: dataset, all four tags with parameters, the ProdConfig
    as stored, and the effective config (after task-level overrides).

    Suitable for human inspection or downstream tooling. Pure read:
    nothing reachable from ``task`` is mutated (in particular, the dict
    returned by ``get_effective_config()`` is copied before datetimes
    are converted to ISO strings, instead of being rewritten in place).
    """
    ds = task.dataset
    cfg = task.prod_config

    def _tag(t):
        # Serialize one tag (physics/evgen/simu/reco); tags may be absent.
        if t is None:
            return None
        return {
            'tag_label': t.tag_label,
            'tag_number': t.tag_number,
            'status': t.status,
            'description': t.description,
            'parameters': dict(t.parameters or {}),
            'created_by': t.created_by,
            'created_at': t.created_at.isoformat() if t.created_at else None,
        }

    def _cfg(c):
        # Serialize the stored ProdConfig via its model fields, skipping the
        # primary key and anything without ``attname`` (reverse relations).
        if c is None:
            return None
        out = {}
        for f in c._meta.get_fields():
            if not hasattr(f, 'attname'):
                continue
            if f.name in ('id',):
                continue
            val = getattr(c, f.name, None)
            if hasattr(val, 'isoformat'):
                val = val.isoformat()
            out[f.name] = val
        return out

    # Build a NEW dict rather than mutating the one returned by
    # get_effective_config() — that dict may be shared or cached by the
    # task, and rewriting its datetime values in place would leak the
    # string conversion back to the caller.
    effective = {
        k: (v.isoformat() if hasattr(v, 'isoformat') else v)
        for k, v in task.get_effective_config().items()
    }

    return {
        'task': {
            'id': task.id,
            'name': task.name,
            'description': task.description,
            'status': task.status,
            'csv_file': task.csv_file,
            'overrides': task.overrides or {},
            'created_by': task.created_by,
            'created_at': task.created_at.isoformat() if task.created_at else None,
            'updated_at': task.updated_at.isoformat() if task.updated_at else None,
        },
        'dataset': {
            'id': ds.id,
            'dataset_name': ds.dataset_name,
            'did': ds.did,
            'scope': ds.scope,
            'detector_version': ds.detector_version,
            'detector_config': ds.detector_config,
            'blocks': ds.blocks,
            'description': ds.description,
            'created_by': ds.created_by,
            'created_at': ds.created_at.isoformat() if ds.created_at else None,
        },
        'tags': {
            'physics': _tag(ds.physics_tag),
            'evgen': _tag(ds.evgen_tag),
            'simu': _tag(ds.simu_tag),
            'reco': _tag(ds.reco_tag),
        },
        'prod_config': _cfg(cfg),
        'effective_config': effective,
    }
0251
0252
def build_task_params(task):
    """
    Build a JEDI ``taskParamMap`` dict from a ProdTask.

    The returned dict can be passed directly to
    ``pandaclient.Client.insertTaskParams()`` for JEDI submission.
    Pure mapping — no database writes, no network.

    Field mapping follows ``docs/JEDI_INTEGRATION.md``.
    """
    dataset = task.dataset
    effective = task.get_effective_config()
    extra = effective.get('data') or {}

    task_name = dataset.task_name
    working_group = effective.get('panda_working_group') or 'EIC'

    # Identity / bookkeeping.
    params = {
        'taskName': task_name,
        'userName': task.created_by,
        'vo': extra.get('vo', 'eic'),
        'workingGroup': working_group,
        'campaign': dataset.detector_version,
    }

    # Task classification.
    params.update({
        'prodSourceLabel': extra.get('prod_source_label', 'test'),
        'taskType': extra.get('task_type', 'production'),
        'processingType': extra.get('processing_type', 'epicproduction'),
        'taskPriority': extra.get('task_priority', 900),
    })

    # Transformation and container.
    params.update({
        'transPath': extra.get(
            'transformation',
            'https://pandaserver-doma.cern.ch/trf/user/runGen-00-00-02',
        ),
        'transUses': '',
        'transHome': '',
        'architecture': '',
        'container_name': effective.get('container_image') or '',
    })

    # Job shape, resources and routing.
    params.update({
        'noInput': True,
        'nFilesPerJob': extra.get('files_per_job', 1),
        'coreCount': extra.get('corecount', 1),
        'ramCount': extra.get('ram_count', 2000),
        'ramUnit': 'MBPerCore',
        'site': effective.get('panda_site') or '',
        'cloud': extra.get('cloud', working_group),
    })

    # Optional counts.
    if extra.get('n_jobs'):
        params['nFiles'] = extra['n_jobs']
    if extra.get('events_per_job'):
        params['nEventsPerJob'] = extra['events_per_job']
    if effective.get('events_per_task'):
        params['nEvents'] = effective['events_per_task']

    # Target walltime, converted from hours to seconds.
    hours = effective.get('target_hours_per_job')
    if hours is not None:
        params['walltime'] = int(float(hours) * 3600)

    # Optional boolean switches.
    if extra.get('skip_scout'):
        params['skipScout'] = True
    if extra.get('disable_auto_retry'):
        params['disableAutoRetry'] = True
    if effective.get('use_rucio'):
        params['useRucio'] = True

    # Log file template.
    params['log'] = {
        'dataset': f'{dataset.scope}:{task_name}.log',
        'type': 'template',
        'param_type': 'log',
        'token': 'local',
        'destination': 'local',
        'value': f'{task_name}.log.${{SN}}.log.tgz',
    }

    # Job parameters: env-var prefix + exec command, then output template.
    env_str = _build_env_string(task)
    exec_cmd = extra.get('exec_command') or './run.sh'
    params['jobParameters'] = [
        {
            'type': 'constant',
            'value': f'{env_str} {exec_cmd}' if env_str else exec_cmd,
        },
        {
            'type': 'template',
            'param_type': 'output',
            'token': 'local',
            'destination': 'local',
            'dataset': f'{dataset.scope}:{task_name}',
            'value': f'{task_name}.${{SN}}.root',
            'offset': extra.get('output_offset', 1000),
        },
    ]

    return params
0361 return params