File indexing completed on 2026-06-26 08:40:24
0001 import csv
0002 import json
0003 import logging
0004 import re
0005 from io import StringIO
0006 from pathlib import PurePosixPath
0007
0008 from django.db import OperationalError, ProgrammingError, transaction
0009 from django.utils import timezone
0010
0011 from .models import EpicProdFile, EpicProdJob
0012
logger = logging.getLogger(__name__)


# Dataset names that mark a PanDA file record as a pseudo entry;
# is_pseudo_panda_file() treats a record whose ``dataset`` is in this set as
# pseudo and job pages filter such records out.
PSEUDO_DATASETS = {'pseudo_dataset', 'seq_number'}
0017
0018
def is_pseudo_panda_file(file_info):
    """Tell whether a PanDA file record is a pseudo entry rather than a real file.

    A record is pseudo when its ``type`` is ``'pseudo_input'``, its
    ``dataset`` is one of ``PSEUDO_DATASETS``, or its ``lfn`` is the literal
    ``'pseudo_lfn'``. The checks run in that order and stop at the first hit.
    """
    if file_info.get('type') == 'pseudo_input':
        return True
    if file_info.get('dataset') in PSEUDO_DATASETS:
        return True
    return file_info.get('lfn') in {'pseudo_lfn'}
0025
0026
0027 def _jsonable(value):
0028 return json.loads(json.dumps(value, default=str))
0029
0030
0031 def _csv_row(row):
0032 parsed = next(csv.reader(StringIO(row)))
0033 if len(parsed) < 4:
0034 raise ValueError(f'EVGEN csv row has fewer than 4 fields: {row!r}')
0035 return parsed[0], parsed[1], parsed[2], parsed[3]
0036
0037
0038 def _payload_names(file_col, ext, chunk, env):
0039 """Mirror the current hepmc3 run.sh naming contract.
0040
0041 The dispatcher calls run.sh with BASENAME=EVGEN/<file_col>, EXTENSION=<ext>,
0042 EVENTS_PER_TASK=<events>, and ichunk=<chunk>. run.sh derives TASKNAME from
0043 basename(BASENAME)+'.'+chunk and TAG from the EVGEN-relative directory under
0044 DETECTOR_VERSION/DETECTOR_CONFIG[/TAG_PREFIX].
0045 """
0046 basename = f'EVGEN/{file_col}'
0047 task_suffix = f'.{chunk}' if chunk else ''
0048 taskname = f'{PurePosixPath(basename).name}{task_suffix}'
0049
0050 input_file = f'{basename}.{ext}'
0051 input_dir = str(PurePosixPath(input_file).parent)
0052 if input_dir == '.':
0053 evgen_tag = ''
0054 elif input_dir.startswith('EVGEN/'):
0055 evgen_tag = input_dir[len('EVGEN/'):]
0056 elif input_dir == 'EVGEN':
0057 evgen_tag = ''
0058 else:
0059 evgen_tag = input_dir
0060
0061 tag_parts = [
0062 env.get('DETECTOR_VERSION') or 'main',
0063 env.get('DETECTOR_CONFIG') or '',
0064 ]
0065 tag_prefix = env.get('TAG_PREFIX') or ''
0066 if tag_prefix:
0067 tag_parts.append(tag_prefix.strip('/'))
0068 if evgen_tag:
0069 tag_parts.append(evgen_tag.strip('/'))
0070 tag = '/'.join(p for p in tag_parts if p)
0071 return {
0072 'basename': basename,
0073 'input_file': input_file,
0074 'taskname': taskname,
0075 'tag': tag,
0076 'evgen_tag': evgen_tag,
0077 'log_dir': f'LOG/{tag}',
0078 'full_dir': f'FULL/{tag}',
0079 'reco_dir': f'RECO/{tag}',
0080 }
0081
0082
def _env_flag(env, name):
    """Return True when ``env[name]`` stringifies to 'true' (any letter case)."""
    return str(env.get(name, '')).lower() == 'true'


def _expected_output(common, stage, out_dir, filename, rse, row, chunk):
    """Return the expected-file record for one payload output copied to ``out_dir``."""
    did = f'/{out_dir}/{filename}'
    return {
        **common,
        'role': 'output',
        'stage': stage,
        'scope': 'epic',
        'dataset_name': f'/{out_dir}',
        'did_name': did,
        'lfn': PurePosixPath(did).name,
        'rse_expected': rse,
        'data': {'csv_row': row, 'chunk': chunk},
    }


def build_expected_files_for_task(task, spec=None):
    """Return expected ePIC production files for a PCS EVGEN task.

    When ``spec`` is supplied this is definition-only and does not query Rucio
    or PanDA. If ``spec`` is omitted, the PCS EVGEN spec is regenerated; that is
    intended for operator/agent backfill paths, not page rendering.

    Each row of ``spec['csvRows']`` gets ``job_index`` equal to its position
    and ``seq_number = job_index + 1``, and contributes these records:

    - one EVGEN input record;
    - a FULL output record when the spec env sets COPYFULL to 'true';
    - a RECO output record when COPYRECO is 'true';
    - a dataset-level LOG record when COPYLOG is 'true'.

    Each record is a dict of EpicProdFile field values with
    ``source='pcs_expected'`` and ``status='expected'``. Output records use
    the env OUT_RSE (default 'EIC-XRD') as the expected RSE.

    Malformed csv rows propagate the error raised by ``_csv_row``.
    """
    if spec is None:
        from pcs.commands import build_evgen_task_params
        spec = build_evgen_task_params(task)
    env = spec.get('env') or {}
    jeditaskid = task.panda_task_id
    rse = env.get('OUT_RSE') or 'EIC-XRD'
    # The COPY* switches are task-wide, so parse them once, not per row.
    copy_full = _env_flag(env, 'COPYFULL')
    copy_reco = _env_flag(env, 'COPYRECO')
    copy_log = _env_flag(env, 'COPYLOG')

    out = []
    for job_index, row in enumerate(spec.get('csvRows') or []):
        file_col, ext, events, chunk = _csv_row(row)
        names = _payload_names(file_col, ext, chunk, env)
        input_did = f"/{names['input_file']}"
        input_path = PurePosixPath(input_did)

        common = {
            'prod_task': task,
            'jeditaskid': jeditaskid,
            'seq_number': job_index + 1,
            'job_index': job_index,
            'source': 'pcs_expected',
            'status': 'expected',
        }
        out.append({
            **common,
            'role': 'input',
            'stage': 'EVGEN',
            'scope': 'epic',
            'dataset_name': str(input_path.parent),
            'did_name': input_did,
            'lfn': input_path.name,
            'rse_expected': '',
            'data': {'csv_row': row, 'events': events, 'chunk': chunk},
        })

        if copy_full:
            out.append(_expected_output(
                common, 'FULL', names['full_dir'],
                f"{names['taskname']}.edm4hep.root", rse, row, chunk,
            ))

        if copy_reco:
            out.append(_expected_output(
                common, 'RECO', names['reco_dir'],
                f"{names['taskname']}.eicrecon.edm4eic.root", rse, row, chunk,
            ))

        if copy_log:
            # Log file names may include a runtime timestamp, so only the LOG
            # dataset is predictable; it stands in for the DID here.
            out.append({
                **common,
                'role': 'log',
                'stage': 'LOG',
                'scope': 'epic',
                'dataset_name': f"/{names['log_dir']}",
                'did_name': f"/{names['log_dir']}",
                'lfn': f"{names['taskname']} log outputs",
                'rse_expected': 'EIC-XRD-LOG',
                'data': {
                    'csv_row': row,
                    'chunk': chunk,
                    'dataset_level': True,
                    'reason': 'payload log file name may include runtime timestamp',
                },
            })
    return out
0175
0176
@transaction.atomic
def sync_expected_files_for_task(task, spec=None):
    """Create or refresh the EpicProdFile rows expected for ``task``.

    Rows are matched on (prod_task, source, role, stage, seq_number,
    did_name). A new row gets every field from build_expected_files_for_task.
    An existing row only has its definition fields refreshed:

    - ``status`` is left untouched;
    - ``data`` is merged into the stored dict rather than replacing it.

    This keeps runtime state written later, such as a ``'conflict'`` status
    and ``data['rucio_conflict']`` recorded for one job, from being reset
    when the task is re-synced for another job.

    Returns the EpicProdFile objects in expected-file order.
    """
    rows = []
    for item in build_expected_files_for_task(task, spec=spec):
        lookup = {
            'prod_task': task,
            'source': item['source'],
            'role': item['role'],
            'stage': item['stage'],
            'seq_number': item['seq_number'],
            'did_name': item['did_name'],
        }
        defaults = {k: v for k, v in item.items() if k not in lookup}
        # Lock the matched row, as update_or_create() does, while refreshing it.
        obj, created = (EpicProdFile.objects
                        .select_for_update()
                        .get_or_create(defaults=defaults, **lookup))
        if not created:
            for field, value in defaults.items():
                if field == 'status':
                    # Runtime-owned once the row exists; don't reset to 'expected'.
                    continue
                if field == 'data':
                    merged = dict(obj.data or {})
                    merged.update(value)
                    value = merged
                setattr(obj, field, value)
            obj.save()
        rows.append(obj)
    return rows
0197
0198
0199 def _seq_number_from_files(files):
0200 for f in files or []:
0201 if f.get('type') == 'pseudo_input' and str(f.get('lfn') or '').isdigit():
0202 return int(f['lfn'])
0203 for f in files or []:
0204 lfn = f.get('lfn') or ''
0205 m = re.search(r'\.(\d{6})\.log\.tgz$', lfn)
0206 if m:
0207 return int(m.group(1))
0208 return None
0209
0210
0211 def _prod_task_for_jeditaskid(jeditaskid):
0212 if not jeditaskid:
0213 return None
0214 try:
0215 from pcs.models import ProdTask
0216 return (ProdTask.objects
0217 .filter(panda_task_id=int(jeditaskid))
0218 .select_related('dataset', 'prod_config')
0219 .first())
0220 except Exception:
0221 logger.exception("PCS lookup failed for JEDI task %s", jeditaskid)
0222 return None
0223
0224
0225 def _rucio_conflict_details(text):
0226 if 'DataIdentifierAlreadyExists' not in text and 'File DID already exists' not in text:
0227 return None
0228 detail = 'Rucio file DID already exists'
0229 checksum = re.search(
0230 r'Local checksum\s+([0-9a-fA-F]+)\s+does not match remote checksum\s+([0-9a-fA-F]+)',
0231 text,
0232 )
0233 data = {}
0234 if checksum:
0235 data = {'local_checksum': checksum.group(1), 'remote_checksum': checksum.group(2)}
0236 detail = (
0237 f"Rucio file DID already exists; local checksum {checksum.group(1)} "
0238 f"does not match remote checksum {checksum.group(2)}"
0239 )
0240 return detail, data
0241
0242
def _timeline_from_log_text(text):
    """Extract payload milestones from combined job log text.

    Each entry is a dict with ``phase`` and ``message`` keys. A RECO
    validation entry also carries ``path``, and a Rucio conflict entry
    carries ``details``.

    Entries appear in this fixed order, each only when its marker is found:

    1. reconstruction complete;
    2. RECO validation passed;
    3. Rucio registration attempted;
    4. Rucio registration failed.
    """
    timeline = []

    if 'Finished processing.' in text:
        timeline.append({
            'phase': 'reconstruction_complete',
            'message': 'eicrecon finished processing',
        })

    validated = re.search(r'VALID:\s+(\S+\.eicrecon\.edm4eic\.root)', text)
    if validated:
        timeline.append({
            'phase': 'reco_validation_passed',
            'message': 'RECO ROOT file validation passed',
            'path': validated.group(1),
        })

    if 'register_to_rucio.py' in text:
        timeline.append({
            'phase': 'rucio_registration_attempted',
            'message': 'Payload attempted JLab Rucio registration',
        })

    conflict = _rucio_conflict_details(text)
    if conflict:
        detail, detail_data = conflict
        timeline.append({
            'phase': 'rucio_registration_failed',
            'message': detail,
            'details': detail_data,
        })

    return timeline
0262
0263
def _fetch_job_log_texts(pandaid):
    """Fetch the non-empty log texts of a PanDA job, best effort.

    Tries payload.stdout, payload.stderr and pilotlog.txt, in that order,
    through askpanda_atlas. The monitor URL comes from the PANDA_BASE_URL
    setting. A failed fetch is logged as a warning and skipped. If the helper
    or the settings cannot be loaded, or anything else in the loop raises,
    a warning is logged and whatever was collected so far is returned.
    """
    collected = []
    try:
        from askpanda_atlas.log_analysis_impl import _fetch_log_text
        from decouple import config
        base_url = config('PANDA_BASE_URL', default='https://pandamon01.sdcc.bnl.gov')
        for log_name in ('payload.stdout', 'payload.stderr', 'pilotlog.txt'):
            try:
                body = _fetch_log_text(pandaid, log_name, base_url, timeout=30)
            except Exception as exc:
                logger.warning("epicprod inventory log fetch failed for %s/%s: %s",
                               pandaid, log_name, exc)
            else:
                if body:
                    collected.append(body)
    except Exception as exc:
        logger.warning("epicprod inventory log fetch unavailable for %s: %s", pandaid, exc)
    return collected
0282
0283
# PanDA job attributes copied into EpicProdJob.data['panda'] when present.
_PANDA_JOB_FIELDS = (
    'pandaid', 'jeditaskid', 'jobname', 'jobstatus', 'computingsite',
    'creationtime', 'starttime', 'endtime', 'piloterrorcode',
    'piloterrordiag', 'transexitcode', 'noutputdatafiles',
    'outputfilebytes',
)


def _phase_and_failure_summary(job, timeline, conflict):
    """Choose ``(phase, failure_summary)`` for a job from its log diagnosis.

    Rules, in priority order:

    1. A Rucio conflict wins: phase 'rucio_registration_failed', with the
       conflict detail as the summary.
    2. Otherwise the last timeline milestone is the phase.
    3. A job with PanDA status failed/closed and no milestones gets the
       phase 'failed'.

    For failed/closed jobs without a conflict, the pilot error diagnostic is
    kept as the summary even when a milestone supplies the phase.
    """
    if conflict:
        return 'rucio_registration_failed', conflict[0]
    failed = job.get('jobstatus') in ('failed', 'closed')
    summary = (job.get('piloterrordiag') or '').strip() if failed else ''
    if timeline:
        return timeline[-1]['phase'], summary
    if failed:
        return 'failed', summary
    return '', ''


def sync_job_from_study_data(study_data):
    """Persist epicprod diagnosis from an existing study_job() result.

    Reads the ``job``, ``pandaid``, ``files`` and ``log_analysis`` entries of
    ``study_data``. Job log texts are fetched from PanDA before any database
    write. All writes then run in one transaction, so a failure part-way
    leaves no partial update:

    - the PCS expected-file refresh;
    - the EpicProdJob upsert;
    - linking unclaimed expected files to this job;
    - marking RECO outputs as Rucio conflicts.

    Returns the saved EpicProdJob.
    """
    job = study_data.get('job') or {}
    pandaid = int(study_data.get('pandaid') or job.get('pandaid'))
    jeditaskid = job.get('jeditaskid')
    seq_number = _seq_number_from_files(study_data.get('files') or [])
    job_index = seq_number - 1 if seq_number else None
    # Looked up outside the transaction below: this helper swallows its own
    # errors, which would otherwise leave the open transaction unusable.
    prod_task = _prod_task_for_jeditaskid(jeditaskid)

    # Gather and analyse the logs (network I/O) before touching the database.
    log_analysis = study_data.get('log_analysis') or {}
    log_texts = [log_analysis.get('log_excerpt') or '']
    log_texts.extend(_fetch_job_log_texts(pandaid))
    combined_log_text = '\n'.join(t for t in log_texts if t)
    timeline = _timeline_from_log_text(combined_log_text)
    conflict = _rucio_conflict_details(combined_log_text)
    phase, failure_summary = _phase_and_failure_summary(job, timeline, conflict)

    panda_summary = {k: job.get(k) for k in _PANDA_JOB_FIELDS if k in job}
    data = {
        # Normalised with _jsonable like log_analysis, so any non-JSON job
        # values (e.g. datetime timestamps, if study_job() returns them) are
        # stringified instead of failing on save.
        'panda': _jsonable(panda_summary),
        'timeline': timeline,
        'log_analysis': _jsonable(log_analysis),
    }

    with transaction.atomic():
        if prod_task:
            sync_expected_files_for_task(prod_task)

        epic_job, _ = EpicProdJob.objects.update_or_create(
            pandaid=pandaid,
            defaults={
                'jeditaskid': jeditaskid,
                'prod_task': prod_task,
                'seq_number': seq_number,
                'job_index': job_index,
                'status': job.get('jobstatus') or '',
                'phase': phase,
                'failure_summary': failure_summary,
                'data': data,
                'last_refreshed_at': timezone.now(),
            },
        )

        if jeditaskid and seq_number:
            # Attach this sequence number's expected files no job has claimed yet.
            EpicProdFile.objects.filter(
                jeditaskid=jeditaskid,
                seq_number=seq_number,
                pandaid__isnull=True,
            ).update(
                job=epic_job,
                pandaid=pandaid,
                job_index=job_index,
            )

            if conflict:
                detail, detail_data = conflict
                reco_outputs = EpicProdFile.objects.filter(
                    jeditaskid=jeditaskid,
                    seq_number=seq_number,
                    role='output',
                    stage='RECO',
                )
                for f in reco_outputs:
                    f.status = 'conflict'
                    f.status_detail = detail
                    merged = dict(f.data or {})
                    merged['rucio_conflict'] = detail_data
                    f.data = merged
                    f.save(update_fields=['status', 'status_detail', 'data', 'updated_at'])

    return epic_job
0371
0372
def inventory_for_job_context(study_data):
    """Return display context for job pages.

    Returns ``{'epicprod_job': ..., 'display_files': [...]}``. The display
    rows are the EpicProdFile rows linked to the job's EpicProdJob (ordered
    by role, stage, lfn). They are followed by the job's PanDA file-table
    rows, with pseudo entries removed, whose (source, lfn, did_name) key is
    not already present.

    Safe to call before the migration exists: database table errors produce a
    fallback filtered PanDA file list. A missing or non-numeric pandaid falls
    back the same way instead of raising.
    """
    panda_files = [
        f for f in (study_data.get('files') or [])
        if not is_pseudo_panda_file(f)
    ]
    panda_rows = _panda_display_rows(panda_files)
    fallback = {'epicprod_job': None, 'display_files': panda_rows}

    try:
        pandaid = int(study_data.get('pandaid') or (study_data.get('job') or {}).get('pandaid'))
    except (TypeError, ValueError):
        return fallback

    rows = []
    try:
        epic_job = EpicProdJob.objects.filter(pandaid=pandaid).first()
        if epic_job:
            rows = [
                {
                    'role': f.role,
                    'stage': f.stage,
                    'scope': f.scope,
                    'dataset_name': f.dataset_name,
                    'did_name': f.did_name,
                    'lfn': f.lfn,
                    'size': f.bytes,
                    'status': f.status,
                    'status_detail': f.status_detail,
                    'rse': f.rse_expected,
                    'source': f.source,
                    'data': f.data or {},
                }
                for f in EpicProdFile.objects.filter(job=epic_job).order_by('role', 'stage', 'lfn')
            ]
    except (OperationalError, ProgrammingError):
        return fallback

    existing = {(r.get('source'), r.get('lfn'), r.get('did_name')) for r in rows}
    rows.extend(
        r for r in panda_rows
        if (r.get('source'), r.get('lfn'), r.get('did_name')) not in existing
    )
    # rows can only be empty when panda_rows is empty too, so no further
    # fallback is needed here.
    return {'epicprod_job': epic_job, 'display_files': rows}
0413
0414
def _panda_display_rows(panda_files):
    """Convert PanDA file-table records into job-inventory display rows.

    Each row has the same keys as the EpicProdFile display rows. The record's
    ``type`` becomes ``role``, and log files get stage ``'PANDA_LOG'``. The
    ``source`` is ``'panda_filestable'`` and the JSON-safe original record is
    kept under ``data``.
    """
    def to_row(record):
        kind = record.get('type', '')
        return {
            'role': kind,
            'stage': 'PANDA_LOG' if kind == 'log' else '',
            'scope': record.get('scope') or '',
            'dataset_name': record.get('dataset') or record.get('destinationdblock') or '',
            'did_name': record.get('lfn') or '',
            'lfn': record.get('lfn') or '',
            'size': record.get('fsize'),
            'status': record.get('status') or '',
            'status_detail': '',
            'rse': '',
            'source': 'panda_filestable',
            'data': _jsonable(record),
        }

    return [to_row(record) for record in panda_files]