Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-25 08:29:11

0001 """
0002 Command generation for PCS production tasks.
0003 
0004 Generates Condor (submit_csv.sh) and PanDA (prun) submission commands
0005 from a fully specified ProdTask (Dataset + ProdConfig + overrides).
0006 
0007 Reference repos:
0008 - eic/job_submission_condor — Condor submission framework
0009 - eic/simulation_campaign_hepmc3 — in-container execution pipeline
0010 - eic/simulation_campaign_datasets — CSV input files
0011 """
0012 
0013 
def build_condor_command(task):
    """
    Build the Condor submit_csv.sh command from a ProdTask.

    Produces the env-var-prefixed command used in the Colab notebook:
        EBEAM=... PBEAM=... scripts/submit_csv.sh osg_csv hepmc3 {csv} {hours}

    Args:
        task: ProdTask-like object exposing ``dataset``, ``csv_file`` and
            ``get_effective_config()``.

    Returns:
        A multi-line shell command string: one ``KEY=value`` assignment per
        line (joined with backslash continuations) followed by the
        ``submit_csv.sh`` invocation. Variables whose value is empty or
        missing are omitted.
    """
    ds = task.dataset
    cfg = task.get_effective_config()

    def _params(tag):
        # A missing tag or a tag without parameters behaves like an empty
        # parameter set instead of raising AttributeError.
        return getattr(tag, 'parameters', None) or {}

    def _text(value, default=''):
        # ``str(None)`` would leak the literal "None" into the command line;
        # treat an explicit None exactly like an absent key.
        return default if value is None else str(value)

    physics = _params(ds.physics_tag)

    env = {
        # Beam energies from physics tag
        'EBEAM': _text(physics.get('beam_energy_electron')),
        'PBEAM': _text(physics.get('beam_energy_hadron')),
        # Detector from dataset
        'DETECTOR_VERSION': ds.detector_version,
        'DETECTOR_CONFIG': ds.detector_config,
        # Software stack from config
        'JUG_XL_TAG': cfg.get('jug_xl_tag') or '',
        # Output flags
        'COPYRECO': 'true' if cfg.get('copy_reco') else 'false',
        'COPYFULL': 'true' if cfg.get('copy_full') else 'false',
        'COPYLOG': 'true' if cfg.get('copy_log') else 'false',
    }

    if cfg.get('use_rucio'):
        env['USERUCIO'] = 'true'
        env['X509_USER_PROXY'] = 'secrets/x509_user_proxy'

    # Rucio RSE override
    if cfg.get('rucio_rse'):
        env['OUT_RSE'] = cfg['rucio_rse']

    # Background mixing (conditional) -- the evgen tag is only consulted here
    if cfg.get('bg_mixing'):
        evgen = _params(ds.evgen_tag)
        env['SIGNAL_FREQ'] = _text(evgen.get('signal_freq'), '0')
        env['SIGNAL_STATUS'] = _text(evgen.get('signal_status'), '0')
        if evgen.get('bg_tag_prefix'):
            env['TAG_PREFIX'] = evgen['bg_tag_prefix']
        if task.csv_file:
            env['CSV_FILE'] = task.csv_file
        if evgen.get('bg_files'):
            env['BG_FILES'] = evgen['bg_files']

    # Build env string (skip empty values)
    env_str = ' \\\n  '.join(f'{k}={v}' for k, v in env.items() if v)

    # Target hours from config (default 2)
    target_hours = cfg.get('target_hours_per_job') or 2

    # Placeholder keeps the command readable when no CSV is attached yet
    csv = task.csv_file or '<csv_file>'
    cmd = f'scripts/submit_csv.sh osg_csv hepmc3 {csv} {target_hours}'

    return f'{env_str} \\\n  {cmd}'
0073 
0074 
def build_panda_command(task):
    """
    Render the PanDA ``prun`` command line for a ProdTask.

    The real submission goes through PrunScript.main() from pandaclient;
    this function only produces the equivalent CLI text for reference or
    manual execution.

    Args:
        task: ProdTask-like object exposing ``dataset`` (with ``did``) and
            ``get_effective_config()``.

    Returns:
        The command as a single string, one argument per line, joined with
        backslash line continuations.
    """
    dataset = task.dataset
    config = task.get_effective_config()
    extra = config.get('data') or {}

    args = ['prun']

    def add_if_set(flag, value):
        # Optional arguments are emitted only when the value is truthy.
        if value:
            args.append(f'{flag} {value}')

    # Payload executed on the worker
    payload = extra.get('exec_command', '')
    if payload:
        args.append(f'--exec "{payload}"')

    # Output dataset is always present
    args.append(f'--outDS {dataset.did}')

    # Container, site, working group, resource type come from the config
    add_if_set('--containerImage', config.get('container_image') or '')
    add_if_set('--site', config.get('panda_site'))
    add_if_set('--workingGroup', config.get('panda_working_group'))
    add_if_set('--resourceType', config.get('panda_resource_type'))

    # Authorization label and VO always appear, falling back to defaults
    source_label = extra.get('prod_source_label', 'test')
    args.append(f'--prodSourceLabel {source_label}')
    virtual_org = extra.get('vo', 'wlcg')
    args.append(f'--vo {virtual_org}')

    # Processing type, job/event counts and core count
    add_if_set('--processingType', extra.get('processing_type'))
    add_if_set('--nJobs', extra.get('n_jobs'))
    add_if_set('--nEventsPerJob', extra.get('events_per_job'))
    add_if_set('--nCore', extra.get('corecount'))

    # Boolean switches
    for key, switch in (('no_build', '--noBuild'),
                        ('skip_scout', '--expertOnly_skipScout')):
        if extra.get(key):
            args.append(switch)

    return ' \\\n  '.join(args)
0143 
0144 
0145 def _build_env_string(task):
0146     """Shared env-var string for Condor command and JEDI jobParameters."""
0147     ds = task.dataset
0148     cfg = task.get_effective_config()
0149     physics = ds.physics_tag.parameters
0150     evgen = ds.evgen_tag.parameters
0151 
0152     env = {
0153         'EBEAM': str(physics.get('beam_energy_electron', '')),
0154         'PBEAM': str(physics.get('beam_energy_hadron', '')),
0155         'DETECTOR_VERSION': ds.detector_version,
0156         'DETECTOR_CONFIG': ds.detector_config,
0157         'JUG_XL_TAG': cfg.get('jug_xl_tag') or '',
0158         'COPYRECO': 'true' if cfg.get('copy_reco') else 'false',
0159         'COPYFULL': 'true' if cfg.get('copy_full') else 'false',
0160         'COPYLOG': 'true' if cfg.get('copy_log') else 'false',
0161     }
0162     if cfg.get('bg_mixing'):
0163         if evgen.get('signal_freq') is not None:
0164             env['SIGNAL_FREQ'] = str(evgen['signal_freq'])
0165         if evgen.get('signal_status') is not None:
0166             env['SIGNAL_STATUS'] = str(evgen['signal_status'])
0167         if evgen.get('bg_tag_prefix'):
0168             env['TAG_PREFIX'] = evgen['bg_tag_prefix']
0169         if evgen.get('bg_files'):
0170             env['BG_FILES'] = evgen['bg_files']
0171     return ' '.join(f'{k}={v}' for k, v in env.items() if v)
0172 
0173 
def build_task_dump(task):
    """
    Build a fully-resolved dict describing a ProdTask and everything it
    references: dataset, all four tags with parameters, the ProdConfig
    as stored, and the effective config (after task-level overrides).

    Suitable for human inspection or downstream tooling. Pure read: the
    dict returned by ``task.get_effective_config()`` is copied, never
    modified in place.

    Args:
        task: ProdTask-like object exposing ``dataset``, ``prod_config``,
            ``get_effective_config()`` and the scalar attributes listed
            under the ``'task'`` key below.

    Returns:
        A dict with the keys ``'task'``, ``'dataset'``, ``'tags'``,
        ``'prod_config'`` and ``'effective_config'``. Top-level values that
        offer ``isoformat()`` are rendered as ISO strings.
    """
    ds = task.dataset
    cfg = task.prod_config

    def _iso(value):
        # Values exposing isoformat() are converted to strings; everything
        # else passes through untouched.
        return value.isoformat() if hasattr(value, 'isoformat') else value

    def _tag(t):
        # Tags are optional; a missing tag is reported as None.
        if t is None:
            return None
        return {
            'tag_label': t.tag_label,
            'tag_number': t.tag_number,
            'status': t.status,
            'description': t.description,
            'parameters': dict(t.parameters or {}),
            'created_by': t.created_by,
            'created_at': t.created_at.isoformat() if t.created_at else None,
        }

    def _cfg(c):
        # Dump every field that has an ``attname`` (skipping the primary
        # key), keyed by field name.
        if c is None:
            return None
        return {
            f.name: _iso(getattr(c, f.name, None))
            for f in c._meta.get_fields()
            if hasattr(f, 'attname') and f.name != 'id'
        }

    # Build a new dict instead of rewriting values inside the one handed
    # back by the task, which the caller or the task may still hold.
    effective = {k: _iso(v) for k, v in task.get_effective_config().items()}

    return {
        'task': {
            'id': task.id,
            'name': task.name,
            'description': task.description,
            'status': task.status,
            'csv_file': task.csv_file,
            'overrides': task.overrides or {},
            'created_by': task.created_by,
            'created_at': task.created_at.isoformat() if task.created_at else None,
            'updated_at': task.updated_at.isoformat() if task.updated_at else None,
        },
        'dataset': {
            'id': ds.id,
            'dataset_name': ds.dataset_name,
            'did': ds.did,
            'scope': ds.scope,
            'detector_version': ds.detector_version,
            'detector_config': ds.detector_config,
            'blocks': ds.blocks,
            'description': ds.description,
            'created_by': ds.created_by,
            'created_at': ds.created_at.isoformat() if ds.created_at else None,
        },
        'tags': {
            'physics': _tag(ds.physics_tag),
            'evgen': _tag(ds.evgen_tag),
            'simu': _tag(ds.simu_tag),
            'reco': _tag(ds.reco_tag),
        },
        'prod_config': _cfg(cfg),
        'effective_config': effective,
    }
0251 
0252 
def build_task_params(task):
    """
    Map a ProdTask onto a JEDI ``taskParamMap`` dict.

    The result is meant to be handed as-is to
    ``pandaclient.Client.insertTaskParams()``. The function only reads
    from the task and performs no database writes and no network calls.

    The field mapping follows ``docs/JEDI_INTEGRATION.md``.

    Args:
        task: ProdTask-like object exposing ``dataset``, ``created_by``
            and ``get_effective_config()``.

    Returns:
        The ``taskParamMap`` dict, including the ``log`` template and the
        two-entry ``jobParameters`` list (constant command + output
        template).
    """
    dataset = task.dataset
    config = task.get_effective_config()
    extra = config.get('data') or {}

    name = dataset.task_name
    group = config.get('panda_working_group') or 'EIC'

    identity = {
        'taskName': name,
        'userName': task.created_by,
        'vo': extra.get('vo', 'eic'),
        'workingGroup': group,
        'campaign': dataset.detector_version,
    }
    processing = {
        'prodSourceLabel': extra.get('prod_source_label', 'test'),
        'taskType': extra.get('task_type', 'production'),
        'processingType': extra.get('processing_type', 'epicproduction'),
        'taskPriority': extra.get('task_priority', 900),
    }
    # Containerized executable
    executable = {
        'transPath': extra.get(
            'transformation',
            'https://pandaserver-doma.cern.ch/trf/user/runGen-00-00-02',
        ),
        'transUses': '',
        'transHome': '',
        'architecture': '',
        'container_name': config.get('container_image') or '',
    }
    # MC generation has no input dataset, hence noInput=True
    splitting = {
        'noInput': True,
        'nFilesPerJob': extra.get('files_per_job', 1),
        'coreCount': extra.get('corecount', 1),
        'ramCount': extra.get('ram_count', 2000),
        'ramUnit': 'MBPerCore',
    }
    placement = {
        'site': config.get('panda_site') or '',
        'cloud': extra.get('cloud', group),
    }
    params = {**identity, **processing, **executable, **splitting, **placement}

    # Counts are copied only when set; for noInput tasks nFiles drives the
    # number of jobs.
    counts = (
        ('nFiles', extra.get('n_jobs')),
        ('nEventsPerJob', extra.get('events_per_job')),
        ('nEvents', config.get('events_per_task')),
    )
    for key, value in counts:
        if value:
            params[key] = value

    # JEDI expects the walltime in seconds; the config stores hours
    target_hours = config.get('target_hours_per_job')
    if target_hours is not None:
        params['walltime'] = int(float(target_hours) * 3600)

    # Boolean switches are added only when enabled
    switches = (
        ('skipScout', extra.get('skip_scout')),
        ('disableAutoRetry', extra.get('disable_auto_retry')),
        ('useRucio', config.get('use_rucio')),
    )
    for key, enabled in switches:
        if enabled:
            params[key] = True

    # Log and output templates are built from task_name (no .bN suffix)
    scoped_name = f'{dataset.scope}:{name}'
    params['log'] = {
        'dataset': f'{scoped_name}.log',
        'type': 'template',
        'param_type': 'log',
        'token': 'local',
        'destination': 'local',
        'value': f'{name}.log.${{SN}}.log.tgz',
    }

    # jobParameters: environment prefix plus payload, then the output template
    env_prefix = _build_env_string(task)
    payload = extra.get('exec_command') or './run.sh'
    if env_prefix:
        command = f'{env_prefix} {payload}'
    else:
        command = payload
    output_template = {
        'type': 'template',
        'param_type': 'output',
        'token': 'local',
        'destination': 'local',
        'dataset': scoped_name,
        'value': f'{name}.${{SN}}.root',
        'offset': extra.get('output_offset', 1000),
    }
    params['jobParameters'] = [
        {'type': 'constant', 'value': command},
        output_template,
    ]

    return params
0360 
0361     return params