File indexing completed on 2026-06-26 08:40:22
0001
0002 """migrate_outputs_schema.py — one-time migration of produced-output data onto
0003 the unified ``ProdTask.overrides['outputs']`` schema.
0004
0005 Reshapes each ``past_output`` block into one ``outputs`` entry (one per
0006 produced Rucio dataset) and drops the superseded ``csv_import.output``
0007 aggregate. Idempotent (skips tasks that already carry ``outputs``). Dry run by
0008 default. See ``swf-monitor/docs/EPICPROD_DATA_LINEAGE.md``.
0009
0010 Usage::
0011
0012 cd /data/wenauseic/github/swf-monitor/src
0013 source ../../swf-testbed/.venv/bin/activate && source ~/.env
0014 python ../scripts/migrate_outputs_schema.py # dry run
0015 python ../scripts/migrate_outputs_schema.py --apply # write
0016 """
0017 import os
0018 import sys
0019
# Directory containing this script; resolved to an absolute path so the
# sys.path entry below does not depend on the current working directory.
THIS_DIR = os.path.dirname(os.path.abspath(__file__))
# Put the sibling ``src`` directory first on the import path so the
# ``swf_monitor_project`` settings and the ``pcs`` package can be imported.
sys.path.insert(0, os.path.join(THIS_DIR, '..', 'src'))
# Select the project settings unless the environment already names a
# settings module (setdefault never overwrites an existing value).
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'swf_monitor_project.settings')

# The imports below intentionally follow the path/env setup above.
import django
# Initialise Django for standalone use before importing project code.
# NOTE(review): pcs.services presumably imports Django models, which need
# the app registry populated by setup() - confirm before reordering.
django.setup()

from pcs.services import migrate_outputs_schema
0028
0029
def main(argv):
    """Run the outputs-schema migration and print a summary.

    The migration writes only when ``--apply`` appears among the arguments
    that follow the program name. Otherwise it runs as a dry run.

    Args:
        argv: Full argument vector with the program name first, as in
            ``sys.argv``.

    Returns:
        Exit status for the process: 1 if the migration reported any
        errors, 0 otherwise.
    """
    do_write = '--apply' in argv[1:]
    if do_write:
        banner = 'APPLY — writing changes'
    else:
        banner = 'DRY RUN — no writes (pass --apply to write)'
    print(banner)

    result = migrate_outputs_schema(apply=do_write)
    print(f' tasks seen: {result["seen"]}')
    print(f' past_output migrated: {result["past_migrated"]}')
    print(f' csv aggregate dropped: {result["aggregate_dropped"]}')

    problems = result['errors']
    if not problems:
        return 0

    # Report the total, then show at most the first ten errors so a large
    # failure set does not flood the terminal.
    print(f' errors: {len(problems)}', file=sys.stderr)
    for problem in problems[:10]:
        print(f' - {problem}', file=sys.stderr)
    hidden = len(problems) - 10
    if hidden > 0:
        print(f' ... and {hidden} more', file=sys.stderr)
    return 1
0046
0047
if __name__ == '__main__':
    # Use main()'s return value as the process exit status.
    status = main(sys.argv)
    sys.exit(status)