Warning, file /swf-monitor/src/pcs/services.py was not indexed
or was modified since last indexation (in which case cross-reference links may be missing, inaccurate or erroneous).
0001 """
0002 PCS business logic — single source of truth for intake, linkage,
0003 lifecycle, and submission state changes.
0004
0005 Both REST viewset actions and MCP tools call these functions. Functions
0006 take plain Python types (no HTTP request, no MCP context) and return
0007 model instances or raise ServiceError. The caller translates errors
0008 to its native shape (DRF Response, MCP error dict).
0009 """
0010 import csv as _csv
0011 import hashlib as _hashlib
0012 import logging as _logging
0013 import re as _re
0014
0015 from django.db import transaction
0016
0017 _log = _logging.getLogger(__name__)
0018
0019 from .models import (
0020 Dataset, ProdConfig, ProdTask,
0021 Campaign, ProdRequest,
0022 PhysicsTag, EvgenTag, SimuTag, RecoTag,
0023 )
0024
0025
class ServiceError(Exception):
    """Domain-level failure raised by the PCS service functions.

    Carries a human-readable ``detail`` message plus a ``status`` hint
    shaped like an HTTP status code (400 unless the raiser says
    otherwise), so each caller can translate it into its own error form.
    """

    def __init__(self, detail, status=400):
        super().__init__(detail)
        self.status = status
        self.detail = detail
0032
0033
0034
0035
0036
# ProdTask lifecycle map: current status -> statuses it may move to.
# An empty set means no outgoing transition is allowed from that status.
PRODTASK_TRANSITIONS = {
    'draft':     {'ready'},
    'ready':     {'draft', 'submitted'},
    'submitted': {'completed', 'failed'},
    'completed': set(),
    'failed':    set(),
}
0044
# Payload keys that prodtask_intake copies into a task's ``overrides``
# JSON when they are present in the incoming payload.
PUBLIC_CATALOG_KEYS = (
    'public_catalog_repo',
    'public_catalog_issue',
    'public_catalog_pr',
    'public_catalog_row_index',
    'public_catalog_csv_path',
    'public_catalog_row_key',
    'public_catalog_page_url',
    'public_catalog_commit_sha',
)
0051
0052
0053
0054
0055
# Attribute names that prodtask_apply_request copies from a request
# object onto a task.
REQUEST_TO_TASK_COPY_FIELDS = (
    'requestor',
    'priority',
    'pre_tdr_use',
    'early_science_use',
    'other_use',
    'new_request',
)
0060
0061
0062
0063
0064
0065
def dataset_intake(*, source_location, source_kind='csv_manifest',
                   scope='group.EIC.evgen', stage='evgen',
                   detector_version=None, detector_config=None,
                   physics_tag_label=None, evgen_tag_label=None,
                   simu_tag_label=None, reco_tag_label=None,
                   description='', created_by):
    """
    Idempotently register an externally produced Dataset.

    A Dataset whose ``metadata.source`` already carries the same
    ``(kind, location)`` pair is returned untouched. Otherwise a new one
    is created, which requires the detector version/config and all four
    tag labels; every referenced tag must exist and have status
    ``'locked'``.

    Returns ``(dataset, was_created)``.

    Raises ServiceError when ``source_location`` is empty, when creation
    inputs are missing, when a tag is unknown or not locked, or when
    saving the new Dataset fails (the underlying error text becomes the
    detail).
    """
    if not source_location:
        raise ServiceError('source_location is required')

    match = (Dataset.objects
             .filter(metadata__source__location=source_location,
                     metadata__source__kind=source_kind)
             .first())
    if match:
        return match, False

    # (Dataset field name, tag model, label supplied by the caller)
    tag_inputs = (
        ('physics_tag', PhysicsTag, physics_tag_label),
        ('evgen_tag', EvgenTag, evgen_tag_label),
        ('simu_tag', SimuTag, simu_tag_label),
        ('reco_tag', RecoTag, reco_tag_label),
    )

    needed = [('detector_version', detector_version),
              ('detector_config', detector_config)]
    needed.extend((field, label) for field, _model, label in tag_inputs)
    absent = [name for name, value in needed if not value]
    if absent:
        raise ServiceError(
            f'No existing Dataset for {source_kind}:{source_location}; '
            f'creation requires: {", ".join(absent)}'
        )

    resolved = {}
    for field, model, label in tag_inputs:
        tag = model.objects.filter(tag_label=label).first()
        if not tag:
            raise ServiceError(f'{field} not found: {label}')
        if tag.status != 'locked':
            raise ServiceError(f'{field} {label} must be locked before use')
        resolved[field] = tag

    dataset = Dataset(
        scope=scope,
        detector_version=detector_version,
        detector_config=detector_config,
        description=description,
        metadata={
            'stage': stage,
            'source': {'kind': source_kind, 'location': source_location},
        },
        created_by=created_by,
        **resolved,
    )
    try:
        dataset.save()
    except Exception as exc:
        # Any save failure is surfaced to callers as a domain error.
        raise ServiceError(str(exc))
    return dataset, True
0136
0137
0138
0139
0140
0141
def prodtask_intake(*, payload, created_by):
    """
    Idempotently create or refresh a draft ProdTask from ``payload``.

    The payload must identify the task by ``public_catalog_issue``
    (used when present) or by the pair ``public_catalog_csv_path`` +
    ``public_catalog_row_key``; the lookup is made against the task's
    ``overrides`` JSON.

    * Match found: the payload's catalogue keys are merged into the
      existing overrides, ``input_dataset_did`` is set when supplied,
      and ``description`` is replaced when the payload carries a
      non-None value.
    * No match: a task with status ``'draft'`` is created; the payload
      must then also provide ``name``, ``dataset`` (matched on DID
      first, then on dataset name) and ``prod_config`` (matched on
      name).

    Returns ``(task, was_created)``. Raises ServiceError for a missing
    idempotency key, missing creation fields, or an unknown dataset /
    ProdConfig.
    """
    issue = payload.get('public_catalog_issue')
    csv_path = payload.get('public_catalog_csv_path')
    row_key = payload.get('public_catalog_row_key')
    if not (issue or (csv_path and row_key)):
        raise ServiceError(
            'Idempotency key required: public_catalog_issue or '
            '(public_catalog_csv_path, public_catalog_row_key)'
        )

    if issue:
        lookup = {'overrides__public_catalog_issue': issue}
    else:
        lookup = {
            'overrides__public_catalog_csv_path': csv_path,
            'overrides__public_catalog_row_key': row_key,
        }
    task = ProdTask.objects.filter(**lookup).first()

    catalog = {k: payload[k] for k in PUBLIC_CATALOG_KEYS if k in payload}
    input_did = payload.get('input_dataset_did')

    if task:
        merged = {**(task.overrides or {}), **catalog}
        if input_did:
            merged['input_dataset_did'] = input_did
        task.overrides = merged
        fields = ['overrides', 'updated_at']
        description = payload.get('description')
        if description is not None:
            task.description = description
            fields.append('description')
        task.save(update_fields=fields)
        return task, False

    name = payload.get('name')
    dataset_ref = payload.get('dataset')
    config_ref = payload.get('prod_config')
    creation_inputs = (
        ('name', name),
        ('dataset', dataset_ref),
        ('prod_config', config_ref),
    )
    absent = [label for label, value in creation_inputs if not value]
    if absent:
        raise ServiceError(
            f'Creating a new task requires: {", ".join(absent)}'
        )

    # The dataset handle may be a DID or a plain dataset name.
    output_ds = Dataset.objects.filter(did=dataset_ref).first()
    if not output_ds:
        output_ds = Dataset.objects.filter(dataset_name=dataset_ref).first()
    if not output_ds:
        raise ServiceError(f'Output dataset not found: {dataset_ref}')

    config = ProdConfig.objects.filter(name=config_ref).first()
    if not config:
        raise ServiceError(f'ProdConfig not found: {config_ref}')

    overrides = dict(catalog)
    if input_did:
        overrides['input_dataset_did'] = input_did
    new_task = ProdTask.objects.create(
        name=name,
        description=payload.get('description', ''),
        status='draft',
        dataset=output_ds,
        prod_config=config,
        overrides=overrides or None,  # store NULL rather than an empty dict
        created_by=created_by,
    )
    return new_task, True
0225
0226
def prodtask_link_input(*, task, did=None, dids=None):
    """
    Link input Dataset(s) to a ProdTask via its ``overrides`` JSON.

    Provide exactly one of ``did`` (a single DID) or ``dids`` (an
    iterable of DIDs). Every DID must match an existing Dataset; this
    function never creates Datasets. Setting one form removes the other
    key from the overrides.

    ``dids`` is materialised exactly once, so one-shot iterables
    (generators) are validated and stored consistently, and an empty
    iterable counts as "not provided". A non-empty plain string is
    rejected instead of being split into characters.

    Returns the saved task. Raises ServiceError on invalid arguments or
    when any DID is unknown.
    """
    if isinstance(dids, str) and dids:
        raise ServiceError(
            'dids must be a list of DIDs; use did for a single DID')
    did_list = list(dids) if dids else []

    if did and did_list:
        raise ServiceError('Provide one of did or dids, not both')
    if not did and not did_list:
        raise ServiceError('did or dids is required')

    targets = [did] if did else did_list
    found = set(Dataset.objects.filter(did__in=targets)
                .values_list('did', flat=True))
    missing = [d for d in targets if d not in found]
    if missing:
        raise ServiceError(f'Dataset(s) not found: {missing}')

    ov = dict(task.overrides or {})
    if did:
        ov['input_dataset_did'] = did
        ov.pop('input_dataset_dids', None)
    else:
        # Store a copy of the already-validated list.
        ov['input_dataset_dids'] = list(did_list)
        ov.pop('input_dataset_did', None)
    task.overrides = ov
    task.save(update_fields=['overrides', 'updated_at'])
    return task
0253
0254
def _known_prodtask_statuses():
    """Return the set of all status names mentioned in
    ``PRODTASK_TRANSITIONS``, as a source or as a target."""
    return set().union(PRODTASK_TRANSITIONS, *PRODTASK_TRANSITIONS.values())
0262
0263
def prodtask_set_status(*, task, new_status):
    """Move ``task`` to ``new_status`` if the transition map allows it.

    ``new_status`` must be a status known to ``PRODTASK_TRANSITIONS``.
    Re-setting the current status is accepted; any other move must be
    listed as reachable from the task's current status. The task is
    saved (``status`` and ``updated_at``) and returned.

    Raises ServiceError for an unknown status or a disallowed move.
    """
    known = _known_prodtask_statuses()
    if new_status not in known:
        raise ServiceError(
            f'Invalid status. Choose from: {", ".join(sorted(known))}'
        )

    current = task.status
    reachable = PRODTASK_TRANSITIONS.get(current, set())
    unchanged = new_status == current
    if not unchanged and new_status not in reachable:
        raise ServiceError(
            f'Cannot transition from {current!r} to {new_status!r}. '
            f'Allowed from {current!r}: '
            f'{sorted(reachable) or "(terminal)"}'
        )

    task.status = new_status
    task.save(update_fields=['status', 'updated_at'])
    return task
0281
0282
def prodtask_apply_request(task, request, *, save=True):
    """Attach ``request`` to ``task`` and copy its seed fields across.

    Sets ``task.request`` and copies every attribute named in
    ``REQUEST_TO_TASK_COPY_FIELDS`` from the request onto the task. With
    ``save=True`` (the default) only those fields plus ``updated_at``
    are written. This is a one-shot copy: nothing here keeps the task in
    sync with later request edits, so call it again to resync.

    Returns the task. Raises ServiceError when ``request`` is None.
    """
    if request is None:
        raise ServiceError('request is required')

    task.request = request
    copied = list(REQUEST_TO_TASK_COPY_FIELDS)
    for name in copied:
        setattr(task, name, getattr(request, name))

    if save:
        task.save(update_fields=['request', *copied, 'updated_at'])
    return task
0301
0302
0303
0304
0305
0306
def campaign_set_current(campaign):
    """Promote ``campaign`` to lifecycle ``'current'`` in one transaction.

    Every other Campaign whose lifecycle is ``'current'`` is set to
    ``'past'`` first. If ``campaign`` already reports ``'current'``
    nothing is written. The single-current rule is enforced only here,
    not by a database constraint, so saves that bypass this function can
    break it.

    Returns ``campaign``.
    """
    with transaction.atomic():
        if campaign.lifecycle != 'current':
            others = (Campaign.objects
                      .filter(lifecycle='current')
                      .exclude(pk=campaign.pk))
            others.update(lifecycle='past')
            campaign.lifecycle = 'current'
            campaign.save(update_fields=['lifecycle', 'updated_at'])
    return campaign
0322
0323
def campaign_clone_to_new(source, *, name, created_by,
                          description='', lifecycle='future'):
    """Create an empty Campaign that records ``source`` as its ``clone_of``.

    Only the Campaign row is created; no tasks are copied. Promotion to
    ``'current'`` is a separate step (see ``campaign_set_current``).

    Returns the new Campaign. Raises ServiceError when ``name`` is
    already taken or ``lifecycle`` is not one of ``'past'``,
    ``'current'``, ``'future'``.
    """
    name_taken = Campaign.objects.filter(name=name).exists()
    if name_taken:
        raise ServiceError(f'Campaign name already in use: {name}')

    allowed_lifecycles = {'past', 'current', 'future'}
    if lifecycle not in allowed_lifecycles:
        raise ServiceError(f'Invalid lifecycle: {lifecycle!r}')

    return Campaign.objects.create(
        name=name,
        lifecycle=lifecycle,
        description=description,
        clone_of=source,
        created_by=created_by,
    )
0345
0346
0347
0348
0349
0350
0351
0352
0353
0354
0355
# CSV file read by import_default_datasets_csv when no path is passed.
DEFAULT_DATASETS_CSV_PATH = '/data/wenauseic/github/epic-prod/docs/_data/datasets.csv'
0359
0360
# Normalised (stripped, lower-cased) cell values that _yesno accepts
# without logging a warning; the empty string is one of them.
_YESNO_RECOGNISED = {'', 'yes', 'y', 'no', 'n', 'true', 'false', '0', '1'}


def _yesno(value):
    """Interpret a Yes/No style cell as a boolean.

    Returns True for 'yes', 'y', 'true' or '1' (case-insensitive,
    surrounding whitespace ignored) and False for everything else. A
    value outside the recognised vocabulary still yields False but is
    logged at WARNING level so it does not pass unnoticed.
    """
    text = str(value or '').strip().lower()
    # '' is itself recognised, so this fires only for non-empty oddities.
    if text not in _YESNO_RECOGNISED:
        _log.warning('_yesno: unrecognised value %r treated as False', value)
    affirmative = ('yes', 'y', 'true', '1')
    return text in affirmative
0374
0375
# URL shapes recognised in a Generator/Dataset Version cell. All are
# applied with .match(), i.e. anchored at the start of the cell text.
_GH_EIC_RELEASE_RE = _re.compile(
    r'https?://github\.com/eic/([^/]+)/releases\b', _re.I)
_GH_OTHER_RELEASE_RE = _re.compile(
    r'https?://github\.com/([^/]+)/([^/]+)/releases\b', _re.I)
_GL_EIC_RE = _re.compile(
    r'https?://gitlab\.com/eic/([^?#]+?)/-/', _re.I)
_GL_OTHER_RE = _re.compile(
    r'https?://gitlab\.com/([^?#]+?)/-/', _re.I)

# Leading owner/group prefixes dropped from the derived identifier.
_EVGEN_STRIP_PREFIXES = ('JeffersonLab/', 'mceg/')


def _extract_evgen(value):
    """Reduce a Generator/Dataset Version cell to a short generator id.

    - github.com/eic/<repo>/releases...            -> '<repo>'
    - github.com/<owner>/<repo>/releases...        -> '<owner>/<repo>'
    - gitlab.com/eic/<path>/-/...                  -> '<path>'
    - gitlab.com/<path>/-/...                      -> '<path>'
    - anything else ('starlight', 'Pythia 8', ...) -> the stripped text

    If the result starts with one of ``_EVGEN_STRIP_PREFIXES`` the first
    matching prefix is removed. Empty or None input gives ''.
    """
    text = str(value or '').strip()
    if not text:
        return ''

    # Most specific pattern first; the captured groups joined with '/'
    # are exactly the short form for every pattern.
    url_patterns = (_GH_EIC_RELEASE_RE, _GH_OTHER_RELEASE_RE,
                    _GL_EIC_RE, _GL_OTHER_RE)
    short = text
    for pattern in url_patterns:
        hit = pattern.match(text)
        if hit:
            short = '/'.join(hit.groups())
            break

    return next(
        (short[len(prefix):] for prefix in _EVGEN_STRIP_PREFIXES
         if short.startswith(prefix)),
        short,
    )
0421
0422
def _parse_event_count(value):
    """Parse an event-count cell ('400k', '1M', '2.75M', '1,000', plain
    integers) into an absolute integer count.

    Commas and spaces are ignored. A trailing k/K, m/M or b/B/g/G
    multiplies by 1e3, 1e6 or 1e9 respectively.

    Returns None for empty input. For non-empty input that cannot be
    turned into a finite integer — including 'nan', 'inf' and values
    that overflow to infinity such as '1e400' — a WARNING is logged and
    None is returned, so one bad cell cannot abort an import.
    """
    raw = value
    s = str(value or '').strip().replace(',', '').replace(' ', '')
    if not s:
        return None

    suffix_multipliers = {
        'k': 1_000, 'K': 1_000,
        'm': 1_000_000, 'M': 1_000_000,
        'b': 1_000_000_000, 'B': 1_000_000_000,
        'g': 1_000_000_000, 'G': 1_000_000_000,
    }
    mult = suffix_multipliers.get(s[-1], 1)
    if s[-1] in suffix_multipliers:
        s = s[:-1]

    try:
        return int(round(float(s) * mult))
    except (TypeError, ValueError, OverflowError):
        # ValueError: unparseable text or NaN; OverflowError: infinity.
        _log.warning('_parse_event_count: unparseable value %r', raw)
        return None
0448
0449
def _possible(value):
    """Return True for any non-empty cell that is not an explicit "no".

    'no', 'n', 'false' and '0' (case-insensitive, whitespace ignored)
    and empty/None input give False; everything else — 'Yes', 'Maybe',
    free text — gives True.
    """
    text = str(value or '').strip().lower()
    if not text:
        return False
    return text not in ('no', 'n', 'false', '0')
0456
0457
def _csvimport_slug(dataset_path, gen_version):
    """Return the first 12 hex digits of SHA-1 over
    ``'<dataset_path>|<gen_version>'``.

    The same inputs always give the same slug.
    """
    material = '{}|{}'.format(dataset_path, gen_version).encode()
    digest = _hashlib.sha1(material).hexdigest()
    return digest[:12]
0462
0463
def _ensure_csvimport_anchors():
    """Look up the placeholder rows that imported records are attached to.

    Returns ``(physics_tag, evgen_tag, simu_tag, reco_tag, prod_config,
    campaign)``:

    * each tag is the locked tag of its kind with the lowest
      ``tag_number``;
    * the ProdConfig is the first whose name contains
      '26.02.0 Standard' (case-insensitive), else the first ProdConfig
      of any name;
    * the Campaign is the most recently updated one with lifecycle
      ``'current'``.

    Nothing is created here; ServiceError is raised when any of these
    cannot be found.
    """
    def lowest_locked(model, label):
        tag = model.objects.filter(status='locked').order_by('tag_number').first()
        if tag:
            return tag
        raise ServiceError(f'No locked {label} tag available for CSV import')

    tag_sources = (
        (PhysicsTag, 'physics'),
        (EvgenTag, 'evgen'),
        (SimuTag, 'simu'),
        (RecoTag, 'reco'),
    )
    physics, evgen, simu, reco = [
        lowest_locked(model, label) for model, label in tag_sources
    ]

    cfg = ProdConfig.objects.filter(name__icontains='26.02.0 Standard').first()
    if not cfg:
        cfg = ProdConfig.objects.first()
    if not cfg:
        raise ServiceError('No ProdConfig available for CSV import anchor')

    current = Campaign.objects.filter(lifecycle='current')
    campaign = current.order_by('-updated_at').first()
    if not campaign:
        raise ServiceError('No current Campaign for CSV import')

    return physics, evgen, simu, reco, cfg, campaign
0495
0496
def _csv_row_snapshot(row, row_number):
    """Return a JSON-safe ``{column: stripped text}`` copy of one
    ``csv.DictReader`` row, for storage in ``metadata['csv_import']``."""
    snapshot = {}
    for column, cell in row.items():
        if column is None:
            # DictReader stores surplus cells (a row with more cells
            # than header columns) as a *list* under the key None.
            # Calling .strip() on that list used to abort the import.
            _log.warning('csv_import row %d: %d unnamed extra cell(s) kept '
                         'under "_extra"', row_number, len(cell))
            snapshot['_extra'] = [(c or '').strip() for c in cell]
            continue
        snapshot[column] = (cell or '').strip()
    return snapshot


def import_default_datasets_csv(csv_path=None, *, created_by='csv_import'):
    """Import the epic-prod datasets.csv into the catalog.

    ``csv_path`` defaults to ``DEFAULT_DATASETS_CSV_PATH``. Each CSV row
    becomes (idempotently):

      - one Dataset named ``csv_import.<slug>`` attached to the anchor
        tags, with the whole row preserved in
        ``metadata['csv_import']``;
      - one ProdTask of the same name linked to that Dataset, the
        anchor ProdConfig and the current Campaign.

    The slug is derived from ``(Dataset Path, Generator/Dataset
    Version)``; rows where both are empty are skipped and reported in
    ``errors``. Re-running updates existing rows in place: a task whose
    status is no longer ``'csv_import'`` keeps its status, and the
    ``csv_import`` overrides entry is refreshed while other override
    keys are kept. All writes happen in one transaction.

    Rows with more cells than header columns no longer crash the
    import; the surplus cells are kept under ``'_extra'`` in the row
    snapshot and a warning is logged.

    Returns dict::

        {'rows': int, 'created': int, 'updated': int, 'errors': [str, ...]}

    Raises ServiceError (via ``_ensure_csvimport_anchors``) when an
    anchor row is missing; file-open errors propagate unchanged.
    """
    path = csv_path or DEFAULT_DATASETS_CSV_PATH
    physics, evgen, simu, reco, cfg, campaign = _ensure_csvimport_anchors()

    with open(path, newline='') as f:
        rows = list(_csv.DictReader(f))

    summary = {'rows': len(rows), 'created': 0, 'updated': 0, 'errors': []}

    with transaction.atomic():
        for i, row in enumerate(rows, 1):
            ds_path = (row.get('Dataset Path') or '').strip()
            gen_ver = (row.get('Generator/Dataset Version') or '').strip()
            if not ds_path and not gen_ver:
                summary['errors'].append(f'row {i}: empty path and gen_version, skipped')
                continue

            slug = _csvimport_slug(ds_path, gen_ver)
            dataset_name = f'csv_import.{slug}'
            task_name = f'csv_import.{slug}'

            raw_priority = (row.get('Priority') or '').strip()
            try:
                priority = int(raw_priority) if raw_priority else None
            except (TypeError, ValueError):
                _log.warning('csv_import row %d: unparseable Priority %r', i, raw_priority)
                priority = None
            nevents = _parse_event_count(row.get('Number of Events'))

            metadata = {
                'stage': 'evgen',
                'source': {'kind': 'csv_manifest', 'location': ds_path},
                'csv_import': _csv_row_snapshot(row, i),
            }
            if gen_ver:
                metadata['source']['gen_version'] = gen_ver

            ds = Dataset.objects.filter(dataset_name=dataset_name, block_num=1).first()
            if ds:
                ds.description = (row.get('Description') or '').strip()
                ds.metadata = metadata
                ds.save()
            else:
                ds = Dataset(
                    dataset_name=dataset_name,
                    scope='group.EIC',
                    detector_version='26.02.0',
                    detector_config='epic_craterlake',
                    physics_tag=physics, evgen_tag=evgen,
                    simu_tag=simu, reco_tag=reco,
                    description=(row.get('Description') or '').strip(),
                    metadata=metadata,
                    created_by=created_by,
                )
                ds.save()

            task_defaults = dict(
                description=(row.get('Description') or '').strip(),
                status='csv_import',
                dataset=ds,
                prod_config=cfg,
                campaign=campaign,
                requestor=(row.get('DSC or PWG') or '').strip().upper(),
                priority=priority,
                pre_tdr_use=_possible(row.get('Pre-TDR Use')),
                early_science_use=_yesno(row.get('Early Science Use')),
                other_use=_possible(row.get('Other Use')),
                new_request=_yesno(row.get('New Request')),
                overrides={
                    'csv_import': {
                        'background': (row.get('Background') or '').strip(),
                        'nevents': nevents,
                        'issue_url': (row.get('Issue') or '').strip(),
                        'gen_version': gen_ver,
                        'evgen': _extract_evgen(gen_ver),
                        'other_use_text': (row.get('Other Use') or '').strip(),
                        'filters': _extract_csv_filters(ds_path, ds.detector_config),
                    },
                },
                created_by=created_by,
            )

            existing = ProdTask.objects.filter(name=task_name).first()
            if existing:
                # A status other than 'csv_import' means the task has
                # moved on since import; leave that status alone.
                preserve_status = existing.status != 'csv_import'
                for k, v in task_defaults.items():
                    if k == 'status' and preserve_status:
                        continue
                    if k == 'overrides':
                        # Refresh only the 'csv_import' entry; keep any
                        # other override keys already on the task.
                        merged = dict(existing.overrides or {})
                        merged.update(v)
                        v = merged
                    setattr(existing, k, v)
                existing.save()
                summary['updated'] += 1
            else:
                ProdTask.objects.create(name=task_name, **task_defaults)
                summary['created'] += 1

    return summary
0619
0620
0621
0622
0623
0624
# Default checkout location of the epic-prod repository, the stage
# directories under its docs/ tree that are imported, and the version
# prefixes that select which campaign versions are imported.
EPIC_PROD_PATH = '/data/wenauseic/github/epic-prod'
PAST_CAMPAIGN_STAGES = (
    'FULL',
    'RECO',
)
PAST_CAMPAIGN_YEAR_PREFIXES = (
    '25.',
    '26.',
)
0628
# Line patterns used when parsing a past-campaign index.md file:
#   '=== epic:/... ==='                       -> start of a dataset block
#   'RSE: <name> Files: <n>/<total> (<text>)' -> per-RSE replica line
#   'Total Size: <n> <unit> (<k> files)'      -> dataset size line
#   '=== CAMPAIGN SUMMARY ==='                -> start of trailing summary
_PAST_DID_RE = _re.compile(
    r'^===\s*(epic:/\S+)\s*===\s*$')
_PAST_RSE_RE = _re.compile(
    r'RSE:\s*(\S+)\s+Files:\s*(\d+)/(\d+)\s*\(([^)]+)\)')
_PAST_SIZE_RE = _re.compile(
    r'Total Size:\s*([\d.]+)\s*([KMGTP]?B)\s*\((\d+)\s*files?\)',
    _re.I)
_PAST_SUMMARY_RE = _re.compile(
    r'^===\s*CAMPAIGN SUMMARY\s*===\s*$')
0633
# Decimal (powers of 1000) byte multipliers: B, KB, MB, GB, TB, PB.
_SIZE_UNIT_BYTES = {
    unit: 1000 ** power
    for power, unit in enumerate(('B', 'KB', 'MB', 'GB', 'TB', 'PB'))
}


def _parse_size_to_bytes(value, unit):
    """Convert a numeric ``value`` with a size ``unit`` into whole bytes.

    ``unit`` is matched case-insensitively against ``_SIZE_UNIT_BYTES``.
    An unparseable value or an unknown unit is logged at WARNING level
    and yields 0.
    """
    try:
        magnitude = float(value)
    except (TypeError, ValueError):
        _log.warning('past_ingest: unparseable size value %r', value)
        return 0

    factor = _SIZE_UNIT_BYTES.get(unit.upper())
    if factor is not None:
        return int(round(magnitude * factor))

    _log.warning('past_ingest: unrecognised size unit %r', unit)
    return 0
0650
0651
def _parse_past_index(text):
    """Parse the text of a past-campaign index file into dataset blocks.

    Generator yielding one dict per ``=== epic:/... ===`` header::

        {'did': str,
         'rses': [{'name', 'files', 'total', 'status'}, ...],
         'file_count': int, 'data_size_bytes': int, 'complete': bool}

    ``complete`` starts True and becomes False as soon as any RSE line
    reports a status other than 'complete'. Blank lines and code-fence
    lines are ignored, lines before the first header are ignored, and
    parsing stops at the ``=== CAMPAIGN SUMMARY ===`` marker.
    """
    current = None

    for raw_line in text.splitlines():
        stripped = raw_line.strip()
        if stripped in ('', '```'):
            continue

        if _PAST_SUMMARY_RE.match(stripped):
            # The summary is not a dataset; the block in progress is
            # flushed after the loop.
            break

        header = _PAST_DID_RE.match(stripped)
        if header:
            if current:
                yield current
            current = {'did': header.group(1), 'rses': [], 'file_count': 0,
                       'data_size_bytes': 0, 'complete': True}
            continue

        if current is None:
            continue

        rse = _PAST_RSE_RE.search(stripped)
        if rse:
            name, files, total, status = rse.groups()
            status = status.strip()
            current['rses'].append({'name': name, 'files': int(files),
                                    'total': int(total), 'status': status})
            if status != 'complete':
                current['complete'] = False
            continue

        size = _PAST_SIZE_RE.search(stripped)
        if size:
            amount, unit, nfiles = size.groups()
            current['data_size_bytes'] = _parse_size_to_bytes(amount, unit)
            current['file_count'] = int(nfiles)

    if current:
        yield current
0703
0704
# Tokens searched for in the '/'-joined path.
_PAST_BEAM_RE = _re.compile(r'(\d+x\d+)')
_PAST_Q2_RE = _re.compile(r'(minQ2=\d+|q2_\d+(?:to\d+)?)')
# Physics categories, matched against whole path segments in this order.
_PAST_PHYS_TOP = ('DIS', 'SIDIS', 'DDIS', 'EXCLUSIVE', 'SINGLE', 'BACKGROUNDS')

# Stand-alone energy token such as '10GeV' or '0.5MeV' (case-insensitive).
_PAST_ENERGY_BARE_RE = _re.compile(
    r'\b(\d+(?:\.\d+)?(?:eV|keV|MeV|GeV|TeV))\b', _re.I)


def _extract_path_filters(segments):
    """Derive ``{beam, physics, q2, species, energy}`` from path segments.

    * ``beam`` / ``q2``: first regex hit in the '/'-joined path.
    * ``physics``: first entry of ``_PAST_PHYS_TOP`` present as a whole
      segment; for 'DIS' it is replaced by 'NC' or 'CC' when one of
      those is a segment.
    * ``species`` / ``energy``: for 'SINGLE', the two segments that
      follow it; when no energy was found that way, the first
      stand-alone energy token anywhere in the path.

    A dimension that is not found is returned as ''.
    """
    joined = '/'.join(segments)

    physics = ''
    for candidate in _PAST_PHYS_TOP:
        if candidate in segments:
            physics = candidate
            break

    species = energy = ''
    if physics == 'SINGLE':
        start = segments.index('SINGLE') + 1
        # Pad so a missing species/energy segment falls back to ''.
        following = list(segments[start:start + 2]) + ['', '']
        species, energy = following[0], following[1]
    if not energy:
        bare = _PAST_ENERGY_BARE_RE.search(joined)
        if bare:
            energy = bare.group(1)

    if physics == 'DIS':
        for current_type in ('NC', 'CC'):
            if current_type in segments:
                physics = current_type
                break

    beam = _PAST_BEAM_RE.search(joined)
    q2 = _PAST_Q2_RE.search(joined)
    return {
        'beam': beam.group(1) if beam else '',
        'physics': physics,
        'q2': q2.group(1) if q2 else '',
        'species': species,
        'energy': energy,
    }
0746
0747
def _extract_past_filters(did):
    """Filter fields for a past-output DID.

    The part after the first ':' (or the whole string when there is no
    ':') is split on '/'. Its third segment becomes ``detector``; the
    segments after it are passed to ``_extract_path_filters`` for
    ``{beam, physics, q2, species, energy}``. A missing detector
    segment gives ''.
    """
    _scope, sep, after_scope = did.partition(':')
    path = (after_scope if sep else did).lstrip('/')
    pieces = path.split('/')

    filters = _extract_path_filters(pieces[3:])
    filters['detector'] = pieces[2] if len(pieces) > 2 else ''
    return filters
0759
0760
def _extract_csv_filters(path, detector_config):
    """Filter fields for a CSV input-dataset path.

    A leading ``/volatile/eic/EPIC/EVGEN`` prefix is dropped when
    present; the remaining segments go to ``_extract_path_filters``.
    ``detector`` is taken from ``detector_config`` ('' when falsy),
    never from the path.
    """
    pieces = (path or '').lstrip('/').split('/')

    evgen_root = ['volatile', 'eic', 'EPIC', 'EVGEN']
    # A path shorter than the prefix can never compare equal to it.
    tail = pieces[4:] if pieces[:4] == evgen_root else pieces

    filters = _extract_path_filters(tail)
    filters['detector'] = detector_config or ''
    return filters
0778
0779
def _decompose_past_did(did):
    """Split an epic-prod DID into its leading path fields.

    The part after the first ':' (or the whole string when there is no
    ':') is split on '/'. The first three segments become ``stage``,
    ``version`` and ``detector_config``; any further segments are
    re-joined under ``path_remainder``. A DID with fewer than three
    segments is logged at WARNING level and gives ``{}``.
    """
    _scope, sep, after_scope = did.partition(':')
    path = (after_scope if sep else did).lstrip('/')
    pieces = path.split('/')

    if len(pieces) < 3:
        _log.warning('past_ingest: DID %r has too few segments', did)
        return {}

    stage, version, detector_config = pieces[:3]
    fields = {'stage': stage, 'version': version,
              'detector_config': detector_config}
    remainder = pieces[3:]
    if remainder:
        fields['path_remainder'] = '/'.join(remainder)
    return fields
0800
0801
def import_epic_prod_past_campaigns(*, epic_prod_path=EPIC_PROD_PATH,
                                    created_by='past_import'):
    """Import past-campaign output datasets from a cloned epic-prod tree.

    For each stage in ``PAST_CAMPAIGN_STAGES`` the version list is read
    from ``docs/_data/<stage>_content.yml``; entries whose ``text``
    starts with one of ``PAST_CAMPAIGN_YEAR_PREFIXES`` are kept ('main'
    is never imported). Each kept version's
    ``docs/<STAGE>/<version>/index.md`` is then parsed.

    For each parsed dataset block we get-or-create:
      - Campaign(name='{STAGE}/{version}'), forced to lifecycle 'past';
      - Dataset(dataset_name='past.{STAGE}.{ver}.{slug}') with the
        epic-prod DID in metadata['source']['location'] and the per-RSE
        breakdown in metadata['past_output'];
      - ProdTask of the same name (status 'past_output' on creation),
        linked to the Dataset, the anchor ProdConfig and the Campaign.

    The slug is a SHA-1 prefix of the epic-prod DID, so re-running
    refreshes file count, size and RSE breakdown in place. Per-campaign
    totals are stored in ``campaign.data['past_summary']``. All database
    writes happen in one transaction.

    Unreadable files, a YAML document that is not a list, and entries
    whose ``text`` is not a string are reported in ``errors`` instead of
    raising.

    Returns dict::

        {'campaigns': int, 'rows': int, 'created': int,
         'updated': int, 'errors': [str, ...]}
    """
    import os as _os
    import yaml as _yaml
    physics, evgen, simu, reco, cfg, _ = _ensure_csvimport_anchors()

    summary = {'campaigns': 0, 'rows': 0, 'created': 0, 'updated': 0, 'errors': []}

    versions_by_stage = {}
    for stage in PAST_CAMPAIGN_STAGES:
        yml_name = f'{stage.lower()}_content.yml'
        yml_path = _os.path.join(epic_prod_path, 'docs', '_data', yml_name)
        try:
            with open(yml_path) as f:
                entries = _yaml.safe_load(f) or []
        except (OSError, _yaml.YAMLError) as e:
            summary['errors'].append(f'{stage}: cannot read {yml_name}: {e}')
            continue
        if not isinstance(entries, list):
            summary['errors'].append(
                f'{stage}: {yml_name} is not a list of entries')
            continue

        versions = []
        for entry in entries:
            if not isinstance(entry, dict):
                continue
            text = entry.get('text')
            if not isinstance(text, str):
                # Unquoted YAML such as `text: 25.10` loads as a float
                # and a bare `text:` loads as None; neither has
                # .startswith, which used to crash the import here.
                if text is not None:
                    summary['errors'].append(
                        f'{stage}: non-string version entry {text!r} in '
                        f'{yml_name}, skipped')
                continue
            if text != 'main' and text.startswith(PAST_CAMPAIGN_YEAR_PREFIXES):
                versions.append(text)
        versions_by_stage[stage] = versions

    with transaction.atomic():
        for stage, versions in versions_by_stage.items():
            for version in versions:
                campaign_name = f'{stage}/{version}'
                index_path = _os.path.join(epic_prod_path, 'docs', stage, version, 'index.md')
                try:
                    with open(index_path) as f:
                        text = f.read()
                except OSError as e:
                    summary['errors'].append(f'{campaign_name}: {e}')
                    continue

                campaign, _ = Campaign.objects.get_or_create(
                    name=campaign_name,
                    defaults={'lifecycle': 'past',
                              'description': f'{stage} campaign {version} '
                                             f'(epic-prod {index_path})',
                              'created_by': created_by},
                )
                if campaign.lifecycle != 'past':
                    campaign.lifecycle = 'past'
                    campaign.save(update_fields=['lifecycle'])
                summary['campaigns'] += 1

                campaign_files = 0
                campaign_bytes = 0
                for block in _parse_past_index(text):
                    summary['rows'] += 1
                    epic_did = block['did']
                    slug = _hashlib.sha1(epic_did.encode()).hexdigest()[:12]
                    pcs_name = f'past.{stage}.{version}.{slug}'
                    decomposed = _decompose_past_did(epic_did)

                    metadata = {
                        'stage': stage.lower(),
                        'source': {'kind': 'rucio_did', 'location': epic_did},
                        'past_output': {
                            'campaign_name': campaign_name,
                            'stage': stage,
                            'version': version,
                            'rses': block['rses'],
                            'complete': block['complete'],
                            'path': decomposed,
                            'filters': _extract_past_filters(epic_did),
                            'index_path': index_path,
                        },
                    }

                    pcs_did = f'group.EIC:{pcs_name}.b1'
                    ds, ds_created = Dataset.objects.get_or_create(
                        dataset_name=pcs_name, block_num=1,
                        defaults=dict(
                            scope='group.EIC', did=pcs_did,
                            detector_version=version,
                            detector_config=decomposed.get('detector_config', ''),
                            physics_tag=physics, evgen_tag=evgen,
                            simu_tag=simu, reco_tag=reco,
                            file_count=block['file_count'],
                            data_size=block['data_size_bytes'],
                            description='',
                            metadata=metadata,
                            created_by=created_by,
                        ),
                    )
                    if not ds_created:
                        # Existing row: refresh everything taken from
                        # the index file.
                        ds.file_count = block['file_count']
                        ds.data_size = block['data_size_bytes']
                        ds.metadata = metadata
                        ds.detector_version = version
                        ds.detector_config = decomposed.get('detector_config', '')
                        ds.save()

                    task_defaults = dict(
                        description='',
                        dataset=ds,
                        prod_config=cfg,
                        campaign=campaign,
                        overrides={'past_output': metadata['past_output']},
                        created_by=created_by,
                    )
                    existing = ProdTask.objects.filter(name=pcs_name).first()
                    if existing:
                        preserve_status = existing.status != 'past_output'
                        for k, v in task_defaults.items():
                            if k == 'overrides':
                                # Refresh only 'past_output'; keep the
                                # task's other override keys.
                                merged = dict(existing.overrides or {})
                                merged.update(v)
                                v = merged
                            setattr(existing, k, v)
                        if not preserve_status:
                            existing.status = 'past_output'
                        existing.save()
                        summary['updated'] += 1
                    else:
                        ProdTask.objects.create(name=pcs_name,
                                                status='past_output',
                                                **task_defaults)
                        summary['created'] += 1
                    campaign_files += block['file_count']
                    campaign_bytes += block['data_size_bytes']

                campaign.data = {
                    **(campaign.data or {}),
                    'past_summary': {
                        'file_count': campaign_files,
                        'data_size_bytes': campaign_bytes,
                        'stage': stage,
                        'version': version,
                    },
                }
                campaign.save(update_fields=['data', 'updated_at'])

    return summary
0959
0960
0961
0962
0963
0964
0965
0966
0967
0968
0969
0970
0971
0972
0973
# Defaults for talking to the JLab Rucio server. The URL, account, username
# and password can each be overridden by an environment variable of the same
# name (read in _jlab_rucio_auth / _jlab_rucio_get).
# NOTE(review): credentials are committed in source; presumably a public
# read-only account - confirm.
JLAB_RUCIO_URL = 'https://rucio-server.jlab.org:443'
JLAB_RUCIO_ACCOUNT = 'eicread'
JLAB_RUCIO_USERNAME = 'eicread'
JLAB_RUCIO_PASSWORD = 'eicread'
# Default directory for the saved 'current-<campaign>.json' snapshot files.
RUCIO_SNAPSHOT_DIR = '/opt/swf-monitor/shared/rucio-snapshots'
0979
0980
def _jlab_rucio_auth(timeout=30):
    """userpass-auth against JLab Rucio. Returns the X-Rucio-Auth-Token.

    The server URL and the account / username / password are taken from
    the JLAB_RUCIO_* environment variables when set, otherwise from the
    module-level defaults of the same names.

    timeout: socket timeout in seconds passed to urlopen.
    Raises ServiceError when the response carries no token header;
    network / HTTP errors from urlopen propagate unchanged.
    """
    import urllib.request as _ur
    import ssl as _ssl
    import os as _os
    # NOTE(review): certificate and hostname verification are switched off,
    # so the credentials below travel over an unauthenticated TLS channel.
    # Left as-is because it looks deliberate - confirm.
    ctx = _ssl.create_default_context()
    ctx.check_hostname = False
    ctx.verify_mode = _ssl.CERT_NONE
    env = _os.environ
    url = (env.get('JLAB_RUCIO_URL') or JLAB_RUCIO_URL) + '/auth/userpass'
    req = _ur.Request(url)
    req.add_header('X-Rucio-Account', env.get('JLAB_RUCIO_ACCOUNT', JLAB_RUCIO_ACCOUNT))
    req.add_header('X-Rucio-Username', env.get('JLAB_RUCIO_USERNAME', JLAB_RUCIO_USERNAME))
    req.add_header('X-Rucio-Password', env.get('JLAB_RUCIO_PASSWORD', JLAB_RUCIO_PASSWORD))
    # Close the response deterministically instead of leaking the socket.
    with _ur.urlopen(req, context=ctx, timeout=timeout) as resp:
        # A missing header reads back as None, which the check below rejects.
        token = resp.headers['X-Rucio-Auth-Token']
    if not token:
        raise ServiceError('JLab Rucio auth returned no token')
    return token
0999
1000
def _jlab_rucio_get(path, token, *, timeout=60, **q):
    """GET a JLab Rucio path with the auth token; returns response text.

    path: server-relative path, appended to the base URL as given.
    token: value sent in the X-Rucio-Auth-Token header.
    timeout: socket timeout in seconds passed to urlopen.
    **q: extra keyword arguments, URL-encoded into the query string.
    Network / HTTP errors from urlopen propagate to the caller.
    """
    import urllib.request as _ur
    import urllib.parse as _up
    import ssl as _ssl
    import os as _os
    # NOTE(review): TLS certificate / hostname verification is disabled
    # here; left unchanged because it looks deliberate - confirm.
    ctx = _ssl.create_default_context()
    ctx.check_hostname = False
    ctx.verify_mode = _ssl.CERT_NONE
    url = (_os.environ.get('JLAB_RUCIO_URL') or JLAB_RUCIO_URL) + path
    if q:
        url += '?' + _up.urlencode(q)
    req = _ur.Request(url)
    req.add_header('X-Rucio-Auth-Token', token)
    # Close the response deterministically; this helper is called once per
    # dataset from a thread pool, so leaked sockets add up quickly.
    with _ur.urlopen(req, context=ctx, timeout=timeout) as resp:
        return resp.read().decode()
1016
1017
1018 def _ndjson(text):
1019 """Parse a newline-delimited-JSON Rucio response (strings or dicts)."""
1020 import json as _json
1021 out = []
1022 for line in text.splitlines():
1023 line = line.strip()
1024 if not line:
1025 continue
1026 try:
1027 out.append(_json.loads(line))
1028 except _json.JSONDecodeError:
1029 out.append(line)
1030 return out
1031
1032
def fetch_jlab_rucio_campaign(campaign_path, *, scope='epic', token=None,
                              max_workers=16):
    """Fetch the Rucio snapshot for one campaign path (e.g. '/RECO/26.02.0').

    Lists every dataset DID matching '<campaign_path>/*' in `scope`, then
    for each one fetches its metadata (/dids/<scope>/<name>) and its
    per-RSE replica records (/replicas/<scope>/<name>/datasets).

    Returns {count, datasets:[{did, length, bytes, rse_replicas:[...]}, ...]}.

    The per-dataset requests run in a thread pool of `max_workers` threads
    rather than serially. A failed metadata or replica request is logged as
    a warning and replaced by {} / [] so one bad dataset does not abort the
    whole fetch. If `token` is None a fresh auth token is obtained first.
    """
    import json as _json
    from concurrent.futures import ThreadPoolExecutor as _Pool
    if token is None:
        token = _jlab_rucio_auth()
    listing = _jlab_rucio_get(
        f'/dids/{scope}/dids/search', token,
        type='dataset', name=campaign_path + '/*')
    names = _ndjson(listing)

    def _metadata(name):
        # Best-effort: a failure degrades to empty metadata.
        try:
            return _json.loads(_jlab_rucio_get(f'/dids/{scope}/{name}', token))
        except Exception as e:
            _log.warning('rucio meta %s/%s: %s', scope, name, e)
            return {}

    def _replicas(name):
        # Best-effort: a failure degrades to "no replica records".
        try:
            return _ndjson(
                _jlab_rucio_get(f'/replicas/{scope}/{name}/datasets', token))
        except Exception as e:
            _log.warning('rucio replicas %s/%s: %s', scope, name, e)
            return []

    def _one(name):
        # Search results that did not decode to a string are ignored.
        if not isinstance(name, str):
            return None
        meta = _metadata(name)
        rse_records = _replicas(name)
        return {
            'did': f'{scope}:{name}',
            'length': meta.get('length'),
            'bytes': meta.get('bytes'),
            'rse_replicas': rse_records,
        }

    with _Pool(max_workers=max_workers) as pool:
        datasets = [rec for rec in pool.map(_one, names) if rec is not None]
    return {'count': len(datasets), 'datasets': datasets}
1079
1080
1081 def _request_input_tail(ds_path):
1082 """Return the comparable tail of a CSV input dataset path.
1083
1084 /volatile/eic/EPIC/EVGEN/<TAIL> -> '<TAIL>' (lower-cased)
1085 anything else -> '' (no match)
1086 """
1087 if not ds_path:
1088 return ''
1089 parts = ds_path.strip('/').split('/')
1090 if len(parts) >= 5 and parts[:4] == ['volatile', 'eic', 'EPIC', 'EVGEN']:
1091 return '/'.join(parts[4:]).lower()
1092 return ''
1093
1094
1095 def _did_path_tail(did):
1096 """Drop scope + /STAGE/<v>/<detector>/ and return the rest, lower-cased.
1097
1098 epic:/RECO/26.02.0/epic_craterlake/DDIS/rapgap3.../noRad/ep/10x100/q2_1to10
1099 -> 'ddis/rapgap3.../norad/ep/10x100/q2_1to10'
1100 """
1101 name = did.split(':', 1)[-1].lstrip('/')
1102 parts = name.split('/')
1103 if len(parts) <= 3:
1104 return ''
1105 return '/'.join(parts[3:]).lower()
1106
1107
1108 def _aggregate_rucio_match(matches):
1109 """Per-stage rollup over a list of matched dataset records.
1110
1111 Rucio's /dids/<scope>/<name> endpoint returns length/bytes as null
1112 for these datasets; the canonical file count and byte size live on
1113 each RSE replica record (they all carry the same value since RSEs
1114 hold replicas of the same data). Take the max across RSEs.
1115 """
1116 by_stage = {}
1117 for m in matches:
1118 st = by_stage.setdefault(m['stage'], {
1119 'count': 0, 'files': 0, 'bytes': 0,
1120 'rses': {}, 'incomplete': 0,
1121 })
1122 st['count'] += 1
1123 ds_files = m.get('length') or 0
1124 ds_bytes = m.get('bytes') or 0
1125 complete = True
1126 for r in m.get('rse_replicas', []):
1127 rse = r.get('rse')
1128 if not rse:
1129 continue
1130 ds_files = max(ds_files, r.get('length') or 0)
1131 ds_bytes = max(ds_bytes, r.get('bytes') or 0)
1132 st['rses'].setdefault(rse, {'complete': 0, 'incomplete': 0})
1133 if r.get('available_length') == r.get('length') and r.get('length'):
1134 st['rses'][rse]['complete'] += 1
1135 else:
1136 st['rses'][rse]['incomplete'] += 1
1137 complete = False
1138 st['files'] += ds_files
1139 st['bytes'] += ds_bytes
1140 if not complete:
1141 st['incomplete'] += 1
1142 return by_stage
1143
1144
def _index_snapshot_by_tail(snapshot):
    """Pre-index a snapshot for fast tail-prefix lookup.

    Returns a list of (tail, dataset_record_with_stage) tuples sorted by
    tail length DESC, so the most-specific tails are tried first. The
    sort is stable: tails of equal length keep snapshot order.

    `stage` is the first segment of the campaign path key (e.g. 'RECO'
    for '/RECO/26.02.0'). Datasets whose DID has no tail beyond
    /STAGE/<v>/<detector>/ are left out of the index. A missing or null
    'campaigns' / 'datasets' / 'did' entry is treated as empty.
    """
    idx = []
    for cpath, info in (snapshot.get('campaigns') or {}).items():
        # split() always yields at least one element, so [0] is safe.
        stage = cpath.strip('/').split('/')[0]
        for d in info.get('datasets') or []:
            tail = _did_path_tail(d.get('did') or '')
            if not tail:
                continue
            idx.append((tail, {**d, 'stage': stage}))
    # Honour the documented ordering; previously the list was returned in
    # snapshot order despite the docstring.
    idx.sort(key=lambda entry: len(entry[0]), reverse=True)
    return idx
1162
1163
1164 _Q2_MIN_RE = _re.compile(r'^minQ2=([\d.]+)$', _re.I)
1165 _Q2_RANGE_RE = _re.compile(r'^q2_([\d.]+)to([\d.]+)$', _re.I)
1166 _Q2_POINT_RE = _re.compile(r'^q2_([\d.]+)$', _re.I)
1167
1168
1169 def _q2_range(s):
1170 """Convert a Q² label into a (lo, hi) numeric range.
1171
1172 'minQ2=1' -> (1, inf)
1173 'q2_1to10' -> (1, 10)
1174 'q2_20' -> (20, 20)
1175 anything else / empty -> None
1176 """
1177 if not s:
1178 return None
1179 m = _Q2_MIN_RE.match(s)
1180 if m:
1181 return (float(m.group(1)), float('inf'))
1182 m = _Q2_RANGE_RE.match(s)
1183 if m:
1184 return (float(m.group(1)), float(m.group(2)))
1185 m = _Q2_POINT_RE.match(s)
1186 if m:
1187 v = float(m.group(1))
1188 return (v, v)
1189 return None
1190
1191
def _q2_overlap(req_q2, did_q2):
    """Tell whether a request Q² label and a DID Q² label can intersect.

    A missing label on either side never rules a match out. When both
    labels parse to numeric ranges the closed intervals are tested for
    overlap; otherwise the labels must be equal strings.
    """
    if not (req_q2 and did_q2):
        return True
    req_rng = _q2_range(req_q2)
    did_rng = _q2_range(did_q2)
    if req_rng is None or did_rng is None:
        return req_q2 == did_q2
    (req_lo, req_hi), (did_lo, did_hi) = req_rng, did_rng
    return req_lo <= did_hi and did_lo <= req_hi
1201
1202
def _filter_match(req, did):
    """Decide whether a request's filter dict matches an output DID's.

    Comparison is on the semantic axes both schemas share rather than on
    path strings, because the output DID carries extra segments
    (generator, radiation, charge) and a different Q² spelling that
    defeat string matching. See project-pcs-request-vs-output-paths.

    - detector / beam / physics: a value set on the request must be
      present and equal on the DID.
    - q2: the two labels must overlap (see _q2_overlap).
    - species / energy: compared only when both sides carry a value.
    """
    # Strict axes: the request value is mandatory on the DID side.
    for key in ('detector', 'beam', 'physics'):
        wanted = req.get(key)
        if wanted and did.get(key) != wanted:
            return False

    if not _q2_overlap(req.get('q2'), did.get('q2')):
        return False

    # Lenient axes: a value missing on either side is not a mismatch.
    for key in ('species', 'energy'):
        wanted, found = req.get(key), did.get(key)
        if wanted and found and wanted != found:
            return False
    return True
1227
1228
1229 def _did_tail_segments(did_tail):
1230 return did_tail.split('/') if did_tail else []
1231
1232
1233 def _path_aligned_match(did_segs, req_segs):
1234 """Is `req_segs` a contiguous subsequence of `did_segs`?
1235
1236 Rucio output paths often carry a background-mixing chain prefix
1237 (e.g. Bkg_Exact1S_2us/GoldCt/5um/) AND a Q²-bin suffix that the
1238 CSV input tail doesn't carry. Subsequence matching handles both.
1239 """
1240 if not req_segs or len(req_segs) > len(did_segs):
1241 return False
1242 n = len(req_segs)
1243 for i in range(len(did_segs) - n + 1):
1244 if did_segs[i:i + n] == req_segs:
1245 return True
1246 return False
1247
1248
def match_requests_to_rucio_snapshot(snapshot, *, campaign):
    """For every ProdTask in `campaign`, stash an
    `overrides['csv_import']['output']` rollup of the Rucio datasets in
    `snapshot` that match the task's request filters.

    Matching is filter-based (see _filter_match), not path-based: the
    request filters come from `overrides['csv_import']['filters']` or,
    when that is empty, from _extract_csv_filters() applied to the
    dataset's `metadata['source']['location']`. Only tasks whose filters
    carry a 'beam' or 'physics' value are matched at all; every other
    task is recorded with status 'no_output'.

    Also stashes the unmatched Rucio datasets on
    ``campaign.data['rucio_unmatched']`` so the catalog view can surface
    them as synthetic table rows (unmatched output popping up).

    Side effects: saves every ProdTask of the campaign and the campaign
    itself. Returns a summary dict with the keys tasks_seen,
    tasks_matched, tasks_unmatched and rucio_unmatched.
    """

    # Only datasets that _index_snapshot_by_tail keeps (non-empty DID tail)
    # are match candidates; filters are extracted once per dataset up front.
    idx = _index_snapshot_by_tail(snapshot)
    idx_filtered = [(rec, _extract_past_filters(rec['did']))
                    for _tail, rec in idx]
    qs = ProdTask.objects.filter(campaign=campaign).select_related('dataset')
    summary = {'tasks_seen': 0, 'tasks_matched': 0, 'tasks_unmatched': 0}
    matched_dids = set()
    for t in qs:
        summary['tasks_seen'] += 1

        # Prefer filters already stored on the task; otherwise derive them
        # from the dataset's recorded source location.
        req_filters = ((t.overrides or {}).get('csv_import') or {}).get('filters') or {}
        if not req_filters:
            ds_path = (t.dataset.metadata or {}).get('source', {}).get('location', '') \
                if (t.dataset.metadata or {}) else ''
            req_filters = _extract_csv_filters(ds_path, t.dataset.detector_config) \
                if ds_path else {}
        matches = []
        # Without a beam or physics value no matching is attempted.
        if req_filters and any(req_filters.get(k) for k in ('beam', 'physics')):
            for rec, did_filters in idx_filtered:
                if _filter_match(req_filters, did_filters):
                    matches.append(rec)
        for m in matches:
            matched_dids.add(m['did'])
        by_stage = _aggregate_rucio_match(matches)
        any_incomplete = any(s.get('incomplete', 0) for s in by_stage.values())
        has_output = len(matches) > 0
        if not has_output:
            output_status = 'no_output'
        elif any_incomplete:
            output_status = 'incomplete'
        else:
            output_status = 'complete'
        # Copy before mutating so the existing overrides / csv_import dicts
        # are replaced rather than edited in place.
        overrides = dict(t.overrides or {})
        cv = dict(overrides.get('csv_import', {}) or {})
        cv['output'] = {
            'by_stage': by_stage,
            'matched_count': len(matches),
            'campaign_name': campaign.name,
            'status': output_status,
            'has_output': has_output,
            'has_incomplete': any_incomplete,
            'has_simu': 'FULL' in by_stage,
            'has_reco': 'RECO' in by_stage,
        }
        overrides['csv_import'] = cv
        t.overrides = overrides
        t.save(update_fields=['overrides', 'updated_at'])
        if matches:
            summary['tasks_matched'] += 1
        else:
            summary['tasks_unmatched'] += 1

    # Every snapshot dataset that no task matched, including those left out
    # of the index above, is reported as unmatched output.
    unmatched = []
    for cpath, info in (snapshot.get('campaigns') or {}).items():
        cp_parts = cpath.strip('/').split('/')
        stage = cp_parts[0] if cp_parts else ''
        for d in info.get('datasets') or []:
            did = d.get('did', '')
            if did and did not in matched_dids:

                # Reuse the per-stage aggregation on a single record to get
                # this dataset's file / byte totals and completeness.
                rollup = _aggregate_rucio_match([{**d, 'stage': stage}])
                files = sum(s['files'] for s in rollup.values())
                bytes_ = sum(s['bytes'] for s in rollup.values())
                any_incomplete = any(s.get('incomplete', 0) for s in rollup.values())
                unmatched.append({
                    'did': did,
                    'stage': stage,
                    'files': files,
                    'bytes': bytes_,
                    'rse_names': sorted({r.get('rse', '') for r in d.get('rse_replicas', []) if r.get('rse')}),
                    'by_stage': rollup,
                    'incomplete': any_incomplete,
                    'filters': _extract_past_filters(did),
                })
    campaign.data = {**(campaign.data or {}), 'rucio_unmatched': unmatched}
    campaign.save(update_fields=['data', 'updated_at'])
    summary['rucio_unmatched'] = len(unmatched)
    return summary
1345
1346
def summarize_rucio_timeline(snapshot, *, bin_hours=12):
    """Build a per-bin cumulative arrival timeline from a Rucio snapshot.

    'Arrival' = the earliest created_at across all RSE replicas of a
    dataset. Datasets without any usable timestamp are dropped. Every
    timestamp is normalized to UTC first (offset-aware values are
    converted, naive values are taken as UTC), then floored to its
    `bin_hours`-wide bin. Bins lie on a fixed grid counted from
    1970-01-01T00:00 UTC, which is aligned to UTC midnight whenever
    `bin_hours` divides 24 (default 12h). The timeline always extends
    to the bin containing the current time. Returns a dict suitable
    for Plotly:

      {'dates': ['YYYY-MM-DDTHH:00:00', ...],
       'bin_hours': 12,
       'simu': {'cum_datasets':[...], 'cum_files':[...], 'cum_bytes':[...]},
       'reco': {'cum_datasets':[...], 'cum_files':[...], 'cum_bytes':[...]}}

    'simu' counts datasets under a FULL campaign path and 'reco' those
    under a RECO path; other stages are ignored. With no arrivals at
    all, 'dates' is [] and 'simu' / 'reco' are empty dicts.
    """
    from email.utils import parsedate_to_datetime as _pd
    from itertools import accumulate as _accumulate
    import datetime as _dt
    bin_size = _dt.timedelta(hours=bin_hours)
    epoch = _dt.datetime(1970, 1, 1)
    stamp = '%Y-%m-%dT%H:%M:%S'

    def _utc_naive(dt):
        """Normalize to a naive UTC datetime so all values are comparable."""
        if dt.tzinfo is None:
            return dt
        return dt.astimezone(_dt.timezone.utc).replace(tzinfo=None)

    def _bucket(dt):
        """Floor naive-UTC `dt` onto the epoch-anchored bin grid."""
        return epoch + ((dt - epoch) // bin_size) * bin_size

    arrivals = []  # (bin start, stage, files, bytes) per dated dataset
    for cpath, info in (snapshot.get('campaigns') or {}).items():
        stage = cpath.strip('/').split('/')[0]
        for d in info.get('datasets') or []:
            tsps = []
            files = bytes_ = 0
            for r in d.get('rse_replicas') or []:
                files = max(files, r.get('length') or 0)
                bytes_ = max(bytes_, r.get('bytes') or 0)
                ts_str = r.get('created_at')
                if not ts_str:
                    continue
                try:
                    tsps.append(_utc_naive(_pd(ts_str)))
                except Exception:
                    # Deliberate best-effort: an unparseable timestamp is
                    # ignored; the dataset is dropped only if none parse.
                    continue
            if not tsps:
                continue
            arrivals.append((_bucket(min(tsps)), stage, files, bytes_))
    if not arrivals:
        return {'dates': [], 'bin_hours': bin_hours, 'simu': {}, 'reco': {}}

    first = min(a[0] for a in arrivals)
    now = _dt.datetime.now(_dt.timezone.utc).replace(tzinfo=None)
    last = max(max(a[0] for a in arrivals), _bucket(now))
    n_bins = (last - first) // bin_size + 1
    dates = [(first + i * bin_size).strftime(stamp) for i in range(n_bins)]

    def _empty():
        return {'cum_datasets': [0] * n_bins,
                'cum_files': [0] * n_bins,
                'cum_bytes': [0] * n_bins}

    out = {'dates': dates, 'bin_hours': bin_hours,
           'simu': _empty(), 'reco': _empty()}
    series_for = {'FULL': out['simu'], 'RECO': out['reco']}

    # First pass: per-bin increments. The bin index is computed
    # arithmetically, so every arrival lands in a bin for any bin width.
    for bucket, stage, files, bytes_ in arrivals:
        series = series_for.get(stage)
        if series is None:
            continue
        i = (bucket - first) // bin_size
        series['cum_datasets'][i] += 1
        series['cum_files'][i] += files
        series['cum_bytes'][i] += bytes_

    # Second pass: turn the increments into running totals.
    for series in series_for.values():
        for key in ('cum_datasets', 'cum_files', 'cum_bytes'):
            series[key] = list(_accumulate(series[key]))
    return out
1430
1431
def _detect_active_releases(token=None, *, year_prefix='26.'):
    """Count datasets per release under epic:/RECO/<v>/ in JLab Rucio.

    Returns a list of {version, count} for every release whose version
    starts with `year_prefix` and that has at least one dataset, sorted
    newest-first by numeric version components. 'main' is excluded.
    Trial runs that land a handful of datasets before real production
    starts are deliberately NOT filtered out here - the operator judges
    from the counts, and nothing is auto-promoted. Humans switch (see
    feedback-humans-switch-lifecycle).

    If `token` is None a fresh auth token is obtained first.
    """
    from collections import Counter as _Counter
    if token is None:
        token = _jlab_rucio_auth()
    listing = _jlab_rucio_get(
        '/dids/epic/dids/search', token, type='dataset', name='/RECO/*')

    def _version_of(entry):
        # The release is the second path segment: /RECO/<version>/...
        if not isinstance(entry, str):
            return None
        segments = entry.lstrip('/').split('/')
        return segments[1] if len(segments) >= 2 else None

    counts = _Counter(
        version
        for version in map(_version_of, _ndjson(listing))
        if version is not None
        and version != 'main'
        and version.startswith(year_prefix)
    )

    def _component(part):
        # Numeric components sort numerically and ahead of textual ones.
        try:
            return (0, int(part))
        except ValueError:
            return (1, part)

    def _key(version):
        return tuple(_component(part) for part in version.split('.'))

    ordered = sorted(counts, key=_key, reverse=True)
    return [{'version': version, 'count': counts[version]}
            for version in ordered]
1469
1470
def load_rucio_snapshot(campaign_name, *, snapshot_dir=RUCIO_SNAPSHOT_DIR):
    """Read a saved JLab Rucio snapshot. Returns None if absent.

    Reads '<snapshot_dir>/current-<campaign_name>.json'. A missing file
    yields None silently; any other read or decode failure is logged as
    a warning and also yields None.
    """
    import json as _json
    import os as _os
    path = _os.path.join(snapshot_dir, f'current-{campaign_name}.json')
    # Open directly rather than checking exists() first, so a file removed
    # between the check and the open cannot slip through.
    try:
        with open(path, encoding='utf-8') as f:
            return _json.load(f)
    except FileNotFoundError:
        return None
    except (OSError, ValueError) as e:
        # ValueError covers JSONDecodeError and UnicodeDecodeError, so a
        # corrupted file degrades to "no snapshot" instead of crashing.
        _log.warning('load_rucio_snapshot %s: %s', path, e)
        return None
1484
1485
def import_jlab_rucio_current_snapshot(*, campaign_name=None,
                                       snapshot_dir=RUCIO_SNAPSHOT_DIR,
                                       created_by='rucio_snapshot'):
    """Pull the JLab Rucio snapshot for the PCS current campaign and save
    it under the snapshot directory.

    campaign_name: '26.02.0' (or override). If None, uses the most
        recently updated lifecycle='current' Campaign. Both /RECO/<v>
        and /FULL/<v> are fetched.
    snapshot_dir: writable directory. One JSON file per current campaign:
        '<snapshot_dir>/current-<campaign>.json'.
    created_by: accepted but not used by this function.

    Returns a summary dict: campaign, paths (dataset count per campaign
    path), errors, snapshot_path, file_bytes, detected_releases and,
    when the campaign is the current one, match.

    Raises ServiceError when no current Campaign exists or the initial
    Rucio auth fails.

    NO-SILENT-FAILURES: every network / parse error is collected into
    summary['errors'] and surfaced to the operator.
    NOTE(review): per-dataset failures inside fetch_jlab_rucio_campaign
    are only logged there, so they do not reach summary['errors'].
    """
    import json as _json
    import os as _os
    import time as _time

    summary = {'campaign': '', 'paths': {}, 'errors': [], 'snapshot_path': ''}

    # Resolve which campaign to snapshot.
    if campaign_name is None:
        current = (Campaign.objects.filter(lifecycle='current')
                   .order_by('-updated_at').first())
        if not current:
            raise ServiceError('No current Campaign defined in PCS')
        campaign_name = current.name
    summary['campaign'] = campaign_name

    _os.makedirs(snapshot_dir, exist_ok=True)
    out_path = _os.path.join(snapshot_dir, f'current-{campaign_name}.json')
    summary['snapshot_path'] = out_path

    # Auth failure is fatal: nothing can be fetched without a token.
    try:
        token = _jlab_rucio_auth()
    except Exception as e:
        raise ServiceError(f'JLab Rucio auth failed: {e}')

    snapshot = {
        'fetched_at': _time.strftime('%Y-%m-%dT%H:%M:%S%z'),
        'scope': 'epic',
        'campaign_name': campaign_name,
        'campaigns': {},
    }
    # A failed stage is recorded as an empty entry carrying the error text,
    # so the snapshot file always has both campaign paths.
    for stage in ('RECO', 'FULL'):
        cpath = f'/{stage}/{campaign_name}'
        try:
            snapshot['campaigns'][cpath] = fetch_jlab_rucio_campaign(
                cpath, token=token)
            summary['paths'][cpath] = snapshot['campaigns'][cpath]['count']
        except Exception as e:
            summary['errors'].append(f'{cpath}: {e}')
            snapshot['campaigns'][cpath] = {'count': 0, 'datasets': [],
                                            'error': str(e)}
            summary['paths'][cpath] = 0

    # The file is written in place, overwriting any previous snapshot.
    with open(out_path, 'w') as f:
        _json.dump(snapshot, f, indent=2)
    summary['file_bytes'] = _os.path.getsize(out_path)

    # Release detection is non-fatal: failures are reported in
    # summary['errors'].
    # NOTE(review): a new token is requested here instead of reusing the one
    # above - presumably to guard against expiry during the fetch; confirm.
    try:
        token = _jlab_rucio_auth()
        detected = _detect_active_releases(token=token)
    except Exception as e:
        summary['errors'].append(f'detect_active_releases: {e}')
        detected = []
    summary['detected_releases'] = detected

    # Record the detected releases on the Campaign row and, for the current
    # campaign only, refresh the per-task output match.
    try:
        camp = Campaign.objects.get(name=campaign_name)

        camp.data = {**(camp.data or {}), 'detected_releases': detected}
        camp.save(update_fields=['data', 'updated_at'])

        # Only the detected-release list is stored; the campaign's
        # lifecycle is never changed here.
        if camp.lifecycle == 'current':
            match_summary = match_requests_to_rucio_snapshot(snapshot, campaign=camp)
            summary['match'] = match_summary
    except Campaign.DoesNotExist:
        summary['errors'].append(
            f"campaign '{campaign_name}' not in PCS - skipping match step")
    except Exception as e:
        summary['errors'].append(f'request match step failed: {e}')

    return summary
1582
1583
def set_pcs_campaign_lifecycle(new_name, target_lifecycle, *, created_by='operator'):
    """Make `new_name` the Campaign occupying the `target_lifecycle` slot.

    target_lifecycle is 'current' or 'last' (singular slots - at most one
    Campaign at a time). An occupied slot has its occupant renamed in
    place, so ProdTask FKs are preserved (same row mutated); an empty
    slot gets a new Campaign(lifecycle=target_lifecycle).
    Operator-initiated only - never call from a sync / refresh handler
    (see feedback-humans-switch-lifecycle).

    Returns a dict with changed / name / lifecycle and, when something
    changed, old_name and created.
    Raises ServiceError for an empty name, an unsupported lifecycle, or
    when another Campaign already carries `new_name`.
    """
    if not new_name:
        raise ServiceError('set_pcs_campaign_lifecycle: empty target name')
    if target_lifecycle not in ('current', 'last'):
        raise ServiceError(f'unsupported lifecycle {target_lifecycle!r}')

    occupant = (Campaign.objects.filter(lifecycle=target_lifecycle)
                .order_by('-updated_at').first())

    # Empty slot: create a fresh Campaign, provided the name is free.
    if occupant is None:
        if Campaign.objects.filter(name=new_name).exists():
            raise ServiceError(
                f'Campaign named {new_name!r} already exists; '
                f'cannot create a new {target_lifecycle} with that name')
        Campaign.objects.create(name=new_name, lifecycle=target_lifecycle,
                                created_by=created_by)
        _log.info('PCS %s campaign created: %s (by %s)',
                  target_lifecycle, new_name, created_by)
        return {'changed': True, 'old_name': None, 'name': new_name,
                'lifecycle': target_lifecycle, 'created': True}

    # Occupied slot already carrying the requested name: nothing to do.
    if occupant.name == new_name:
        return {'changed': False, 'name': new_name, 'lifecycle': target_lifecycle}

    # Occupied slot: rename the occupant unless another row owns the name.
    name_owners = Campaign.objects.filter(name=new_name).exclude(pk=occupant.pk)
    if name_owners.exists():
        raise ServiceError(
            f'Campaign named {new_name!r} already exists; '
            f'cannot rename {occupant.name!r} into it')
    previous_name = occupant.name
    occupant.name = new_name
    occupant.save(update_fields=['name', 'updated_at'])
    _log.info('PCS %s campaign renamed: %s -> %s (by %s)',
              target_lifecycle, previous_name, new_name, created_by)
    return {'changed': True, 'old_name': previous_name, 'name': new_name,
            'lifecycle': target_lifecycle, 'created': False}
1626
1627
1628
def rename_pcs_current_campaign(new_name, *, created_by='operator'):
    """Point the 'current' lifecycle slot at `new_name`.

    Thin wrapper over set_pcs_campaign_lifecycle with the lifecycle
    fixed to 'current'; returns that function's result unchanged.
    """
    return set_pcs_campaign_lifecycle(
        new_name=new_name,
        target_lifecycle='current',
        created_by=created_by,
    )
1631
1632
def prodtask_record_submission(*, task, jedi_task_id, new_status='submitted'):
    """
    Record outcome of a JEDI submission. Two gates:

    - ``task.status`` must be 'ready' (no submit from draft, no re-submit).
    - ``task.panda_task_id`` must be unset; refuses to overwrite (409).

    ``jedi_task_id`` must be convertible to int and ``new_status`` must be
    one of the known ProdTask statuses. All validation happens before the
    task is touched, so a rejected call leaves ``task`` unmodified.

    Returns the saved task. Raises ServiceError (status 400, or 409 for
    the overwrite refusal) on any failed check.
    """
    if task.status != 'ready':
        raise ServiceError(
            f'Task must be in status=ready before submission '
            f'(current: {task.status!r}). Mark it ready via '
            f'set-status first.'
        )
    if task.panda_task_id is not None:
        raise ServiceError(
            f'Task already records panda_task_id={task.panda_task_id}. '
            f'Refusing to overwrite.',
            status=409,
        )
    # Convert into a local first: the instance must not be mutated until
    # every check has passed.
    try:
        panda_task_id = int(jedi_task_id)
    except (TypeError, ValueError):
        raise ServiceError('jedi_task_id must be an integer')

    valid = _known_prodtask_statuses()
    if new_status not in valid:
        raise ServiceError(
            f'Invalid status. Choose from: {", ".join(sorted(valid))}'
        )

    task.panda_task_id = panda_task_id
    task.status = new_status
    task.save(update_fields=['panda_task_id', 'status', 'updated_at'])
    return task