Back to home page

EIC code displayed by LXR

 
 

    


Warning, file /swf-monitor/src/pcs/services.py was not indexed or was modified since last indexation (in which case cross-reference links may be missing, inaccurate or erroneous).

0001 """
0002 PCS business logic — single source of truth for intake, linkage,
0003 lifecycle, and submission state changes.
0004 
0005 Both REST viewset actions and MCP tools call these functions. Functions
0006 take plain Python types (no HTTP request, no MCP context) and return
0007 model instances or raise ServiceError. The caller translates errors
0008 to its native shape (DRF Response, MCP error dict).
0009 """
0010 import csv as _csv
0011 import hashlib as _hashlib
0012 import logging as _logging
0013 import re as _re
0014 
0015 from django.db import transaction
0016 
0017 _log = _logging.getLogger(__name__)
0018 
0019 from .models import (
0020     Dataset, ProdConfig, ProdTask,
0021     Campaign, ProdRequest,
0022     PhysicsTag, EvgenTag, SimuTag, RecoTag,
0023 )
0024 
0025 
class ServiceError(Exception):
    """Domain-level failure raised by the PCS service functions.

    Carries a human-readable ``detail`` message and an HTTP-shaped
    ``status`` hint (400 unless given) that each caller maps onto its
    own error shape.
    """

    def __init__(self, detail, status=400):
        # Initialise the Exception base first so ``args`` / ``str()``
        # carry the detail, then expose both values as attributes.
        super().__init__(detail)
        self.status = status
        self.detail = detail
0032 
0033 
# Allowed ProdTask lifecycle transitions, keyed by current status; each
# value is the set of statuses a task may move to from that status, and
# an empty set marks a status with no way out. Submission and
# post-submission state changes are recorded via
# prodtask_record_submission and automation, not direct human transitions.
# NOTE(review): import_default_datasets_csv creates tasks with
# status='csv_import', which has no entry here, so prodtask_set_status
# finds no allowed transition for such tasks — confirm this is intended.
PRODTASK_TRANSITIONS = {
    'draft':     {'ready'},
    'ready':     {'draft', 'submitted'},
    'submitted': {'completed', 'failed'},
    'completed': set(),
    'failed':    set(),
}

# Payload keys that prodtask_intake copies verbatim into
# ProdTask.overrides (the public-catalogue mapping for a task).
PUBLIC_CATALOG_KEYS = (
    'public_catalog_repo', 'public_catalog_issue', 'public_catalog_pr',
    'public_catalog_row_index', 'public_catalog_csv_path',
    'public_catalog_row_key', 'public_catalog_page_url',
    'public_catalog_commit_sha',
)

# Fields copied from ProdRequest → ProdTask at task creation (see
# prodtask_apply_request). Both rows are independently mutable
# thereafter. Per memory:feedback-denormalization-ok — the duplication
# is intentional so catalog filter/display reads direct ProdTask columns
# without joining.
REQUEST_TO_TASK_COPY_FIELDS = (
    'requestor', 'priority',
    'pre_tdr_use', 'early_science_use', 'other_use', 'new_request',
)
0060 
0061 
0062 # ---------------------------------------------------------------------------
0063 # Dataset
0064 # ---------------------------------------------------------------------------
0065 
def dataset_intake(*, source_location, source_kind='csv_manifest',
                   scope='group.EIC.evgen', stage='evgen',
                   detector_version=None, detector_config=None,
                   physics_tag_label=None, evgen_tag_label=None,
                   simu_tag_label=None, reco_tag_label=None,
                   description='', created_by):
    """
    Idempotent intake of an external (e.g. EVGEN CSV manifest) Dataset.

    Idempotency key: ``(source.kind, source.location)``, matched against
    the Dataset ``metadata`` JSON. When a Dataset with that key already
    exists it is returned untouched and every other argument is ignored.

    Creating a new Dataset requires ``detector_version``,
    ``detector_config`` and all four tag labels; each label must name an
    existing tag whose status is ``'locked'``.

    Returns ``(dataset, was_created)``.

    Raises ServiceError when ``source_location`` is empty, a field
    needed for creation is missing, a tag is unknown or not locked, or
    the save fails (the underlying exception is chained as the cause).
    """
    if not source_location:
        raise ServiceError('source_location is required')

    existing = Dataset.objects.filter(
        metadata__source__location=source_location,
        metadata__source__kind=source_kind,
    ).first()
    if existing:
        return existing, False

    # Everything below is only needed on the create path.
    required = {
        'detector_version': detector_version,
        'detector_config':  detector_config,
        'physics_tag':      physics_tag_label,
        'evgen_tag':        evgen_tag_label,
        'simu_tag':         simu_tag_label,
        'reco_tag':         reco_tag_label,
    }
    missing = [k for k, v in required.items() if not v]
    if missing:
        raise ServiceError(
            f'No existing Dataset for {source_kind}:{source_location}; '
            f'creation requires: {", ".join(missing)}'
        )

    # Resolve each label to its tag row; only locked tags may be bound.
    tag_specs = {
        'physics_tag': (PhysicsTag, physics_tag_label),
        'evgen_tag':   (EvgenTag,   evgen_tag_label),
        'simu_tag':    (SimuTag,    simu_tag_label),
        'reco_tag':    (RecoTag,    reco_tag_label),
    }
    tags = {}
    for field, (model, label) in tag_specs.items():
        tag = model.objects.filter(tag_label=label).first()
        if not tag:
            raise ServiceError(f'{field} not found: {label}')
        if tag.status != 'locked':
            raise ServiceError(f'{field} {label} must be locked before use')
        tags[field] = tag

    ds = Dataset(
        scope=scope,
        detector_version=detector_version,
        detector_config=detector_config,
        description=description,
        metadata={
            'stage': stage,
            'source': {'kind': source_kind, 'location': source_location},
        },
        created_by=created_by,
        **tags,
    )
    try:
        ds.save()
    except Exception as e:
        # Callers only handle ServiceError; chain the original so the
        # real failure (and its traceback) is not lost.
        raise ServiceError(str(e)) from e
    return ds, True
0136 
0137 
0138 # ---------------------------------------------------------------------------
0139 # ProdTask
0140 # ---------------------------------------------------------------------------
0141 
def prodtask_intake(*, payload, created_by):
    """
    Create or refresh a draft ProdTask, idempotently.

    The payload must carry one idempotency key:
        public_catalog_issue (takes precedence when present), or
        the pair (public_catalog_csv_path, public_catalog_row_key)

    If a task already matches the key, it is updated in place: the
    catalogue mapping keys present in the payload are merged into
    ``overrides``, ``input_dataset_did`` is set when supplied, and
    ``description`` is replaced when the payload provides one. If no
    task matches, a new ``draft`` task is created; the payload must then
    also provide ``name``, ``dataset`` (DID or dataset name) and
    ``prod_config`` (name).

    Returns ``(task, was_created)``. Raises ServiceError on a missing
    key, missing creation fields, or an unknown dataset / config.
    """
    issue = payload.get('public_catalog_issue')
    csv_path = payload.get('public_catalog_csv_path')
    row_key = payload.get('public_catalog_row_key')
    if not (issue or (csv_path and row_key)):
        raise ServiceError(
            'Idempotency key required: public_catalog_issue or '
            '(public_catalog_csv_path, public_catalog_row_key)'
        )

    # The issue key wins; the CSV pair is only consulted without it.
    if issue:
        lookup = {'overrides__public_catalog_issue': issue}
    else:
        lookup = {
            'overrides__public_catalog_csv_path': csv_path,
            'overrides__public_catalog_row_key': row_key,
        }
    task = ProdTask.objects.filter(**lookup).first()

    catalog_fields = {
        key: payload[key] for key in PUBLIC_CATALOG_KEYS if key in payload
    }
    input_did = payload.get('input_dataset_did')

    if task:
        merged = {**(task.overrides or {}), **catalog_fields}
        if input_did:
            merged['input_dataset_did'] = input_did
        task.overrides = merged
        changed = ['overrides', 'updated_at']
        refreshed_description = payload.get('description')
        if refreshed_description is not None:
            task.description = refreshed_description
            changed.append('description')
        task.save(update_fields=changed)
        return task, False

    # No match: everything needed to create a draft must be present.
    creation_inputs = {
        'name': payload.get('name'),
        'dataset': payload.get('dataset'),
        'prod_config': payload.get('prod_config'),
    }
    absent = [field for field, val in creation_inputs.items() if not val]
    if absent:
        raise ServiceError(
            f'Creating a new task requires: {", ".join(absent)}'
        )

    dataset_ref = creation_inputs['dataset']
    # The handle may be a DID or a dataset name; try the DID first.
    output_ds = Dataset.objects.filter(did=dataset_ref).first()
    if not output_ds:
        output_ds = Dataset.objects.filter(dataset_name=dataset_ref).first()
    if not output_ds:
        raise ServiceError(f'Output dataset not found: {dataset_ref}')

    config_ref = creation_inputs['prod_config']
    config = ProdConfig.objects.filter(name=config_ref).first()
    if not config:
        raise ServiceError(f'ProdConfig not found: {config_ref}')

    initial_overrides = dict(catalog_fields)
    if input_did:
        initial_overrides['input_dataset_did'] = input_did
    new_task = ProdTask.objects.create(
        name=creation_inputs['name'],
        description=payload.get('description', ''),
        status='draft',
        dataset=output_ds,
        prod_config=config,
        overrides=initial_overrides or None,
        created_by=created_by,
    )
    return new_task, True
0225 
0226 
def prodtask_link_input(*, task, did=None, dids=None):
    """
    Link input Dataset(s) to a ProdTask via overrides JSON.

    Provide one of ``did`` (single) or ``dids`` (iterable of DIDs), not
    both. Linked Datasets must already exist; this never creates
    Datasets. Setting one form removes the other from ``overrides``.

    Returns the saved ``task``. Raises ServiceError when both or neither
    argument is supplied, or when any DID has no matching Dataset.
    """
    if did and dids:
        raise ServiceError('Provide one of did or dids, not both')
    if not did and not dids:
        raise ServiceError('did or dids is required')
    # Materialise ``dids`` exactly once: a one-shot iterable would be
    # exhausted by validation and then stored as an empty list.
    targets = [did] if did else list(dids)
    if not targets:
        # A truthy but empty iterable (e.g. an exhausted generator).
        raise ServiceError('did or dids is required')
    found = set(Dataset.objects.filter(did__in=targets)
                .values_list('did', flat=True))
    missing = [d for d in targets if d not in found]
    if missing:
        raise ServiceError(f'Dataset(s) not found: {missing}')
    ov = dict(task.overrides or {})
    if did:
        ov['input_dataset_did'] = did
        ov.pop('input_dataset_dids', None)
    else:
        ov['input_dataset_dids'] = list(targets)
        ov.pop('input_dataset_did', None)
    task.overrides = ov
    task.save(update_fields=['overrides', 'updated_at'])
    return task
0253 
0254 
def _known_prodtask_statuses():
    """Return the set of every status named in PRODTASK_TRANSITIONS,
    whether as a source (key) or as a target (member of a value set).
    The transition map, not a CharField choices enum, is the authority."""
    # set().union(dict) takes the keys; the unpacked values add targets.
    return set().union(PRODTASK_TRANSITIONS, *PRODTASK_TRANSITIONS.values())
0262 
0263 
def prodtask_set_status(*, task, new_status):
    """Move ``task`` to ``new_status`` if the transition map allows it.

    ``new_status`` must be one of the statuses known to
    PRODTASK_TRANSITIONS. Re-asserting the current status is accepted;
    any other move must be listed for the current status. The task is
    saved (status + updated_at) and returned. Raises ServiceError for an
    unknown status or a disallowed transition.
    """
    known = _known_prodtask_statuses()
    if new_status not in known:
        choices = ", ".join(sorted(known))
        raise ServiceError(f'Invalid status. Choose from: {choices}')

    current = task.status
    # A current status missing from the map has no outgoing transitions.
    reachable = PRODTASK_TRANSITIONS.get(current, set())
    if not (new_status == current or new_status in reachable):
        raise ServiceError(
            f'Cannot transition from {current!r} to {new_status!r}. '
            f'Allowed from {current!r}: '
            f'{sorted(reachable) or "(terminal)"}'
        )

    task.status = new_status
    task.save(update_fields=['status', 'updated_at'])
    return task
0281 
0282 
def prodtask_apply_request(task, request, *, save=True):
    """Seed ``task`` from ``request`` (request → task copy).

    Sets ``task.request`` and overwrites every field named in
    ``REQUEST_TO_TASK_COPY_FIELDS`` with the request's value, then saves
    those fields (plus ``updated_at``) unless ``save`` is false. The two
    rows stay independently editable afterwards: later request edits are
    *not* propagated unless this function is called again.

    Returns ``task``. Raises ServiceError when ``request`` is None.
    """
    if request is None:
        raise ServiceError('request is required')

    task.request = request
    for name in REQUEST_TO_TASK_COPY_FIELDS:
        setattr(task, name, getattr(request, name))

    if not save:
        return task
    task.save(
        update_fields=['request', *REQUEST_TO_TASK_COPY_FIELDS, 'updated_at']
    )
    return task
0301 
0302 
0303 # ---------------------------------------------------------------------------
0304 # Campaign
0305 # ---------------------------------------------------------------------------
0306 
def campaign_set_current(campaign):
    """Promote ``campaign`` to lifecycle='current' inside one transaction.

    Every other Campaign that is 'current' is demoted to 'past' first.
    Nothing is written when ``campaign`` already reports 'current'. The
    single-current invariant is enforced only here (no DB constraint),
    so direct admin saves can still break it.

    Returns ``campaign``.
    """
    with transaction.atomic():
        if campaign.lifecycle != 'current':
            rivals = (Campaign.objects
                      .filter(lifecycle='current')
                      .exclude(pk=campaign.pk))
            rivals.update(lifecycle='past')
            campaign.lifecycle = 'current'
            campaign.save(update_fields=['lifecycle', 'updated_at'])
    return campaign
0322 
0323 
def campaign_clone_to_new(source, *, name, created_by,
                          description='', lifecycle='future'):
    """Create an empty Campaign recording ``source`` as its ``clone_of``.

    Only the Campaign row is created; no tasks are copied. Task cloning
    is a separate operation (potentially per-task, since rebinding
    datasets to the new campaign's detector_version is non-trivial).
    Promotion to 'current' is likewise separate — see
    ``campaign_set_current``.

    Returns the new Campaign. Raises ServiceError when ``name`` is
    already taken or ``lifecycle`` is not past/current/future.
    """
    name_taken = Campaign.objects.filter(name=name).exists()
    if name_taken:
        raise ServiceError(f'Campaign name already in use: {name}')

    allowed_lifecycles = {'past', 'current', 'future'}
    if lifecycle not in allowed_lifecycles:
        raise ServiceError(f'Invalid lifecycle: {lifecycle!r}')

    return Campaign.objects.create(
        name=name,
        lifecycle=lifecycle,
        description=description,
        clone_of=source,
        created_by=created_by,
    )
0345 
0346 
0347 # ---------------------------------------------------------------------------
0348 # Submission state changes
0349 # ---------------------------------------------------------------------------
0350 
0351 # ---------------------------------------------------------------------------
0352 # Default-datasets CSV import (Sakib's epic-prod docs/_data/datasets.csv)
0353 # ---------------------------------------------------------------------------
0354 
# Local path to the cloned epic-prod CSV on swf-testbed. Used by
# import_default_datasets_csv when the caller passes no csv_path.
DEFAULT_DATASETS_CSV_PATH = (
    '/data/wenauseic/github/epic-prod/docs/_data/datasets.csv'
)
0359 
0360 
_YESNO_RECOGNISED = {'', 'yes', 'y', 'no', 'n', 'true', 'false', '0', '1'}


def _yesno(value):
    """Strict Yes/No parse of a CSV cell: True only for an affirmative.

    The value is stringified, stripped and lower-cased. Anything that is
    not yes/y/true/1 yields False. A non-empty token outside the
    recognised vocabulary (e.g. 'Maybe') is WARN-logged before being
    treated as False, so it is never dropped silently
    (NO-SILENT-FAILURES rule).
    """
    token = str(value or '').strip().lower()
    recognised = token in _YESNO_RECOGNISED
    if token and not recognised:
        _log.warning('_yesno: unrecognised value %r treated as False', value)
    return token in ('yes', 'y', 'true', '1')
0374 
0375 
_GH_EIC_RELEASE_RE = _re.compile(r'https?://github\.com/eic/([^/]+)/releases\b', _re.I)
_GH_OTHER_RELEASE_RE = _re.compile(r'https?://github\.com/([^/]+)/([^/]+)/releases\b', _re.I)
_GL_EIC_RE   = _re.compile(r'https?://gitlab\.com/eic/([^?#]+?)/-/', _re.I)
_GL_OTHER_RE = _re.compile(r'https?://gitlab\.com/([^?#]+?)/-/', _re.I)

# Owner / group prefixes removed from the front of an extracted evgen
# identifier. They are organisational namespaces rather than generator
# names; keeping them makes filter values noisy and stops variants from
# merging. Extend this tuple as new sources appear.
_EVGEN_STRIP_PREFIXES = ('JeffersonLab/', 'mceg/')


def _extract_evgen(value):
    """Reduce a Generator/Dataset Version cell to a short evgen name.

    The stripped value is matched from its start against these shapes,
    first hit wins:

    - github.com/eic/<repo>/releases...        -> '<repo>'
    - github.com/<owner>/<repo>/releases...    -> '<owner>/<repo>'
    - gitlab.com/eic/<path>/-/...              -> '<path>' (so tag
      variants of one project share a single filter value)
    - gitlab.com/<path>/-/...                  -> '<path>'
    - anything else ('starlight', 'Pythia 8')  -> the stripped value

    One leading prefix from ``_EVGEN_STRIP_PREFIXES`` is then removed if
    present. Empty / None input gives ''.

    The result is stored at ingest in overrides.csv_import.evgen so the
    catalog can filter and group by generator without re-parsing per
    render; the catalog will eventually bind these to canonical EvgenTag
    entries.
    """
    text = str(value or '').strip()
    if not text:
        return ''

    # (pattern, how to build the identifier from its match), in priority
    # order: the eic-specific shapes must be tried before the generic ones.
    extractors = (
        (_GH_EIC_RELEASE_RE, lambda hit: hit.group(1)),
        (_GH_OTHER_RELEASE_RE, lambda hit: f'{hit.group(1)}/{hit.group(2)}'),
        (_GL_EIC_RE, lambda hit: hit.group(1)),
        (_GL_OTHER_RE, lambda hit: hit.group(1)),
    )
    ident = text
    for pattern, build in extractors:
        hit = pattern.match(text)
        if hit:
            ident = build(hit)
            break

    for prefix in _EVGEN_STRIP_PREFIXES:
        if ident.startswith(prefix):
            return ident[len(prefix):]
    return ident
0421 
0422 
def _parse_event_count(value):
    """Parse Sakib's event-count strings ('400k', '1M', '2.75M', plain
    integers) into an absolute event count.

    Commas and spaces are ignored. A trailing k / m / b-or-g suffix
    (either case) multiplies by 1e3 / 1e6 / 1e9.

    Returns None for empty input (legitimate); WARN-logs and returns
    None for non-empty unparseable input, including values that are not
    finite numbers ('inf', '1e400', 'nan'). NO-SILENT-FAILURES rule —
    the previous strict int() parse dropped every populated row in
    the CSV without surfacing it anywhere.
    """
    raw = value
    s = str(value or '').strip().replace(',', '').replace(' ', '')
    if not s:
        return None
    mult = 1
    if s[-1] in ('k', 'K'):
        mult, s = 1_000, s[:-1]
    elif s[-1] in ('m', 'M'):
        mult, s = 1_000_000, s[:-1]
    elif s[-1] in ('b', 'B', 'g', 'G'):
        mult, s = 1_000_000_000, s[:-1]
    try:
        return int(round(float(s) * mult))
    except (TypeError, ValueError, OverflowError):
        # OverflowError: round() of an infinite float ('inf', '1e400');
        # without it one bad cell would abort the whole import.
        _log.warning('_parse_event_count: unparseable value %r', raw)
        return None
0448 
0449 
def _possible(value):
    """Lenient yes/no: any non-empty marker counts unless it is an
    explicit negative (no / n / false / 0, case-insensitive), so 'Maybe'
    is True. Used for pTDR ('possible pre-TDR use') and Other Use, where
    any non-empty, non-No marker indicates the use applies."""
    marker = str(value or '').strip().lower()
    if not marker:
        return False
    return marker not in ('no', 'n', 'false', '0')
0456 
0457 
def _csvimport_slug(dataset_path, gen_version):
    """Return the first 12 hex digits of SHA-1 over '<path>|<generator>'.

    The same (path, generator) pair — the row's idempotency key — always
    yields the same slug.
    """
    material = '{}|{}'.format(dataset_path, gen_version)
    digest = _hashlib.sha1(material.encode())
    return digest.hexdigest()[:12]
0462 
0463 
def _ensure_csvimport_anchors():
    """Look up the placeholder FK targets shared by all CSV-imported rows.

    Returns ``(physics_tag, evgen_tag, simu_tag, reco_tag, prod_config,
    campaign)``. Nothing is created here: ``PROTECT`` FKs from Dataset
    to Tag and from ProdTask to ProdConfig mean rows cannot be
    synthesised ad hoc, so the import pins to what is already in the DB.
    The prod team can replace these per task when binding a real
    Dataset/Config to a CSV-imported task.

    Selection rules: for each tag kind, the locked tag with the lowest
    ``tag_number``; the first ProdConfig whose name contains
    '26.02.0 Standard', else the first ProdConfig; the most recently
    updated Campaign with lifecycle 'current'.

    Raises ServiceError if any of the six cannot be found.
    """
    def _lowest_locked(model, label):
        tag = model.objects.filter(status='locked').order_by('tag_number').first()
        if tag:
            return tag
        raise ServiceError(f'No locked {label} tag available for CSV import')

    tag_kinds = (
        (PhysicsTag, 'physics'),
        (EvgenTag, 'evgen'),
        (SimuTag, 'simu'),
        (RecoTag, 'reco'),
    )
    anchor_tags = [_lowest_locked(model, label) for model, label in tag_kinds]

    preferred_cfg = ProdConfig.objects.filter(
        name__icontains='26.02.0 Standard').first()
    cfg = preferred_cfg or ProdConfig.objects.first()
    if not cfg:
        raise ServiceError('No ProdConfig available for CSV import anchor')

    current_campaigns = Campaign.objects.filter(lifecycle='current')
    campaign = current_campaigns.order_by('-updated_at').first()
    if not campaign:
        raise ServiceError('No current Campaign for CSV import')

    return (*anchor_tags, cfg, campaign)
0495 
0496 
def _csvimport_clean_row(row, row_num):
    """Normalise one ``csv.DictReader`` row to ``{header: stripped str}``.

    Missing cells (``None``) become ''. Cells beyond the header width,
    which DictReader collects in a list under the ``None`` key, are
    WARN-logged and left out.
    """
    cleaned = {}
    for key, val in row.items():
        if key is None:
            # Calling .strip() on this list would abort the whole
            # import; surface the overflow and carry on with the row.
            _log.warning(
                'csv_import row %d: ignoring %d cell(s) beyond the header: %r',
                row_num, len(val), val)
            continue
        cleaned[key] = (val or '').strip()
    return cleaned


def import_default_datasets_csv(csv_path=None, *, created_by='csv_import'):
    """Import Sakib's epic-prod datasets.csv into the catalog.

    Each CSV row becomes (idempotently):
      - one Dataset (placeholder tags + ``26.02.0`` campaign labels,
        full row preserved in ``metadata['csv_import']``),
      - one ProdTask (status='csv_import', linked to the Dataset, the
        anchor ProdConfig, and the current Campaign).

    Idempotency key per row: ``(Dataset Path, Generator/Dataset Version)``.
    Re-running updates the existing rows in place; a task whose status
    has been moved off 'csv_import' keeps that status, and override keys
    other than ``csv_import`` are preserved.

    The file is read as UTF-8 (a leading BOM is tolerated). All rows are
    written in a single transaction. Rows with neither a path nor a
    generator version are skipped and reported in ``errors``.

    ``csv_path`` defaults to ``DEFAULT_DATASETS_CSV_PATH``. Raises
    ServiceError when the anchor tags / config / campaign are missing.

    Returns dict::

        {'rows': int, 'created': int, 'updated': int, 'errors': [str, ...]}
    """
    path = csv_path or DEFAULT_DATASETS_CSV_PATH
    physics, evgen, simu, reco, cfg, campaign = _ensure_csvimport_anchors()

    # Explicit encoding: do not depend on the process locale, and strip a
    # BOM that would otherwise become part of the first header name.
    with open(path, newline='', encoding='utf-8-sig') as f:
        rows = list(_csv.DictReader(f))

    summary = {'rows': len(rows), 'created': 0, 'updated': 0, 'errors': []}

    with transaction.atomic():
        for i, row in enumerate(rows, 1):
            cells = _csvimport_clean_row(row, i)
            ds_path = cells.get('Dataset Path', '')
            gen_ver = cells.get('Generator/Dataset Version', '')
            if not ds_path and not gen_ver:
                summary['errors'].append(f'row {i}: empty path and gen_version, skipped')
                continue

            slug = _csvimport_slug(ds_path, gen_ver)
            dataset_name = f'csv_import.{slug}'
            task_name = f'csv_import.{slug}'
            description = cells.get('Description', '')

            raw_priority = cells.get('Priority', '')
            try:
                priority = int(raw_priority) if raw_priority else None
            except (TypeError, ValueError):
                _log.warning('csv_import row %d: unparseable Priority %r', i, raw_priority)
                priority = None
            nevents = _parse_event_count(cells.get('Number of Events'))

            metadata = {
                'stage': 'evgen',
                'source': {'kind': 'csv_manifest', 'location': ds_path},
                'csv_import': cells,
            }
            if gen_ver:
                metadata['source']['gen_version'] = gen_ver

            ds = Dataset.objects.filter(dataset_name=dataset_name, block_num=1).first()
            if ds:
                ds.description = description
                ds.metadata = metadata
                ds.save()
            else:
                ds = Dataset(
                    dataset_name=dataset_name,
                    scope='group.EIC',
                    detector_version='26.02.0',
                    detector_config='epic_craterlake',
                    physics_tag=physics, evgen_tag=evgen,
                    simu_tag=simu, reco_tag=reco,
                    description=description,
                    metadata=metadata,
                    created_by=created_by,
                )
                ds.save()

            task_defaults = dict(
                description=description,
                status='csv_import',
                dataset=ds,
                prod_config=cfg,
                campaign=campaign,
                requestor=cells.get('DSC or PWG', '').upper(),
                priority=priority,
                pre_tdr_use=_possible(cells.get('Pre-TDR Use')),
                early_science_use=_yesno(cells.get('Early Science Use')),
                other_use=_possible(cells.get('Other Use')),
                new_request=_yesno(cells.get('New Request')),
                overrides={
                    'csv_import': {
                        'background': cells.get('Background', ''),
                        'nevents': nevents,
                        'issue_url': cells.get('Issue', ''),
                        'gen_version': gen_ver,
                        'evgen': _extract_evgen(gen_ver),
                        'other_use_text': cells.get('Other Use', ''),
                        'filters': _extract_csv_filters(ds_path, ds.detector_config),
                    },
                },
                created_by=created_by,
            )

            existing = ProdTask.objects.filter(name=task_name).first()
            if existing:
                # Preserve status once an operator has moved the task off
                # csv_import — re-imports must not roll back lifecycle.
                preserve_status = existing.status != 'csv_import'
                for k, v in task_defaults.items():
                    if k == 'status' and preserve_status:
                        continue
                    if k == 'overrides':
                        # Merge so non-csv_import override keys (added by
                        # operators or other code paths) are preserved;
                        # the csv_import bucket itself is fully refreshed.
                        merged = dict(existing.overrides or {})
                        merged.update(v)
                        v = merged
                    setattr(existing, k, v)
                existing.save()
                summary['updated'] += 1
            else:
                ProdTask.objects.create(name=task_name, **task_defaults)
                summary['created'] += 1

    return summary
0619 
0620 
0621 # ---------------------------------------------------------------------------
0622 # Past-campaign output ingest (epic-prod FULL/RECO/<version>/index.md)
0623 # ---------------------------------------------------------------------------
0624 
# NOTE(review): presumably the root of the cloned epic-prod checkout (it
# is the leading part of DEFAULT_DATASETS_CSV_PATH) — confirm.
EPIC_PROD_PATH = '/data/wenauseic/github/epic-prod'
# Stage names and campaign-version prefixes for past-campaign ingest;
# their consumers are not in the visible part of this file.
PAST_CAMPAIGN_STAGES = ('FULL', 'RECO')
PAST_CAMPAIGN_YEAR_PREFIXES = ('25.', '26.')   # 2025 + 2026 campaigns

# Line patterns recognised by _parse_past_index:
#   DID header   "=== epic:/<path> ==="                      group 1 = DID
#   RSE line     "RSE: <name> Files: <n>/<total> (<status>)"
#   size line    "Total Size: <number> <unit> (<count> files)" (case-insensitive)
#   summary      "=== CAMPAIGN SUMMARY ==="                  ends the parse
_PAST_DID_RE   = _re.compile(r'^===\s*(epic:/\S+)\s*===\s*$')
_PAST_RSE_RE   = _re.compile(r'RSE:\s*(\S+)\s+Files:\s*(\d+)/(\d+)\s*\(([^)]+)\)')
_PAST_SIZE_RE  = _re.compile(r'Total Size:\s*([\d.]+)\s*([KMGTP]?B)\s*\((\d+)\s*files?\)', _re.I)
_PAST_SUMMARY_RE = _re.compile(r'^===\s*CAMPAIGN SUMMARY\s*===\s*$')
0633 
# Decimal (power-of-1000) byte multipliers: B=1, KB=10**3, ... PB=10**15.
_SIZE_UNIT_BYTES = {
    unit: 1000 ** power
    for power, unit in enumerate(('B', 'KB', 'MB', 'GB', 'TB', 'PB'))
}


def _parse_size_to_bytes(value, unit):
    """Convert a numeric ``value`` with a size ``unit`` into whole bytes.

    ``unit`` is matched case-insensitively against ``_SIZE_UNIT_BYTES``.
    An unparseable value or an unknown unit is WARN-logged and yields 0.
    """
    try:
        magnitude = float(value)
    except (TypeError, ValueError):
        _log.warning('past_ingest: unparseable size value %r', value)
        return 0

    factor = _SIZE_UNIT_BYTES.get(unit.upper())
    if factor is None:
        _log.warning('past_ingest: unrecognised size unit %r', unit)
        return 0

    return int(round(magnitude * factor))
0650 
0651 
def _parse_past_index(text):
    """Parse the text of an epic-prod docs/<STAGE>/<version>/index.md.

    Generator. Yields one dict per DID header, in file order:
    ``{did, rses: [{name, files, total, status}], file_count,
    data_size_bytes, complete}``. ``complete`` starts True and turns
    False as soon as any RSE line has a status other than 'complete'.
    Lines before the first DID header are ignored, and parsing stops at
    the campaign summary header, so the summary block is never yielded.
    """
    current = None  # block being filled; None until the first DID header
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if line in ('', '```'):
            continue
        if _PAST_SUMMARY_RE.match(line):
            # Summary reached: stop; the pending block is flushed below.
            break

        header = _PAST_DID_RE.match(line)
        if header:
            if current is not None:
                yield current
            current = {
                'did': header.group(1),
                'rses': [],
                'file_count': 0,
                'data_size_bytes': 0,
                'complete': True,
            }
            continue
        if current is None:
            continue

        rse = _PAST_RSE_RE.search(line)
        if rse:
            rse_name, n_files, n_total, rse_status = rse.groups()
            n_files, n_total = int(n_files), int(n_total)
            rse_status = rse_status.strip()
            current['rses'].append({'name': rse_name, 'files': n_files,
                                    'total': n_total, 'status': rse_status})
            if rse_status != 'complete':
                current['complete'] = False
            continue

        size = _PAST_SIZE_RE.search(line)
        if size:
            current['data_size_bytes'] = _parse_size_to_bytes(
                size.group(1), size.group(2))
            current['file_count'] = int(size.group(3))

    if current is not None:
        yield current
0703 
0704 
_PAST_BEAM_RE = _re.compile(r'(\d+x\d+)')
_PAST_Q2_RE   = _re.compile(r'(minQ2=\d+|q2_\d+(?:to\d+)?)')
_PAST_PHYS_TOP = ('DIS', 'SIDIS', 'DDIS', 'EXCLUSIVE', 'SINGLE', 'BACKGROUNDS')


_PAST_ENERGY_BARE_RE = _re.compile(
    r'\b(\d+(?:\.\d+)?(?:eV|keV|MeV|GeV|TeV))\b', _re.I)


def _extract_path_filters(segments):
    """Derive the filter fields {beam, physics, q2, species, energy}
    from a list of path segments; '' for any field not found.

    Shared by the past-output DID and the current-tab CSV input-dataset
    path so both filter bars speak the same vocabulary.

    - beam / q2: first regex hit anywhere in the '/'-joined path.
    - physics: the first entry of ``_PAST_PHYS_TOP`` (in that tuple's
      order) that appears as a whole segment.
    - species / energy: for SINGLE, the one / two segments after it.
    - energy fallback: first bare '<number><unit>' token in the path.
    """
    joined = '/'.join(segments)

    def _first_hit(pattern):
        hit = pattern.search(joined)
        return hit.group(1) if hit else ''

    physics = ''
    for candidate in _PAST_PHYS_TOP:
        if candidate in segments:
            physics = candidate
            break

    species = energy = ''
    if physics == 'SINGLE':
        following = segments[segments.index('SINGLE') + 1:]
        if len(following) > 0:
            species = following[0]
        if len(following) > 1:
            energy = following[1]
    if not energy:
        energy = _first_hit(_PAST_ENERGY_BARE_RE)

    # A DIS path that names its subtype is reported as that subtype, so
    # the Physics filter shows NC and CC as siblings of DIS rather than
    # as a separate row.
    if physics == 'DIS':
        for subtype in ('NC', 'CC'):
            if subtype in segments:
                physics = subtype
                break

    return {
        'beam':    _first_hit(_PAST_BEAM_RE),
        'physics': physics,
        'q2':      _first_hit(_PAST_Q2_RE),
        'species': species,
        'energy':  energy,
    }
0746 
0747 
def _extract_past_filters(did):
    """Filter fields for a past-output DID.

    The optional ``scope:`` prefix and leading slashes are dropped. The
    third path segment becomes ``detector``; everything after it is fed
    to ``_extract_path_filters`` for {beam, physics, q2, species, energy}.
    Dimensions the path does not carry are empty strings.
    """
    _scope, colon, name = did.partition(':')
    # No colon at all means the whole string is the path.
    pieces = (name if colon else did).lstrip('/').split('/')
    filters = _extract_path_filters(pieces[3:])
    filters['detector'] = pieces[2] if len(pieces) >= 3 else ''
    return filters
0759 
0760 
def _extract_csv_filters(path, detector_config):
    """Filter fields for the current tab, from a CSV input dataset path.

    Path shape: /volatile/eic/EPIC/EVGEN/<physics>/<gen>/.../<beam>/...
    The detector comes from the dataset's ``detector_config`` value, not
    from the path; every other field comes from the path tail that
    follows the EVGEN prefix.
    """
    fixed_prefix = ['volatile', 'eic', 'EPIC', 'EVGEN']
    pieces = (path or '').lstrip('/').split('/')
    # Strip the fixed prefix when it is there; any other shape is passed
    # through whole so an unexpected layout still yields what it can.
    if pieces[:len(fixed_prefix)] == fixed_prefix:
        pieces = pieces[len(fixed_prefix):]
    filters = _extract_path_filters(pieces)
    filters['detector'] = detector_config or ''
    return filters
0778 
0779 
0780 def _decompose_past_did(did):
0781     """Break an epic-prod DID into the path-level fields we filter on.
0782 
0783     Expected form: epic:/STAGE/version/detector_config/<bg-chain>/<phys-chain>/beam/paramset
0784     Everything after detector_config is best-effort; what we don't recognise
0785     stays in metadata['past_output']['path_remainder'] for the row to render
0786     verbatim. NO-SILENT-FAILURES: a DID that doesn't even split into the
0787     leading STAGE/version/detector parts is logged.
0788     """
0789     parts = did.split(':', 1)
0790     rest = parts[1] if len(parts) == 2 else did
0791     rest = rest.lstrip('/')
0792     segs = rest.split('/')
0793     if len(segs) < 3:
0794         _log.warning('past_ingest: DID %r has too few segments', did)
0795         return {}
0796     out = {'stage': segs[0], 'version': segs[1], 'detector_config': segs[2]}
0797     if len(segs) > 3:
0798         out['path_remainder'] = '/'.join(segs[3:])
0799     return out
0800 
0801 
def import_epic_prod_past_campaigns(*, epic_prod_path=EPIC_PROD_PATH,
                                    created_by='past_import'):
    """Import 2026 past-campaign output datasets from a cloned epic-prod.

    Walks ``docs/FULL/<v>/index.md`` and ``docs/RECO/<v>/index.md`` for
    every version listed in ``docs/_data/{full,reco}_content.yml`` whose
    name starts with one of ``PAST_CAMPAIGN_YEAR_PREFIXES``. The 'main'
    alias is excluded (it's a moving target, not a frozen archive).

    For each parsed dataset we get-or-create:
      - Campaign(name='{STAGE}/{version}', lifecycle='past')
      - Dataset(did=PCS internal DID, dataset_name='past.{STAGE}.{ver}.{slug}'),
        with the epic-prod Rucio DID stored in metadata['source']['location']
        and the per-RSE breakdown in metadata['past_output'].
      - ProdTask(name='past.{STAGE}.{ver}.{slug}', status='past_output',
        linked to the Dataset, anchor ProdConfig, and Campaign).

    Idempotency key: (STAGE, version, epic_prod_did). Re-running refreshes
    file_count / data_size / rse breakdown but leaves any operator-touched
    status / overrides intact (same rule as csv_import).

    Files are read as UTF-8 regardless of the process locale. A YAML or
    index file that cannot be opened, decoded or parsed is recorded in
    ``summary['errors']`` and skipped instead of aborting the import.

    Args:
        epic_prod_path: root of the cloned epic-prod checkout.
        created_by: value stored in ``created_by`` on rows this call creates
            (and re-applied to existing ProdTasks on refresh).

    Returns:
        dict with counters ``campaigns``, ``rows``, ``created``, ``updated``
        and a list of human-readable ``errors``.
    """
    import os as _os
    import yaml as _yaml
    physics, evgen, simu, reco, cfg, _ = _ensure_csvimport_anchors()

    summary = {'campaigns': 0, 'rows': 0, 'created': 0, 'updated': 0, 'errors': []}
    # str.startswith accepts a tuple, so one call tests every prefix.
    year_prefixes = tuple(PAST_CAMPAIGN_YEAR_PREFIXES)

    versions_by_stage = {}
    for stage in PAST_CAMPAIGN_STAGES:
        yml_name = f'{stage.lower()}_content.yml'
        yml_path = _os.path.join(epic_prod_path, 'docs', '_data', yml_name)
        try:
            # Explicit encoding: the default is locale-dependent, and a
            # decode failure is not an OSError.
            with open(yml_path, encoding='utf-8') as f:
                entries = _yaml.safe_load(f) or []
        except (OSError, UnicodeDecodeError, _yaml.YAMLError) as e:
            summary['errors'].append(f'{stage}: cannot read {yml_name}: {e}')
            continue
        versions = []
        for entry in entries:
            if not isinstance(entry, dict):
                continue
            text = entry.get('text', '')
            if not isinstance(text, str):
                # e.g. an unquoted YAML number; report it rather than crash
                # on .startswith or drop it silently.
                summary['errors'].append(
                    f'{stage}: non-string version entry {text!r} in {yml_name}')
                continue
            if text.startswith(year_prefixes) and text != 'main':
                versions.append(text)
        versions_by_stage[stage] = versions

    with transaction.atomic():
        for stage, versions in versions_by_stage.items():
            for version in versions:
                campaign_name = f'{stage}/{version}'
                index_path = _os.path.join(epic_prod_path, 'docs', stage, version, 'index.md')
                try:
                    with open(index_path, encoding='utf-8') as f:
                        text = f.read()
                except (OSError, UnicodeDecodeError) as e:
                    summary['errors'].append(f'{campaign_name}: {e}')
                    continue

                campaign, _ = Campaign.objects.get_or_create(
                    name=campaign_name,
                    defaults={'lifecycle': 'past',
                              'description': f'{stage} campaign {version} '
                                             f'(epic-prod {index_path})',
                              'created_by': created_by},
                )
                # A campaign that already existed under another lifecycle
                # is forced to 'past'.
                if campaign.lifecycle != 'past':
                    campaign.lifecycle = 'past'
                    campaign.save(update_fields=['lifecycle'])
                summary['campaigns'] += 1

                # Running totals for this campaign's summary blob.
                campaign_files = 0
                campaign_bytes = 0
                for block in _parse_past_index(text):
                    summary['rows'] += 1
                    epic_did = block['did']
                    # Short stable slug so the PCS name is deterministic
                    # for a given epic-prod DID.
                    slug = _hashlib.sha1(epic_did.encode()).hexdigest()[:12]
                    pcs_name = f'past.{stage}.{version}.{slug}'
                    decomposed = _decompose_past_did(epic_did)

                    metadata = {
                        'stage': stage.lower(),
                        'source': {'kind': 'rucio_did', 'location': epic_did},
                        'past_output': {
                            'campaign_name': campaign_name,
                            'stage': stage,
                            'version': version,
                            'rses': block['rses'],
                            'complete': block['complete'],
                            'path': decomposed,
                            'filters': _extract_past_filters(epic_did),
                            'index_path': index_path,
                        },
                    }

                    pcs_did = f'group.EIC:{pcs_name}.b1'
                    ds, ds_created = Dataset.objects.get_or_create(
                        dataset_name=pcs_name, block_num=1,
                        defaults=dict(
                            scope='group.EIC', did=pcs_did,
                            detector_version=version,
                            detector_config=decomposed.get('detector_config', ''),
                            physics_tag=physics, evgen_tag=evgen,
                            simu_tag=simu, reco_tag=reco,
                            file_count=block['file_count'],
                            data_size=block['data_size_bytes'],
                            description='',
                            metadata=metadata,
                            created_by=created_by,
                        ),
                    )
                    if not ds_created:
                        # Refresh the fields derived from the index file.
                        ds.file_count = block['file_count']
                        ds.data_size = block['data_size_bytes']
                        ds.metadata = metadata
                        ds.detector_version = version
                        ds.detector_config = decomposed.get('detector_config', '')
                        ds.save()

                    task_defaults = dict(
                        description='',
                        dataset=ds,
                        prod_config=cfg,
                        campaign=campaign,
                        overrides={'past_output': metadata['past_output']},
                        created_by=created_by,
                    )
                    existing = ProdTask.objects.filter(name=pcs_name).first()
                    if existing:
                        # A status other than 'past_output' is left alone.
                        preserve_status = existing.status != 'past_output'
                        for k, v in task_defaults.items():
                            if k == 'overrides':
                                # Merge so keys outside 'past_output' survive.
                                merged = dict(existing.overrides or {})
                                merged.update(v)
                                v = merged
                            setattr(existing, k, v)
                        if not preserve_status:
                            existing.status = 'past_output'
                        existing.save()
                        summary['updated'] += 1
                    else:
                        ProdTask.objects.create(name=pcs_name,
                                                status='past_output',
                                                **task_defaults)
                        summary['created'] += 1
                    campaign_files += block['file_count']
                    campaign_bytes += block['data_size_bytes']

                campaign.data = {
                    **(campaign.data or {}),
                    'past_summary': {
                        'file_count': campaign_files,
                        'data_size_bytes': campaign_bytes,
                        'stage': stage,
                        'version': version,
                    },
                }
                campaign.save(update_fields=['data', 'updated_at'])

    return summary
0959 
0960 
0961 # ---------------------------------------------------------------------------
0962 # JLab Rucio current-campaign snapshot
0963 #
0964 # Output datasets land at JLab Rucio under scope `epic`. The nightly
0965 # epic-prod GitHub Action that generates docs/{FULL,RECO}/<v>/index.md is
0966 # exactly this same Rucio query, dumped to markdown. We pull it directly
0967 # so the Current tab can show 'Output: <files / size / RSEs>' on each
0968 # task row without waiting for the upstream nightly rebuild.
0969 #
0970 # Credentials are the public read-only eicread/eicread account
0971 # (matches the PandaBot jlab-rucio MCP config). Override via env.
0972 # ---------------------------------------------------------------------------
0973 
# Defaults for the JLab Rucio client. The URL / account / username /
# password are each overridable through the same-named environment
# variable (see _jlab_rucio_auth and _jlab_rucio_get).
JLAB_RUCIO_URL      = 'https://rucio-server.jlab.org:443'
JLAB_RUCIO_ACCOUNT  = 'eicread'
JLAB_RUCIO_USERNAME = 'eicread'
JLAB_RUCIO_PASSWORD = 'eicread'
# Default directory for saved Rucio snapshot JSON files.
RUCIO_SNAPSHOT_DIR  = '/opt/swf-monitor/shared/rucio-snapshots'
0979 
0980 
def _jlab_rucio_auth(timeout=30):
    """userpass-auth against JLab Rucio. Returns the X-Rucio-Auth-Token.

    The server URL and the account / username / password headers come
    from the ``JLAB_RUCIO_*`` environment variables when set, otherwise
    from the module-level defaults.

    Args:
        timeout: socket timeout in seconds for the auth request.

    Raises:
        ServiceError: the response carried no ``X-Rucio-Auth-Token`` header.
        Errors raised by ``urllib`` itself (connection failure, HTTP error
        status) propagate unchanged.
    """
    import urllib.request as _ur
    import ssl as _ssl
    import os as _os
    # NOTE(review): certificate and hostname verification are switched off,
    # so the password below travels over an unauthenticated TLS channel.
    # Confirm this is intentional before reusing with non-public credentials.
    ctx = _ssl.create_default_context()
    ctx.check_hostname = False
    ctx.verify_mode = _ssl.CERT_NONE
    url = (_os.environ.get('JLAB_RUCIO_URL') or JLAB_RUCIO_URL) + '/auth/userpass'
    req = _ur.Request(url)
    req.add_header('X-Rucio-Account',  _os.environ.get('JLAB_RUCIO_ACCOUNT',  JLAB_RUCIO_ACCOUNT))
    req.add_header('X-Rucio-Username', _os.environ.get('JLAB_RUCIO_USERNAME', JLAB_RUCIO_USERNAME))
    req.add_header('X-Rucio-Password', _os.environ.get('JLAB_RUCIO_PASSWORD', JLAB_RUCIO_PASSWORD))
    # Context manager closes the connection as soon as the header is read.
    with _ur.urlopen(req, context=ctx, timeout=timeout) as resp:
        token = resp.headers['X-Rucio-Auth-Token']
    if not token:
        raise ServiceError('JLab Rucio auth returned no token')
    return token
0999 
1000 
def _jlab_rucio_get(path, token, *, timeout=60, **q):
    """GET a JLab Rucio path with the auth token; returns response text.

    Args:
        path: URL path appended verbatim to the server base URL
            (``JLAB_RUCIO_URL`` env var, else the module default).
        token: value sent as the ``X-Rucio-Auth-Token`` header.
        timeout: socket timeout in seconds.
        **q: query parameters, URL-encoded and appended when non-empty.

    Returns:
        The response body decoded with ``bytes.decode()`` defaults.
        Errors raised by ``urllib`` propagate unchanged.
    """
    import urllib.request as _ur
    import urllib.parse as _up
    import ssl as _ssl
    import os as _os
    # NOTE(review): certificate and hostname verification are switched off;
    # the auth token is sent without authenticating the server. Confirm
    # this is intentional.
    ctx = _ssl.create_default_context()
    ctx.check_hostname = False
    ctx.verify_mode = _ssl.CERT_NONE
    url = (_os.environ.get('JLAB_RUCIO_URL') or JLAB_RUCIO_URL) + path
    if q:
        url += '?' + _up.urlencode(q)
    req = _ur.Request(url)
    req.add_header('X-Rucio-Auth-Token', token)
    # Close the response deterministically; this function is called from
    # many worker threads at once.
    with _ur.urlopen(req, context=ctx, timeout=timeout) as resp:
        return resp.read().decode()
1016 
1017 
1018 def _ndjson(text):
1019     """Parse a newline-delimited-JSON Rucio response (strings or dicts)."""
1020     import json as _json
1021     out = []
1022     for line in text.splitlines():
1023         line = line.strip()
1024         if not line:
1025             continue
1026         try:
1027             out.append(_json.loads(line))
1028         except _json.JSONDecodeError:
1029             out.append(line)
1030     return out
1031 
1032 
def fetch_jlab_rucio_campaign(campaign_path, *, scope='epic', token=None,
                              max_workers=16):
    """Fetch the Rucio snapshot for one campaign path (e.g. '/RECO/26.02.0').

    Lists the datasets under ``<campaign_path>/*`` in `scope`, then for each
    one fetches its DID metadata and its per-RSE dataset-replica records.
    A failed per-dataset request is logged and replaced by an empty value,
    so one bad dataset does not fail the snapshot.

    The per-dataset fetches run in a ThreadPoolExecutor with `max_workers`
    threads so large campaigns finish within the web request timeout.

    Returns:
        {'count': N, 'datasets': [{'did', 'length', 'bytes',
        'rse_replicas'}, ...]} where ``rse_replicas`` is the parsed
        /replicas/<scope>/<name>/datasets response.
    """
    import json as _json
    from concurrent.futures import ThreadPoolExecutor as _Pool
    if token is None:
        token = _jlab_rucio_auth()
    listing = _jlab_rucio_get(
        f'/dids/{scope}/dids/search', token,
        type='dataset', name=campaign_path + '/*')
    found = _ndjson(listing)

    def _meta_for(ds_name):
        # DID metadata; {} when the request or the JSON decode fails.
        try:
            return _json.loads(_jlab_rucio_get(f'/dids/{scope}/{ds_name}', token))
        except Exception as e:                                # noqa: BLE001
            _log.warning('rucio meta %s/%s: %s', scope, ds_name, e)
            return {}

    def _replicas_for(ds_name):
        # Per-RSE replica records; [] when the request fails.
        try:
            return _ndjson(
                _jlab_rucio_get(f'/replicas/{scope}/{ds_name}/datasets', token))
        except Exception as e:                                # noqa: BLE001
            _log.warning('rucio replicas %s/%s: %s', scope, ds_name, e)
            return []

    def _snapshot_one(ds_name):
        # Search results that are not plain name strings are ignored.
        if not isinstance(ds_name, str):
            return None
        meta = _meta_for(ds_name)
        replicas = _replicas_for(ds_name)
        return {
            'did':          f'{scope}:{ds_name}',
            'length':       meta.get('length'),
            'bytes':        meta.get('bytes'),
            'rse_replicas': replicas,
        }

    with _Pool(max_workers=max_workers) as pool:
        datasets = [rec for rec in pool.map(_snapshot_one, found)
                    if rec is not None]
    return {'count': len(datasets), 'datasets': datasets}
1079 
1080 
1081 def _request_input_tail(ds_path):
1082     """Return the comparable tail of a CSV input dataset path.
1083 
1084     /volatile/eic/EPIC/EVGEN/<TAIL>  ->  '<TAIL>'  (lower-cased)
1085     anything else                    ->  '' (no match)
1086     """
1087     if not ds_path:
1088         return ''
1089     parts = ds_path.strip('/').split('/')
1090     if len(parts) >= 5 and parts[:4] == ['volatile', 'eic', 'EPIC', 'EVGEN']:
1091         return '/'.join(parts[4:]).lower()
1092     return ''
1093 
1094 
1095 def _did_path_tail(did):
1096     """Drop scope + /STAGE/<v>/<detector>/ and return the rest, lower-cased.
1097 
1098     epic:/RECO/26.02.0/epic_craterlake/DDIS/rapgap3.../noRad/ep/10x100/q2_1to10
1099         -> 'ddis/rapgap3.../norad/ep/10x100/q2_1to10'
1100     """
1101     name = did.split(':', 1)[-1].lstrip('/')
1102     parts = name.split('/')
1103     if len(parts) <= 3:
1104         return ''
1105     return '/'.join(parts[3:]).lower()
1106 
1107 
1108 def _aggregate_rucio_match(matches):
1109     """Per-stage rollup over a list of matched dataset records.
1110 
1111     Rucio's /dids/<scope>/<name> endpoint returns length/bytes as null
1112     for these datasets; the canonical file count and byte size live on
1113     each RSE replica record (they all carry the same value since RSEs
1114     hold replicas of the same data). Take the max across RSEs.
1115     """
1116     by_stage = {}
1117     for m in matches:
1118         st = by_stage.setdefault(m['stage'], {
1119             'count': 0, 'files': 0, 'bytes': 0,
1120             'rses': {}, 'incomplete': 0,
1121         })
1122         st['count'] += 1
1123         ds_files = m.get('length') or 0
1124         ds_bytes = m.get('bytes')  or 0
1125         complete = True
1126         for r in m.get('rse_replicas', []):
1127             rse = r.get('rse')
1128             if not rse:
1129                 continue
1130             ds_files = max(ds_files, r.get('length') or 0)
1131             ds_bytes = max(ds_bytes, r.get('bytes')  or 0)
1132             st['rses'].setdefault(rse, {'complete': 0, 'incomplete': 0})
1133             if r.get('available_length') == r.get('length') and r.get('length'):
1134                 st['rses'][rse]['complete'] += 1
1135             else:
1136                 st['rses'][rse]['incomplete'] += 1
1137                 complete = False
1138         st['files'] += ds_files
1139         st['bytes'] += ds_bytes
1140         if not complete:
1141             st['incomplete'] += 1
1142     return by_stage
1143 
1144 
def _index_snapshot_by_tail(snapshot):
    """Pre-index a snapshot for fast tail-prefix lookup.

    Returns a list of (tail, dataset_record_with_stage) tuples sorted by
    tail length DESC, so the most-specific tails are tried first. Entries
    with equal tail length keep their snapshot order (the sort is stable).

    ``stage`` is the first segment of the campaign path key (e.g. 'RECO'
    for '/RECO/26.02.0'). Datasets whose DID has no tail after
    /STAGE/<v>/<detector>/ are left out. A missing or None ``campaigns``,
    ``datasets`` or ``did`` value is treated as empty.
    """
    idx = []
    for cpath, info in (snapshot.get('campaigns') or {}).items():
        # cpath = '/RECO/26.02.0' or '/FULL/26.02.0'
        stage = cpath.strip('/').split('/')[0]
        for d in info.get('datasets') or []:
            tail = _did_path_tail(d.get('did') or '')
            if not tail:
                continue
            idx.append((tail, {**d, 'stage': stage}))
    # Honour the documented ordering: longest (most specific) tail first.
    idx.sort(key=lambda pair: len(pair[0]), reverse=True)
    return idx
1162 
1163 
1164 _Q2_MIN_RE   = _re.compile(r'^minQ2=([\d.]+)$', _re.I)
1165 _Q2_RANGE_RE = _re.compile(r'^q2_([\d.]+)to([\d.]+)$', _re.I)
1166 _Q2_POINT_RE = _re.compile(r'^q2_([\d.]+)$', _re.I)
1167 
1168 
1169 def _q2_range(s):
1170     """Convert a Q² label into a (lo, hi) numeric range.
1171 
1172     'minQ2=1'    -> (1, inf)
1173     'q2_1to10'   -> (1, 10)
1174     'q2_20'      -> (20, 20)
1175     anything else / empty -> None
1176     """
1177     if not s:
1178         return None
1179     m = _Q2_MIN_RE.match(s)
1180     if m:
1181         return (float(m.group(1)), float('inf'))
1182     m = _Q2_RANGE_RE.match(s)
1183     if m:
1184         return (float(m.group(1)), float(m.group(2)))
1185     m = _Q2_POINT_RE.match(s)
1186     if m:
1187         v = float(m.group(1))
1188         return (v, v)
1189     return None
1190 
1191 
1192 def _q2_overlap(req_q2, did_q2):
1193     """True if request and DID Q² labels could plausibly intersect."""
1194     if not req_q2 or not did_q2:
1195         return True   # absent on either side: don't block the match
1196     a = _q2_range(req_q2)
1197     b = _q2_range(did_q2)
1198     if a is None or b is None:
1199         return req_q2 == did_q2  # exact string fallback
1200     return a[0] <= b[1] and b[0] <= a[1]
1201 
1202 
1203 def _filter_match(req, did):
1204     """True if a request's filter fields match an output DID's.
1205 
1206     Matches on the semantic axes shared by both schemas (detector,
1207     beam, physics, Q²) rather than on path strings — the output DID
1208     carries extra segments (generator, radiation, charge) and a
1209     different Q² spelling that defeat string matching. See
1210     project-pcs-request-vs-output-paths.
1211     """
1212     for k in ('detector', 'beam', 'physics'):
1213         rv, dv = req.get(k), did.get(k)
1214         if rv and dv and rv != dv:
1215             return False
1216         if rv and not dv:
1217             return False
1218     if not _q2_overlap(req.get('q2'), did.get('q2')):
1219         return False
1220     # Species/energy only meaningful for SINGLE-physics paths; require
1221     # both sides to agree when both populated.
1222     for k in ('species', 'energy'):
1223         rv, dv = req.get(k), did.get(k)
1224         if rv and dv and rv != dv:
1225             return False
1226     return True
1227 
1228 
1229 def _did_tail_segments(did_tail):
1230     return did_tail.split('/') if did_tail else []
1231 
1232 
1233 def _path_aligned_match(did_segs, req_segs):
1234     """Is `req_segs` a contiguous subsequence of `did_segs`?
1235 
1236     Rucio output paths often carry a background-mixing chain prefix
1237     (e.g. Bkg_Exact1S_2us/GoldCt/5um/) AND a Q²-bin suffix that the
1238     CSV input tail doesn't carry. Subsequence matching handles both.
1239     """
1240     if not req_segs or len(req_segs) > len(did_segs):
1241         return False
1242     n = len(req_segs)
1243     for i in range(len(did_segs) - n + 1):
1244         if did_segs[i:i + n] == req_segs:
1245             return True
1246     return False
1247 
1248 
def match_requests_to_rucio_snapshot(snapshot, *, campaign):
    """Attach a Rucio output rollup to every ProdTask in `campaign`.

    For each task the request filters come from the persisted
    ``overrides['csv_import']['filters']`` block, or are re-extracted from
    the dataset's CSV input path when that block is absent. A snapshot
    dataset matches a task when ``_filter_match`` accepts the request
    filters against the filters extracted from the dataset's DID, i.e.
    matching is on the semantic axes (detector, beam, physics, Q²,
    species, energy), not on path strings. Tasks whose filters carry
    neither a beam nor a physics value are never matched.

    The rollup is stored under ``overrides['csv_import']['output']`` and
    every task is saved, matched or not.

    Also stashes the unmatched Rucio datasets on
    ``campaign.data['rucio_unmatched']`` so the catalog view can surface
    them as synthetic table rows (unmatched output popping up).

    Args:
        snapshot: dict with a ``campaigns`` mapping of campaign path to
            ``{'datasets': [...]}``.
        campaign: the Campaign whose ProdTasks are updated.

    Returns:
        dict with ``tasks_seen``, ``tasks_matched``, ``tasks_unmatched``
        and ``rucio_unmatched`` counts.
    """
    # Precompute (rec, did_filters) so the matcher loop is O(req * did).
    idx = _index_snapshot_by_tail(snapshot)
    idx_filtered = [(rec, _extract_past_filters(rec['did']))
                    for _tail, rec in idx]
    qs = ProdTask.objects.filter(campaign=campaign).select_related('dataset')
    summary = {'tasks_seen': 0, 'tasks_matched': 0, 'tasks_unmatched': 0}
    matched_dids = set()
    for t in qs:
        summary['tasks_seen'] += 1
        # Prefer the persisted csv_import.filters block (already extracted
        # at ingest time) and fall back to a fresh extract from the CSV
        # input path for ProdTasks that predate the filter ingest.
        req_filters = ((t.overrides or {}).get('csv_import') or {}).get('filters') or {}
        if not req_filters:
            # `or {}` at each level: a null metadata / source must read as
            # "no path", not raise.
            source = (t.dataset.metadata or {}).get('source') or {}
            ds_path = source.get('location') or ''
            req_filters = _extract_csv_filters(ds_path, t.dataset.detector_config) \
                if ds_path else {}
        matches = []
        if req_filters and any(req_filters.get(k) for k in ('beam', 'physics')):
            for rec, did_filters in idx_filtered:
                if _filter_match(req_filters, did_filters):
                    matches.append(rec)
        for m in matches:
            matched_dids.add(m['did'])
        by_stage = _aggregate_rucio_match(matches)
        any_incomplete = any(s.get('incomplete', 0) for s in by_stage.values())
        has_output = len(matches) > 0
        if not has_output:
            output_status = 'no_output'
        elif any_incomplete:
            output_status = 'incomplete'
        else:
            output_status = 'complete'
        # Copy before mutating so only the 'output' key is replaced.
        overrides = dict(t.overrides or {})
        cv = dict(overrides.get('csv_import', {}) or {})
        cv['output'] = {
            'by_stage':      by_stage,
            'matched_count': len(matches),
            'campaign_name': campaign.name,
            'status':        output_status,   # 'no_output' | 'complete' | 'incomplete'
            'has_output':    has_output,
            'has_incomplete': any_incomplete,
            'has_simu':      'FULL' in by_stage,
            'has_reco':      'RECO' in by_stage,
        }
        overrides['csv_import'] = cv
        t.overrides = overrides
        t.save(update_fields=['overrides', 'updated_at'])
        if matches:
            summary['tasks_matched'] += 1
        else:
            summary['tasks_unmatched'] += 1

    # Unmatched Rucio datasets: in the snapshot but no request matched.
    # Light-weight records — just the fields the catalog row needs.
    unmatched = []
    for cpath, info in (snapshot.get('campaigns') or {}).items():
        cp_parts = cpath.strip('/').split('/')
        stage = cp_parts[0] if cp_parts else ''
        for d in info.get('datasets') or []:
            did = d.get('did', '')
            if did and did not in matched_dids:
                # Aggregate to a single-stage rollup of the SAME shape used
                # for matched-request output so the template can reuse the
                # output-line rendering.
                rollup = _aggregate_rucio_match([{**d, 'stage': stage}])
                files = sum(s['files'] for s in rollup.values())
                bytes_ = sum(s['bytes'] for s in rollup.values())
                any_incomplete = any(s.get('incomplete', 0) for s in rollup.values())
                unmatched.append({
                    'did':        did,
                    'stage':      stage,
                    'files':      files,
                    'bytes':      bytes_,
                    'rse_names':  sorted({r.get('rse', '')
                                          for r in d.get('rse_replicas') or []
                                          if r.get('rse')}),
                    'by_stage':   rollup,
                    'incomplete': any_incomplete,
                    'filters':    _extract_past_filters(did),
                })
    campaign.data = {**(campaign.data or {}), 'rucio_unmatched': unmatched}
    campaign.save(update_fields=['data', 'updated_at'])
    summary['rucio_unmatched'] = len(unmatched)
    return summary
1345 
1346 
def summarize_rucio_timeline(snapshot, *, bin_hours=12):
    """Build a per-bin cumulative arrival timeline from a Rucio snapshot.

    'Arrival' = the earliest created_at across all RSE replicas of a
    dataset. Datasets without any usable timestamp are dropped. Every
    timestamp is converted to UTC first (a timestamp without zone
    information is taken as UTC), then floored to its `bin_hours`-wide
    bin. Bins are counted from the Unix epoch, which is the same as
    aligning to UTC midnight whenever `bin_hours` divides 24, and keeps
    the axis consistent for other widths. The axis runs from the first
    arrival bin to the later of the last arrival bin and the current bin.

    Returns a dict suitable for Plotly:

        {'dates': ['YYYY-MM-DDTHH:00:00', ...],
         'bin_hours': 12,
         'simu': {'cum_datasets':[...], 'cum_files':[...], 'cum_bytes':[...]},
         'reco': {'cum_datasets':[...], 'cum_files':[...], 'cum_bytes':[...]}}

    'simu' is fed by campaigns whose path starts with FULL, 'reco' by
    RECO; other stages are ignored. With no arrivals at all, 'dates' is
    empty and 'simu' / 'reco' are empty dicts.
    """
    from email.utils import parsedate_to_datetime as _pd
    import datetime as _dt
    bin_size = _dt.timedelta(hours=bin_hours)
    epoch = _dt.datetime(1970, 1, 1)          # naive, understood as UTC
    stamp_fmt = '%Y-%m-%dT%H:%M:%S'
    series_keys = ('cum_datasets', 'cum_files', 'cum_bytes')

    def _utc_naive(dt):
        """Express `dt` as a naive UTC datetime (naive input is taken as UTC)."""
        if dt.tzinfo is None:
            return dt
        return dt.astimezone(_dt.timezone.utc).replace(tzinfo=None)

    def _bucket(dt):
        """Floor naive-UTC `dt` to the start of its `bin_hours`-wide bin."""
        return epoch + ((dt - epoch) // bin_size) * bin_size

    arrivals = []  # (bucket_iso, stage, files, bytes)
    for cpath, info in (snapshot.get('campaigns') or {}).items():
        stage = cpath.strip('/').split('/')[0]
        for d in info.get('datasets') or []:
            stamps = []
            files = bytes_ = 0
            for r in d.get('rse_replicas') or []:
                if not isinstance(r, dict):
                    continue  # raw non-JSON line kept by the NDJSON parser
                ts_str = r.get('created_at')
                if ts_str:
                    try:
                        stamps.append(_utc_naive(_pd(ts_str)))
                    except Exception:                         # noqa: BLE001
                        # Best effort: an unparseable timestamp is skipped;
                        # other replicas may still date the dataset.
                        pass
                files = max(files, r.get('length') or 0)
                bytes_ = max(bytes_, r.get('bytes')  or 0)
            if not stamps:
                continue
            arrivals.append((_bucket(min(stamps)).strftime(stamp_fmt),
                             stage, files, bytes_))
    arrivals.sort()
    if not arrivals:
        return {'dates': [], 'bin_hours': bin_hours, 'simu': {}, 'reco': {}}

    first = _dt.datetime.fromisoformat(arrivals[0][0])
    last_arr = _dt.datetime.fromisoformat(arrivals[-1][0])
    # Extend the axis to the current bin (UTC) so a quiet stretch
    # between the last arrival and now shows as a flat segment.
    # Don't go past 'now' — no empty future bins on the axis.
    now_utc = _dt.datetime.now(_dt.timezone.utc).replace(tzinfo=None)
    last = max(last_arr, _bucket(now_utc))
    n_bins = (last - first) // bin_size + 1
    dates = [(first + i * bin_size).strftime(stamp_fmt)
             for i in range(n_bins)]
    idx = {d: i for i, d in enumerate(dates)}

    def _empty():
        return {k: [0] * n_bins for k in series_keys}

    # First pass: per-bin increments for each stage.
    per_bin = {'FULL': _empty(), 'RECO': _empty()}
    for d, stage, files, bytes_ in arrivals:
        if stage not in per_bin or d not in idx:
            continue
        i = idx[d]
        per_bin[stage]['cum_datasets'][i] += 1
        per_bin[stage]['cum_files'][i]    += files
        per_bin[stage]['cum_bytes'][i]    += bytes_

    # Second pass: running totals along the axis.
    out = {'dates': dates, 'bin_hours': bin_hours, 'simu': _empty(), 'reco': _empty()}
    for stage, key in (('FULL', 'simu'), ('RECO', 'reco')):
        for name in series_keys:
            running = 0
            for i, step in enumerate(per_bin[stage][name]):
                running += step
                out[key][name][i] = running
    return out
1430 
1431 
def _detect_active_releases(token=None, *, year_prefix='26.'):
    """Tally datasets per release under epic:/RECO/<version>/ in JLab Rucio.

    Searches the 'epic' scope for datasets named '/RECO/*' and groups them
    by the second path component (the release version).

    Args:
        token: auth token handed to the Rucio request helper. When None,
            one is obtained via _jlab_rucio_auth().
        year_prefix: only versions starting with this prefix are counted.

    Returns:
        A list of {'version': ..., 'count': ...} dicts, one per release
        with at least one dataset, ordered newest-first by comparing the
        dot-separated version components (numeric components compare as
        integers and sort below non-numeric ones).

    'main' is always excluded. Trial runs that land only a handful of
    datasets are intentionally NOT filtered out: the operator judges from
    the counts and nothing is auto-promoted (see
    feedback-humans-switch-lifecycle).
    """
    from collections import Counter as _Counter

    if token is None:
        token = _jlab_rucio_auth()
    found = _ndjson(_jlab_rucio_get(
        '/dids/epic/dids/search', token, type='dataset', name='/RECO/*'))

    def _versions(entries):
        # Yield the release component of every usable dataset name.
        for entry in entries:
            if not isinstance(entry, str):
                continue
            pieces = entry.lstrip('/').split('/')
            if len(pieces) < 2:
                continue
            version = pieces[1]
            if version != 'main' and version.startswith(year_prefix):
                yield version

    def _component(piece):
        # Integers rank below strings so mixed components stay comparable.
        try:
            return (0, int(piece))
        except ValueError:
            return (1, piece)

    def _sort_key(version):
        return tuple(_component(piece) for piece in version.split('.'))

    tally = _Counter(_versions(found))
    newest_first = sorted(tally, key=_sort_key, reverse=True)
    return [{'version': version, 'count': tally[version]}
            for version in newest_first]
1469 
1470 
def load_rucio_snapshot(campaign_name, *, snapshot_dir=RUCIO_SNAPSHOT_DIR):
    """Load the saved JLab Rucio snapshot for a campaign.

    Reads '<snapshot_dir>/current-<campaign_name>.json'.

    Returns the decoded JSON content, or None when the file is absent.
    A file that exists but cannot be opened or parsed is logged as a
    warning and also yields None.
    """
    import json as _json
    import os as _os

    snapshot_file = _os.path.join(
        snapshot_dir, f'current-{campaign_name}.json')
    if _os.path.exists(snapshot_file):
        try:
            with open(snapshot_file) as fh:
                loaded = _json.load(fh)
        except (OSError, _json.JSONDecodeError) as err:
            _log.warning('load_rucio_snapshot %s: %s', snapshot_file, err)
        else:
            return loaded
    return None
1484 
1485 
def import_jlab_rucio_current_snapshot(*, campaign_name=None,
                                      snapshot_dir=RUCIO_SNAPSHOT_DIR,
                                      created_by='rucio_snapshot'):
    """Pull the JLab Rucio snapshot for the PCS current campaign and save
    it under the snapshot directory.

    campaign_name: '26.02.0' (or override). If None, uses the
        lifecycle='current' Campaign. Both /RECO/<v> and /FULL/<v> are
        fetched.
    snapshot_dir: writable directory. One JSON file per current campaign:
        '<snapshot_dir>/current-<campaign>.json'.
    created_by: accepted for interface compatibility; not used by this
        function.

    Returns a summary dict with keys 'campaign', 'paths' (dataset count
    per fetched path), 'errors', 'snapshot_path', 'file_bytes' and
    'detected_releases'; 'match' is added when the campaign has
    lifecycle='current' and the request-match step ran.

    Raises ServiceError when no current Campaign exists (and no name was
    given) or when JLab Rucio authentication fails.

    NO-SILENT-FAILURES: every network / parse error is collected into
    summary['errors'] and surfaced to the operator.
    """
    import json as _json
    import os as _os
    import time as _time

    summary = {'campaign': '', 'paths': {}, 'errors': [], 'snapshot_path': ''}

    # If no name given, default to lifecycle='current'. Callers may pass
    # a 'last' campaign name explicitly.
    if campaign_name is None:
        current = (Campaign.objects.filter(lifecycle='current')
                   .order_by('-updated_at').first())
        if not current:
            raise ServiceError('No current Campaign defined in PCS')
        campaign_name = current.name
    summary['campaign'] = campaign_name

    _os.makedirs(snapshot_dir, exist_ok=True)
    out_path = _os.path.join(snapshot_dir, f'current-{campaign_name}.json')
    summary['snapshot_path'] = out_path

    try:
        token = _jlab_rucio_auth()
    except Exception as e:                                    # noqa: BLE001
        raise ServiceError(f'JLab Rucio auth failed: {e}') from e

    snapshot = {
        'fetched_at':    _time.strftime('%Y-%m-%dT%H:%M:%S%z'),
        'scope':         'epic',
        'campaign_name': campaign_name,
        'campaigns':     {},
    }
    for stage in ('RECO', 'FULL'):
        cpath = f'/{stage}/{campaign_name}'
        try:
            snapshot['campaigns'][cpath] = fetch_jlab_rucio_campaign(
                cpath, token=token)
            summary['paths'][cpath] = snapshot['campaigns'][cpath]['count']
        except Exception as e:                                # noqa: BLE001
            # A failed stage is recorded as an empty entry carrying the
            # error text, so the snapshot file itself shows what went wrong.
            summary['errors'].append(f'{cpath}: {e}')
            snapshot['campaigns'][cpath] = {'count': 0, 'datasets': [],
                                            'error': str(e)}
            summary['paths'][cpath] = 0

    # Write to a sibling temp file and swap it into place, so a reader
    # (or a crash mid-dump) never sees a truncated snapshot and the
    # previous good file survives a failed write.
    tmp_path = f'{out_path}.{_os.getpid()}.tmp'
    try:
        with open(tmp_path, 'w') as f:
            _json.dump(snapshot, f, indent=2)
        _os.replace(tmp_path, out_path)
    except BaseException:
        # Best-effort cleanup of the partial temp file; the original
        # error is what the caller needs to see.
        try:
            _os.remove(tmp_path)
        except OSError:
            pass
        raise
    summary['file_bytes'] = _os.path.getsize(out_path)

    # Detect active 26.x releases in JLab Rucio + stash on the PCS
    # current Campaign so the catalog can surface a 'Switch current'
    # banner. AI proposes; human switches (feedback-humans-switch-
    # lifecycle).
    try:
        # Re-authenticates rather than reusing the earlier token.
        token = _jlab_rucio_auth()
        detected = _detect_active_releases(token=token)
    except Exception as e:                                    # noqa: BLE001
        summary['errors'].append(f'detect_active_releases: {e}')
        detected = []
    summary['detected_releases'] = detected

    # Cache the per-request Output rollup on each ProdTask in this
    # campaign so the Current tab can render it without re-reading
    # the snapshot file on every page load.
    try:
        camp = Campaign.objects.get(name=campaign_name)
        # Stash the detected-releases list on the Campaign for the
        # banner; rename / promotion is a separate operator action.
        camp.data = {**(camp.data or {}), 'detected_releases': detected}
        camp.save(update_fields=['data', 'updated_at'])
        # Match the request set to the Rucio snapshot only for the
        # 'current' campaign — the 'last' campaign has no linked
        # request rows (CSV requests are bound to current), so there's
        # nothing to roll up.
        if camp.lifecycle == 'current':
            match_summary = match_requests_to_rucio_snapshot(snapshot, campaign=camp)
            summary['match'] = match_summary
    except Campaign.DoesNotExist:
        summary['errors'].append(
            f"campaign '{campaign_name}' not in PCS - skipping match step")
    except Exception as e:                                    # noqa: BLE001
        summary['errors'].append(f'request match step failed: {e}')

    return summary
1582 
1583 
def set_pcs_campaign_lifecycle(new_name, target_lifecycle, *, created_by='operator'):
    """Make `new_name` the PCS Campaign occupying the `target_lifecycle` slot.

    target_lifecycle must be 'current' or 'last'; each is treated as a
    singular slot. When the slot already has an occupant (the most
    recently updated Campaign with that lifecycle), that row is renamed
    in place, so ProdTask FKs keep pointing at the same row. When the
    slot is empty, a new Campaign(lifecycle=target_lifecycle) is created.

    Returns a dict with 'changed', 'name' and 'lifecycle'; when something
    changed it also carries 'old_name' (None for a fresh row) and
    'created'.

    Raises ServiceError for an empty name, an unsupported lifecycle, or
    when another Campaign already uses `new_name`.

    Operator-initiated only — never call from a sync / refresh handler
    (see feedback-humans-switch-lifecycle).
    """
    if not new_name:
        raise ServiceError('set_pcs_campaign_lifecycle: empty target name')
    if target_lifecycle not in ('current', 'last'):
        raise ServiceError(f'unsupported lifecycle {target_lifecycle!r}')

    occupant = (Campaign.objects.filter(lifecycle=target_lifecycle)
                .order_by('-updated_at').first())
    # Lazy queryset; only evaluated by the .exists() calls below.
    same_name = Campaign.objects.filter(name=new_name)

    if occupant is None:
        # Empty slot: create a row, but never take over a name that a
        # past campaign (e.g. 'FULL/26.04.1' / 'RECO/26.04.1') still holds.
        if same_name.exists():
            raise ServiceError(
                f'Campaign named {new_name!r} already exists; '
                f'cannot create a new {target_lifecycle} with that name')
        Campaign.objects.create(name=new_name, lifecycle=target_lifecycle,
                                created_by=created_by)
        _log.info('PCS %s campaign created: %s (by %s)',
                  target_lifecycle, new_name, created_by)
        return {'changed': True, 'old_name': None, 'name': new_name,
                'lifecycle': target_lifecycle, 'created': True}

    if occupant.name == new_name:
        # Already in the requested state; nothing to write.
        return {'changed': False, 'name': new_name, 'lifecycle': target_lifecycle}

    if same_name.exclude(pk=occupant.pk).exists():
        raise ServiceError(
            f'Campaign named {new_name!r} already exists; '
            f'cannot rename {occupant.name!r} into it')

    previous_name = occupant.name
    occupant.name = new_name
    occupant.save(update_fields=['name', 'updated_at'])
    _log.info('PCS %s campaign renamed: %s -> %s (by %s)',
              target_lifecycle, previous_name, new_name, created_by)
    return {'changed': True, 'old_name': previous_name, 'name': new_name,
            'lifecycle': target_lifecycle, 'created': False}
1626 
1627 
# Backwards-compat wrapper.
def rename_pcs_current_campaign(new_name, *, created_by='operator'):
    """Point the 'current' lifecycle slot at `new_name`.

    Kept for older callers: delegates to set_pcs_campaign_lifecycle with
    target lifecycle 'current' and returns its result unchanged.
    """
    result = set_pcs_campaign_lifecycle(
        new_name, 'current', created_by=created_by)
    return result
1631 
1632 
def prodtask_record_submission(*, task, jedi_task_id, new_status='submitted'):
    """
    Record outcome of a JEDI submission. Two gates:

    - ``task.status`` must be 'ready' (no submit from draft, no re-submit).
    - ``task.panda_task_id`` must be unset; refuses to overwrite (409).

    ``jedi_task_id`` must be convertible with ``int()``; ``new_status``
    must be one of the statuses returned by ``_known_prodtask_statuses()``.

    All inputs are validated before ``task`` is touched, so a rejected
    call leaves the caller's instance unmodified (no stray, unsaved
    ``panda_task_id`` that would trip the 409 gate on a retry).

    Returns the saved task. Raises ServiceError on any failed check.
    """
    if task.status != 'ready':
        raise ServiceError(
            f'Task must be in status=ready before submission '
            f'(current: {task.status!r}). Mark it ready via '
            f'set-status first.'
        )
    if task.panda_task_id is not None:
        raise ServiceError(
            f'Task already records panda_task_id={task.panda_task_id}. '
            f'Refusing to overwrite.',
            status=409,
        )
    try:
        panda_task_id = int(jedi_task_id)
    except (TypeError, ValueError) as exc:
        raise ServiceError('jedi_task_id must be an integer') from exc

    valid = _known_prodtask_statuses()
    if new_status not in valid:
        raise ServiceError(
            f'Invalid status. Choose from: {", ".join(sorted(valid))}'
        )

    # Every check passed: mutate and persist in one step.
    task.panda_task_id = panda_task_id
    task.status = new_status
    task.save(update_fields=['panda_task_id', 'status', 'updated_at'])
    return task