Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-06-04 08:56:26

0001 #!/usr/bin/env python3
0002 """
0003 link_csv_file_to_input_dataset.py — one-shot backfill.
0004 
0005 Walk ProdTask rows with a non-empty ``csv_file`` and ensure each is linked
0006 to a ``Dataset(stage=evgen, source.kind=csv_manifest, source.location=csv_file)``
0007 via ``overrides['input_dataset_did']``.
0008 
0009 Idempotent: skips tasks already linked. Reuses existing input Datasets
0010 when one already records the same source.location.
0011 
0012 No DB columns added. ``csv_file`` is preserved as a fallback per
0013 back-compat.
0014 
0015 A backfilled input Dataset shares the four tags + detector with the
0016 ProdTask's existing output Dataset, but uses a distinct ``scope``
0017 (default ``<output_scope>.evgen``) so the deterministic DID does not
0018 collide with the output. The convention is local to this backfill —
0019 operators can override with ``--input-scope-suffix``.
0020 
0021 Usage::
0022 
0023     cd /data/wenauseic/github/swf-monitor/src
0024     source ../../swf-testbed/.venv/bin/activate && source ~/.env
0025     python ../scripts/link_csv_file_to_input_dataset.py            # dry-run
0026     python ../scripts/link_csv_file_to_input_dataset.py --apply    # persist
0027 
0028 Note on Django ORM access: this is a one-shot intra-app backfill, not an
0029 operational tool, so it boots Django directly rather than going through
0030 REST. After running once on a deployment it can be archived.
0031 """
0032 import argparse
0033 import os
0034 import sys
0035 from pathlib import Path
0036 
0037 
def main():
    """Backfill ``overrides['input_dataset_did']`` on ProdTasks with a csv_file.

    Groups every ProdTask whose ``csv_file`` is non-empty by that value, then
    for each group either reuses the Dataset whose metadata already records
    the csv as a ``csv_manifest`` source, or plans a new one whose scope is
    ``<output scope>.<--input-scope-suffix>`` and whose tag/detector fields
    are copied from the first task's output Dataset. Tasks that already carry
    ``input_dataset_did`` or ``input_dataset_dids`` in their overrides are
    skipped.

    A new Dataset is planned only when at least one task in the group still
    needs a link, so a group whose tasks are all linked already does not
    leave an unreferenced Dataset behind.

    Without ``--apply`` only the plan is printed. With ``--apply`` the
    Datasets are created and the tasks updated inside one
    ``transaction.atomic()`` block.

    Returns:
        int: 0 on normal completion (nothing to do, dry-run, or applied).

    Raises:
        RuntimeError: in ``--apply`` mode, if no input Dataset can be found
            for a csv_file when linking. It is raised inside the atomic
            block, so the changes made so far are not committed.
    """
    ap = argparse.ArgumentParser(description=__doc__.strip().splitlines()[0])
    ap.add_argument('--apply', action='store_true',
                    help='Persist changes (default: dry-run)')
    ap.add_argument('--input-scope-suffix', default='evgen',
                    help='Suffix appended to the output Dataset scope to '
                         'form the input Dataset scope (default: evgen)')
    args = ap.parse_args()

    # Make the sibling ``src`` tree importable and configure Django before
    # any model import; the model imports must stay below django.setup().
    src = Path(__file__).resolve().parent.parent / 'src'
    sys.path.insert(0, str(src))
    os.environ.setdefault('DJANGO_SETTINGS_MODULE',
                          'swf_monitor_project.settings')
    import django
    django.setup()

    from django.db import transaction
    from pcs.models import ProdTask, Dataset

    def find_input_dataset(csv):
        """Return the first Dataset recording *csv* as a csv_manifest source, or None."""
        return Dataset.objects.filter(
            metadata__source__location=csv,
            metadata__source__kind='csv_manifest',
        ).first()

    qs = ProdTask.objects.exclude(csv_file='').order_by('name')
    total = qs.count()
    print(f'ProdTasks with non-empty csv_file: {total}')
    if total == 0:
        print('Nothing to backfill.')
        return 0

    # Group tasks by csv_file so each distinct manifest is resolved once.
    by_csv = {}
    for t in qs:
        by_csv.setdefault(t.csv_file, []).append(t)
    print(f'Unique csv_file values: {len(by_csv)}')
    print()

    plan_create = []   # (csv, sample_task, planned_scope)
    plan_link = []     # (task, csv)
    plan_skip = []     # (task, reason)

    for csv, tasks in by_csv.items():
        # Classify the tasks first: whether a Dataset must be created
        # depends on whether anything in this group still needs a link.
        to_link = []
        for t in tasks:
            ov = t.overrides or {}
            if ov.get('input_dataset_did') or ov.get('input_dataset_dids'):
                plan_skip.append((t, 'already linked'))
            else:
                to_link.append(t)

        existing = find_input_dataset(csv)
        if existing:
            print(f'  csv={csv!r} → reuse existing Dataset {existing.did}')
        elif not to_link:
            # Creating a Dataset here would leave it referenced by no task.
            print(f'  csv={csv!r} → all tasks already linked, '
                  f'no Dataset needed')
        else:
            # The first task of the group supplies the output Dataset that
            # the new input Dataset is modelled on.
            sample = tasks[0]
            input_scope = f'{sample.dataset.scope}.{args.input_scope_suffix}'
            print(f'  csv={csv!r} → CREATE Dataset, scope={input_scope}')
            plan_create.append((csv, sample, input_scope))

        plan_link.extend((t, csv) for t in to_link)

    print()
    print(f'Plan: create={len(plan_create)}  '
          f'link={len(plan_link)}  '
          f'skip={len(plan_skip)}')

    if not args.apply:
        print('\nDry-run only. Re-run with --apply to persist.')
        return 0

    print('\nApplying...')
    with transaction.atomic():
        for csv, sample, input_scope in plan_create:
            out_ds = sample.dataset
            input_ds = Dataset(
                scope=input_scope,
                detector_version=out_ds.detector_version,
                detector_config=out_ds.detector_config,
                physics_tag=out_ds.physics_tag,
                evgen_tag=out_ds.evgen_tag,
                simu_tag=out_ds.simu_tag,
                reco_tag=out_ds.reco_tag,
                description='External EVGEN input (backfilled from csv_file)',
                metadata={
                    'stage': 'evgen',
                    'source': {'kind': 'csv_manifest', 'location': csv},
                },
                created_by='backfill',
            )
            input_ds.save()
            print(f'  CREATED Dataset {input_ds.did}')

        # The input Dataset depends only on the csv_file, so resolve it once
        # per csv instead of re-querying for every task that shares it.
        resolved = {}
        for t, csv in plan_link:
            if csv not in resolved:
                found = find_input_dataset(csv)
                if found is None:
                    raise RuntimeError(
                        f'Input Dataset for csv={csv!r} not found after create '
                        f'phase — aborting.'
                    )
                resolved[csv] = found
            input_ds = resolved[csv]
            # Copy before editing so the stored overrides dict is replaced
            # rather than mutated in place.
            ov = dict(t.overrides or {})
            ov['input_dataset_did'] = input_ds.did
            t.overrides = ov
            t.save(update_fields=['overrides', 'updated_at'])
            print(f'  LINKED task={t.name} → {input_ds.did}')

    print()
    print(f'Done. created={len(plan_create)}  '
          f'linked={len(plan_link)}  '
          f'skipped={len(plan_skip)}')
    return 0
0147 
0148 
if __name__ == '__main__':
    # Script entry point: hand main()'s return value to the interpreter as
    # the process exit status.
    exit_status = main()
    sys.exit(exit_status)