Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-06-26 08:40:24

0001 import csv
0002 import json
0003 import logging
0004 import re
0005 from io import StringIO
0006 from pathlib import PurePosixPath
0007 
0008 from django.db import OperationalError, ProgrammingError, transaction
0009 from django.utils import timezone
0010 
0011 from .models import EpicProdFile, EpicProdJob
0012 
0013 logger = logging.getLogger(__name__)
0014 
0015 
0016 PSEUDO_DATASETS = {'seq_number', 'pseudo_dataset'}
0017 
0018 
0019 def is_pseudo_panda_file(file_info):
0020     return (
0021         file_info.get('type') == 'pseudo_input'
0022         or file_info.get('dataset') in PSEUDO_DATASETS
0023         or file_info.get('lfn') in {'pseudo_lfn'}
0024     )
0025 
0026 
0027 def _jsonable(value):
0028     return json.loads(json.dumps(value, default=str))
0029 
0030 
0031 def _csv_row(row):
0032     parsed = next(csv.reader(StringIO(row)))
0033     if len(parsed) < 4:
0034         raise ValueError(f'EVGEN csv row has fewer than 4 fields: {row!r}')
0035     return parsed[0], parsed[1], parsed[2], parsed[3]
0036 
0037 
0038 def _payload_names(file_col, ext, chunk, env):
0039     """Mirror the current hepmc3 run.sh naming contract.
0040 
0041     The dispatcher calls run.sh with BASENAME=EVGEN/<file_col>, EXTENSION=<ext>,
0042     EVENTS_PER_TASK=<events>, and ichunk=<chunk>. run.sh derives TASKNAME from
0043     basename(BASENAME)+'.'+chunk and TAG from the EVGEN-relative directory under
0044     DETECTOR_VERSION/DETECTOR_CONFIG[/TAG_PREFIX].
0045     """
0046     basename = f'EVGEN/{file_col}'
0047     task_suffix = f'.{chunk}' if chunk else ''
0048     taskname = f'{PurePosixPath(basename).name}{task_suffix}'
0049 
0050     input_file = f'{basename}.{ext}'
0051     input_dir = str(PurePosixPath(input_file).parent)
0052     if input_dir == '.':
0053         evgen_tag = ''
0054     elif input_dir.startswith('EVGEN/'):
0055         evgen_tag = input_dir[len('EVGEN/'):]
0056     elif input_dir == 'EVGEN':
0057         evgen_tag = ''
0058     else:
0059         evgen_tag = input_dir
0060 
0061     tag_parts = [
0062         env.get('DETECTOR_VERSION') or 'main',
0063         env.get('DETECTOR_CONFIG') or '',
0064     ]
0065     tag_prefix = env.get('TAG_PREFIX') or ''
0066     if tag_prefix:
0067         tag_parts.append(tag_prefix.strip('/'))
0068     if evgen_tag:
0069         tag_parts.append(evgen_tag.strip('/'))
0070     tag = '/'.join(p for p in tag_parts if p)
0071     return {
0072         'basename': basename,
0073         'input_file': input_file,
0074         'taskname': taskname,
0075         'tag': tag,
0076         'evgen_tag': evgen_tag,
0077         'log_dir': f'LOG/{tag}',
0078         'full_dir': f'FULL/{tag}',
0079         'reco_dir': f'RECO/{tag}',
0080     }
0081 
0082 
0083 def build_expected_files_for_task(task, spec=None):
0084     """Return expected ePIC production files for a PCS EVGEN task.
0085 
0086     When ``spec`` is supplied this is definition-only and does not query Rucio
0087     or PanDA. If ``spec`` is omitted, the PCS EVGEN spec is regenerated; that is
0088     intended for operator/agent backfill paths, not page rendering.
0089     """
0090     if spec is None:
0091         from pcs.commands import build_evgen_task_params
0092         spec = build_evgen_task_params(task)
0093     env = spec.get('env') or {}
0094     out = []
0095     jeditaskid = task.panda_task_id
0096     rse = env.get('OUT_RSE') or 'EIC-XRD'
0097 
0098     for job_index, row in enumerate(spec.get('csvRows') or []):
0099         file_col, ext, events, chunk = _csv_row(row)
0100         seq_number = job_index + 1
0101         names = _payload_names(file_col, ext, chunk, env)
0102         input_did = f"/{names['input_file']}"
0103         input_lfn = PurePosixPath(input_did).name
0104         input_dataset = str(PurePosixPath(input_did).parent)
0105 
0106         common = {
0107             'prod_task': task,
0108             'jeditaskid': jeditaskid,
0109             'seq_number': seq_number,
0110             'job_index': job_index,
0111             'source': 'pcs_expected',
0112             'status': 'expected',
0113         }
0114         out.append({
0115             **common,
0116             'role': 'input',
0117             'stage': 'EVGEN',
0118             'scope': 'epic',
0119             'dataset_name': input_dataset,
0120             'did_name': input_did,
0121             'lfn': input_lfn,
0122             'rse_expected': '',
0123             'data': {'csv_row': row, 'events': events, 'chunk': chunk},
0124         })
0125 
0126         if str(env.get('COPYFULL', '')).lower() == 'true':
0127             did = f"/{names['full_dir']}/{names['taskname']}.edm4hep.root"
0128             out.append({
0129                 **common,
0130                 'role': 'output',
0131                 'stage': 'FULL',
0132                 'scope': 'epic',
0133                 'dataset_name': f"/{names['full_dir']}",
0134                 'did_name': did,
0135                 'lfn': PurePosixPath(did).name,
0136                 'rse_expected': rse,
0137                 'data': {'csv_row': row, 'chunk': chunk},
0138             })
0139 
0140         if str(env.get('COPYRECO', '')).lower() == 'true':
0141             did = f"/{names['reco_dir']}/{names['taskname']}.eicrecon.edm4eic.root"
0142             out.append({
0143                 **common,
0144                 'role': 'output',
0145                 'stage': 'RECO',
0146                 'scope': 'epic',
0147                 'dataset_name': f"/{names['reco_dir']}",
0148                 'did_name': did,
0149                 'lfn': PurePosixPath(did).name,
0150                 'rse_expected': rse,
0151                 'data': {'csv_row': row, 'chunk': chunk},
0152             })
0153 
0154         if str(env.get('COPYLOG', '')).lower() == 'true':
0155             # The payload log implementation has changed between per-file logs
0156             # and timestamped tarballs. Keep a dataset-level expectation until
0157             # the exact shipped script emits a stable manifest.
0158             out.append({
0159                 **common,
0160                 'role': 'log',
0161                 'stage': 'LOG',
0162                 'scope': 'epic',
0163                 'dataset_name': f"/{names['log_dir']}",
0164                 'did_name': f"/{names['log_dir']}",
0165                 'lfn': f"{names['taskname']} log outputs",
0166                 'rse_expected': 'EIC-XRD-LOG',
0167                 'data': {
0168                     'csv_row': row,
0169                     'chunk': chunk,
0170                     'dataset_level': True,
0171                     'reason': 'payload log file name may include runtime timestamp',
0172                 },
0173             })
0174     return out
0175 
0176 
0177 @transaction.atomic
0178 def sync_expected_files_for_task(task, spec=None):
0179     expected = build_expected_files_for_task(task, spec=spec)
0180     rows = []
0181     for item in expected:
0182         lookup = {
0183             'prod_task': task,
0184             'source': item['source'],
0185             'role': item['role'],
0186             'stage': item['stage'],
0187             'seq_number': item['seq_number'],
0188             'did_name': item['did_name'],
0189         }
0190         defaults = {k: v for k, v in item.items() if k not in lookup}
0191         obj, _ = EpicProdFile.objects.update_or_create(
0192             **lookup,
0193             defaults=defaults,
0194         )
0195         rows.append(obj)
0196     return rows
0197 
0198 
0199 def _seq_number_from_files(files):
0200     for f in files or []:
0201         if f.get('type') == 'pseudo_input' and str(f.get('lfn') or '').isdigit():
0202             return int(f['lfn'])
0203     for f in files or []:
0204         lfn = f.get('lfn') or ''
0205         m = re.search(r'\.(\d{6})\.log\.tgz$', lfn)
0206         if m:
0207             return int(m.group(1))
0208     return None
0209 
0210 
0211 def _prod_task_for_jeditaskid(jeditaskid):
0212     if not jeditaskid:
0213         return None
0214     try:
0215         from pcs.models import ProdTask
0216         return (ProdTask.objects
0217                 .filter(panda_task_id=int(jeditaskid))
0218                 .select_related('dataset', 'prod_config')
0219                 .first())
0220     except Exception:
0221         logger.exception("PCS lookup failed for JEDI task %s", jeditaskid)
0222         return None
0223 
0224 
0225 def _rucio_conflict_details(text):
0226     if 'DataIdentifierAlreadyExists' not in text and 'File DID already exists' not in text:
0227         return None
0228     detail = 'Rucio file DID already exists'
0229     checksum = re.search(
0230         r'Local checksum\s+([0-9a-fA-F]+)\s+does not match remote checksum\s+([0-9a-fA-F]+)',
0231         text,
0232     )
0233     data = {}
0234     if checksum:
0235         data = {'local_checksum': checksum.group(1), 'remote_checksum': checksum.group(2)}
0236         detail = (
0237             f"Rucio file DID already exists; local checksum {checksum.group(1)} "
0238             f"does not match remote checksum {checksum.group(2)}"
0239         )
0240     return detail, data
0241 
0242 
0243 def _timeline_from_log_text(text):
0244     events = []
0245     if 'Finished processing.' in text:
0246         events.append({'phase': 'reconstruction_complete',
0247                        'message': 'eicrecon finished processing'})
0248     valid = re.search(r'VALID:\s+(\S+\.eicrecon\.edm4eic\.root)', text)
0249     if valid:
0250         events.append({'phase': 'reco_validation_passed',
0251                        'message': 'RECO ROOT file validation passed',
0252                        'path': valid.group(1)})
0253     if 'register_to_rucio.py' in text:
0254         events.append({'phase': 'rucio_registration_attempted',
0255                        'message': 'Payload attempted JLab Rucio registration'})
0256     conflict = _rucio_conflict_details(text)
0257     if conflict:
0258         events.append({'phase': 'rucio_registration_failed',
0259                        'message': conflict[0],
0260                        'details': conflict[1]})
0261     return events
0262 
0263 
0264 def _fetch_job_log_texts(pandaid):
0265     texts = []
0266     try:
0267         from askpanda_atlas.log_analysis_impl import _fetch_log_text
0268         from decouple import config
0269         base_url = config('PANDA_BASE_URL', default='https://pandamon01.sdcc.bnl.gov')
0270         for filename in ('payload.stdout', 'payload.stderr', 'pilotlog.txt'):
0271             try:
0272                 text = _fetch_log_text(pandaid, filename, base_url, timeout=30)
0273             except Exception as exc:
0274                 logger.warning("epicprod inventory log fetch failed for %s/%s: %s",
0275                                pandaid, filename, exc)
0276                 continue
0277             if text:
0278                 texts.append(text)
0279     except Exception as exc:
0280         logger.warning("epicprod inventory log fetch unavailable for %s: %s", pandaid, exc)
0281     return texts
0282 
0283 
0284 def sync_job_from_study_data(study_data):
0285     """Persist epicprod diagnosis from an existing study_job() result."""
0286     job = study_data.get('job') or {}
0287     pandaid = int(study_data.get('pandaid') or job.get('pandaid'))
0288     jeditaskid = job.get('jeditaskid')
0289     files = study_data.get('files') or []
0290     seq_number = _seq_number_from_files(files)
0291     prod_task = _prod_task_for_jeditaskid(jeditaskid)
0292 
0293     if prod_task:
0294         sync_expected_files_for_task(prod_task)
0295 
0296     log_analysis = study_data.get('log_analysis') or {}
0297     log_texts = [log_analysis.get('log_excerpt') or '']
0298     log_texts.extend(_fetch_job_log_texts(pandaid))
0299     combined_log_text = '\n'.join(t for t in log_texts if t)
0300     timeline = _timeline_from_log_text(combined_log_text)
0301     conflict = _rucio_conflict_details(combined_log_text)
0302 
0303     phase = ''
0304     failure_summary = ''
0305     if conflict:
0306         phase = 'rucio_registration_failed'
0307         failure_summary = conflict[0]
0308     elif timeline:
0309         phase = timeline[-1]['phase']
0310     elif job.get('jobstatus') in ('failed', 'closed'):
0311         phase = 'failed'
0312         failure_summary = (job.get('piloterrordiag') or '').strip()
0313 
0314     data = {
0315         'panda': {
0316             k: job.get(k)
0317             for k in (
0318                 'pandaid', 'jeditaskid', 'jobname', 'jobstatus', 'computingsite',
0319                 'creationtime', 'starttime', 'endtime', 'piloterrorcode',
0320                 'piloterrordiag', 'transexitcode', 'noutputdatafiles',
0321                 'outputfilebytes',
0322             )
0323             if k in job
0324         },
0325         'timeline': timeline,
0326         'log_analysis': _jsonable(log_analysis),
0327     }
0328 
0329     epic_job, _ = EpicProdJob.objects.update_or_create(
0330         pandaid=pandaid,
0331         defaults={
0332             'jeditaskid': jeditaskid,
0333             'prod_task': prod_task,
0334             'seq_number': seq_number,
0335             'job_index': seq_number - 1 if seq_number else None,
0336             'status': job.get('jobstatus') or '',
0337             'phase': phase,
0338             'failure_summary': failure_summary,
0339             'data': data,
0340             'last_refreshed_at': timezone.now(),
0341         },
0342     )
0343 
0344     if jeditaskid and seq_number:
0345         EpicProdFile.objects.filter(
0346             jeditaskid=jeditaskid,
0347             seq_number=seq_number,
0348             pandaid__isnull=True,
0349         ).update(
0350             job=epic_job,
0351             pandaid=pandaid,
0352             job_index=seq_number - 1,
0353         )
0354 
0355     if conflict and jeditaskid and seq_number:
0356         detail, detail_data = conflict
0357         for f in EpicProdFile.objects.filter(
0358             jeditaskid=jeditaskid,
0359             seq_number=seq_number,
0360             role='output',
0361             stage='RECO',
0362         ):
0363             f.status = 'conflict'
0364             f.status_detail = detail
0365             merged = dict(f.data or {})
0366             merged['rucio_conflict'] = detail_data
0367             f.data = merged
0368             f.save(update_fields=['status', 'status_detail', 'data', 'updated_at'])
0369 
0370     return epic_job
0371 
0372 
0373 def inventory_for_job_context(study_data):
0374     """Return display context for job pages.
0375 
0376     Safe to call before the migration exists: database table errors produce a
0377     fallback filtered PanDA file list.
0378     """
0379     panda_files = [
0380         f for f in (study_data.get('files') or [])
0381         if not is_pseudo_panda_file(f)
0382     ]
0383     rows = []
0384     epic_job = None
0385     try:
0386         pandaid = int(study_data.get('pandaid') or (study_data.get('job') or {}).get('pandaid'))
0387         epic_job = EpicProdJob.objects.filter(pandaid=pandaid).first()
0388         if epic_job:
0389             for f in EpicProdFile.objects.filter(job=epic_job).order_by('role', 'stage', 'lfn'):
0390                 rows.append({
0391                     'role': f.role,
0392                     'stage': f.stage,
0393                     'scope': f.scope,
0394                     'dataset_name': f.dataset_name,
0395                     'did_name': f.did_name,
0396                     'lfn': f.lfn,
0397                     'size': f.bytes,
0398                     'status': f.status,
0399                     'status_detail': f.status_detail,
0400                     'rse': f.rse_expected,
0401                     'source': f.source,
0402                     'data': f.data or {},
0403                 })
0404     except (OperationalError, ProgrammingError):
0405         return {'epicprod_job': None, 'display_files': _panda_display_rows(panda_files)}
0406 
0407     existing = {(r.get('source'), r.get('lfn'), r.get('did_name')) for r in rows}
0408     for r in _panda_display_rows(panda_files):
0409         key = (r.get('source'), r.get('lfn'), r.get('did_name'))
0410         if key not in existing:
0411             rows.append(r)
0412     return {'epicprod_job': epic_job, 'display_files': rows or _panda_display_rows(panda_files)}
0413 
0414 
0415 def _panda_display_rows(panda_files):
0416     rows = []
0417     for f in panda_files:
0418         rows.append({
0419             'role': 'log' if f.get('type') == 'log' else f.get('type', ''),
0420             'stage': 'PANDA_LOG' if f.get('type') == 'log' else '',
0421             'scope': f.get('scope') or '',
0422             'dataset_name': f.get('dataset') or f.get('destinationdblock') or '',
0423             'did_name': f.get('lfn') or '',
0424             'lfn': f.get('lfn') or '',
0425             'size': f.get('fsize'),
0426             'status': f.get('status') or '',
0427             'status_detail': '',
0428             'rse': '',
0429             'source': 'panda_filestable',
0430             'data': _jsonable(f),
0431         })
0432     return rows