File indexing completed on 2026-06-04 08:56:26
0001
0002 """
0003 link_csv_file_to_input_dataset.py — one-shot backfill.
0004
0005 Walk ProdTask rows with a non-empty ``csv_file`` and ensure each is linked
0006 to a ``Dataset(stage=evgen, source.kind=csv_manifest, source.location=csv_file)``
0007 via ``overrides['input_dataset_did']``.
0008
0009 Idempotent: skips tasks already linked. Reuses existing input Datasets
0010 when one already records the same source.location.
0011
0012 No DB columns added. ``csv_file`` is preserved as a fallback per
0013 back-compat.
0014
0015 A backfilled input Dataset shares the four tags + detector with the
0016 ProdTask's existing output Dataset, but uses a distinct ``scope``
0017 (default ``<output_scope>.evgen``) so the deterministic DID does not
0018 collide with the output. The convention is local to this backfill —
0019 operators can override with ``--input-scope-suffix``.
0020
0021 Usage::
0022
0023 cd /data/wenauseic/github/swf-monitor/src
0024 source ../../swf-testbed/.venv/bin/activate && source ~/.env
0025 python ../scripts/link_csv_file_to_input_dataset.py # dry-run
0026 python ../scripts/link_csv_file_to_input_dataset.py --apply # persist
0027
0028 Note on Django ORM access: this is a one-shot intra-app backfill, not an
0029 operational tool, so it boots Django directly rather than going through
0030 REST. After running once on a deployment it can be archived.
0031 """
0032 import argparse
0033 import os
0034 import sys
0035 from pathlib import Path
0036
0037
def main():
    """Link ProdTasks that carry a ``csv_file`` to an evgen input Dataset.

    Boots Django, groups ProdTask rows with a non-empty ``csv_file`` by that
    value, and plans one input Dataset per distinct csv. A Dataset whose
    metadata already records ``source.kind == 'csv_manifest'`` and the same
    ``source.location`` is reused instead of being created again. Tasks whose
    ``overrides`` already hold ``input_dataset_did`` or ``input_dataset_dids``
    are skipped, and no Dataset is created for a csv whose tasks are all
    already linked.

    Without ``--apply`` only the plan is printed. With ``--apply`` the
    creates and links run inside one ``transaction.atomic()`` block.

    Returns:
        int: 0, used as the process exit status.

    Raises:
        RuntimeError: if no input Dataset is available for a task during the
            apply phase.
    """
    # ``__doc__`` here is the module docstring; it is None when docstrings
    # are stripped (``python -OO``), so fall back to a fixed summary.
    doc_lines = (__doc__ or '').strip().splitlines()
    if doc_lines:
        description = doc_lines[0]
    else:
        description = 'One-shot backfill linking csv_file to an input Dataset.'

    ap = argparse.ArgumentParser(description=description)
    ap.add_argument('--apply', action='store_true',
                    help='Persist changes (default: dry-run)')
    ap.add_argument('--input-scope-suffix', default='evgen',
                    help='Suffix appended to the output Dataset scope to '
                         'form the input Dataset scope (default: evgen)')
    args = ap.parse_args()

    # Put the sibling ``src`` directory on sys.path so the Django project
    # and the ``pcs`` app can be imported before Django is configured.
    src = Path(__file__).resolve().parent.parent / 'src'
    sys.path.insert(0, str(src))
    os.environ.setdefault('DJANGO_SETTINGS_MODULE',
                          'swf_monitor_project.settings')
    import django
    django.setup()

    # Model imports must come after django.setup().
    from django.db import transaction
    from pcs.models import ProdTask, Dataset

    def find_input_dataset(location):
        """Return the first csv_manifest Dataset recording *location*, or None."""
        return Dataset.objects.filter(
            metadata__source__location=location,
            metadata__source__kind='csv_manifest',
        ).first()

    qs = ProdTask.objects.exclude(csv_file='').order_by('name')
    total = qs.count()
    print(f'ProdTasks with non-empty csv_file: {total}')
    if total == 0:
        print('Nothing to backfill.')
        return 0

    # Group tasks by csv_file so each distinct csv maps to one input Dataset.
    by_csv = {}
    for t in qs:
        by_csv.setdefault(t.csv_file, []).append(t)
    print(f'Unique csv_file values: {len(by_csv)}')
    print()

    plan_create = []  # (csv, sample_task, input_scope)
    plan_link = []    # (task, csv)
    plan_skip = []    # (task, reason)
    resolved = {}     # csv -> input Dataset (existing now, created later)

    for csv, tasks in by_csv.items():
        # Split the group into tasks that still need a link and tasks that
        # already carry one.
        pending = []
        for t in tasks:
            ov = t.overrides or {}
            if ov.get('input_dataset_did') or ov.get('input_dataset_dids'):
                plan_skip.append((t, 'already linked'))
            else:
                pending.append(t)

        existing = find_input_dataset(csv)
        if existing:
            resolved[csv] = existing
            print(f' csv={csv!r} → reuse existing Dataset {existing.did}')
        elif not pending:
            # Every task is linked already: creating a Dataset here would
            # leave it unreferenced.
            print(f' csv={csv!r} → all tasks already linked, no Dataset needed')
            continue
        else:
            # The input Dataset copies tags from an output Dataset, so a
            # task that has one is required. The first such task is used,
            # which is tasks[0] whenever that task has a Dataset.
            # NOTE(review): guards the case where ProdTask.dataset is
            # nullable — confirm against the model.
            sample = next((t for t in tasks if t.dataset is not None), None)
            if sample is None:
                print(f' csv={csv!r} → SKIP, no task has an output Dataset')
                plan_skip.extend((t, 'no output Dataset') for t in pending)
                continue
            input_scope = f'{sample.dataset.scope}.{args.input_scope_suffix}'
            print(f' csv={csv!r} → CREATE Dataset, scope={input_scope}')
            plan_create.append((csv, sample, input_scope))

        plan_link.extend((t, csv) for t in pending)

    print()
    print(f'Plan: create={len(plan_create)} '
          f'link={len(plan_link)} '
          f'skip={len(plan_skip)}')

    if not args.apply:
        print('\nDry-run only. Re-run with --apply to persist.')
        return 0

    print('\nApplying...')
    # One transaction: an exception anywhere below leaves nothing persisted.
    with transaction.atomic():
        for csv, sample, input_scope in plan_create:
            out_ds = sample.dataset
            input_ds = Dataset(
                scope=input_scope,
                detector_version=out_ds.detector_version,
                detector_config=out_ds.detector_config,
                physics_tag=out_ds.physics_tag,
                evgen_tag=out_ds.evgen_tag,
                simu_tag=out_ds.simu_tag,
                reco_tag=out_ds.reco_tag,
                description='External EVGEN input (backfilled from csv_file)',
                metadata={
                    'stage': 'evgen',
                    'source': {'kind': 'csv_manifest', 'location': csv},
                },
                created_by='backfill',
            )
            input_ds.save()
            # Remember the new Dataset so the link phase needs no re-query.
            resolved[csv] = input_ds
            print(f' CREATED Dataset {input_ds.did}')

        for t, csv in plan_link:
            input_ds = resolved.get(csv)
            if input_ds is None:
                raise RuntimeError(
                    f'Input Dataset for csv={csv!r} not found after create '
                    f'phase — aborting.'
                )
            # Copy before mutating so the stored dict is replaced, not
            # edited in place.
            ov = dict(t.overrides or {})
            ov['input_dataset_did'] = input_ds.did
            t.overrides = ov
            t.save(update_fields=['overrides', 'updated_at'])
            print(f' LINKED task={t.name} → {input_ds.did}')

    print()
    print(f'Done. created={len(plan_create)} '
          f'linked={len(plan_link)} '
          f'skipped={len(plan_skip)}')
    return 0
0147
0148
# Script entry point: hand main()'s return value to the interpreter as the
# process exit status.
if __name__ == '__main__':
    raise SystemExit(main())