Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-06-26 08:40:20

0001 #!/usr/bin/env python3
0002 """
0003 check_eventcombiner.py
0004 
0005 For each run that already has output in the FileCatalog, compare
0006 max(lastevent) from those output files against eventsinrun from the
0007 DAQ run-quality DB. Runs whose ratio falls below --ratio-cut are
0008 flagged and written to an output list.
0009 """
0010 
0011 import sys
0012 from pathlib import Path
0013 
0014 from argparsing import submission_args
0015 from simpleLogger import CHATTY, DEBUG, INFO, WARN, ERROR, CRITICAL  # noqa: F401
0016 from sphenixprodrules import RuleConfig
0017 from sphenixmatching import MatchConfig
0018 from sphenixdbutils import cnxn_string_map, dbQuery, list_to_condition
0019 
0020 # ============================================================================================
0021 def main():
0022     args = submission_args()
0023 
0024     from simpleLogger import slogger
0025     import logging
0026     slogger.setLevel(logging.getLevelName(args.loglevel))
0027 
0028     param_overrides = {}
0029     param_overrides["runs"]     = args.runs
0030     param_overrides["runlist"]  = args.runlist
0031     param_overrides["nevents"]  = args.nevents
0032     param_overrides["prodmode"] = "production"
0033     if args.mangle_dirpath:
0034         param_overrides["prodmode"] = args.mangle_dirpath
0035     if args.physicsmode:
0036         param_overrides["physicsmode"] = args.physicsmode
0037 
0038     rule = RuleConfig.from_yaml_file(
0039         yaml_file       = args.config,
0040         rule_name       = args.rulename,
0041         param_overrides = param_overrides,
0042     )
0043 
0044     match = MatchConfig.from_rule_config(rule)
0045 
0046     if 'raw' not in match.input_config.db:
0047         ERROR(f"Rule '{args.rulename}' is not a combining rule (db={match.input_config.db}). Exiting.")
0048         sys.exit(2)
0049 
0050     # Get run list + ideal (run, daqhost) combos from RAW DB
0051     daqhosts_dict, eventsinrun_by_run = match.daqhosts_for_combining()  # also prints run count
0052     if not eventsinrun_by_run:
0053         sys.exit(0)
0054 
0055     seb_types = [h for h in match.in_types if h != 'gl1daq']
0056 
0057     n_ideal = sum(sum(1 for h in hosts if h != 'gl1daq') for hosts in daqhosts_dict.values())
0058     INFO(f"{n_ideal} (run, daqhost) combinations have all segments on lustre.")
0059 
0060     run_condition = list_to_condition(list(eventsinrun_by_run))
0061 
0062     total_query = f"""
0063         SELECT DISTINCT runnumber, daqhost FROM datasets
0064         WHERE {run_condition}
0065           AND daqhost IN {tuple(seb_types)}
0066         ORDER BY runnumber, daqhost
0067     """
0068     all_combos = dbQuery(cnxn_string_map['rawr'], total_query).fetchall()
0069     INFO(f"{len(all_combos)} (run, daqhost) combinations found in the raw DB.")
0070     not_on_lustre = [(int(r), h) for r, h in all_combos if h not in daqhosts_dict.get(int(r), set())]
0071     INFO(f"{len(not_on_lustre)} (run, daqhost) combinations are in the DB but not fully on lustre.")
0072     for run, daqhost in not_on_lustre[:5]:
0073         DEBUG(f"  Not fully on lustre: Run {run} {daqhost}")
0074 
0075     runs_without_gl1daq = [run for run, hosts in daqhosts_dict.items() if 'gl1daq' not in hosts]
0076     for run in sorted(runs_without_gl1daq):
0077         WARN(f"Run {run}: gl1daq not complete on lustre — run will not be submitted.")
0078 
0079     lustre_combos = [
0080         (run, daqhost)
0081         for run, hosts in daqhosts_dict.items()
0082         if 'gl1daq' in hosts
0083         for daqhost in hosts
0084         if daqhost != 'gl1daq'
0085     ]
0086 
0087     # --- FileCatalog ratio check ---
0088     lastevent_query = f"""
0089         SELECT runnumber, dsttype, max(lastevent)
0090         FROM datasets
0091         WHERE dataset='{match.dataset}'
0092           AND tag='{match.outtriplet}'
0093           AND dsttype like '{match.dst_type_template}'
0094           AND {run_condition}
0095         GROUP BY runnumber, dsttype
0096         ORDER BY runnumber, dsttype
0097     """
0098     rows = dbQuery(cnxn_string_map['fcr'], lastevent_query).fetchall()
0099     INFO(f"{len(rows)} (run, dsttype) combinations have existing output in the FileCatalog.")
0100 
0101     fc_dsttypes_by_run = {}
0102     for r, d, _ in rows:
0103         fc_dsttypes_by_run.setdefault(int(r), []).append(d)
0104 
0105     lustre_no_fc = [
0106         (run, daqhost)
0107         for run, daqhost in lustre_combos
0108         if not any(daqhost in dsttype for dsttype in fc_dsttypes_by_run.get(run, []))
0109     ]
0110     INFO(f"{len(lustre_no_fc)} lustre combos have no FileCatalog entry.")
0111     if lustre_no_fc:
0112         for run, daqhost in sorted(lustre_no_fc)[:5]:
0113             DEBUG(f"  Run {run} {daqhost}")
0114 
0115     all_no_fc = [
0116         (int(r), h)
0117         for r, h in all_combos
0118         if not any(h in dsttype for dsttype in fc_dsttypes_by_run.get(int(r), []))
0119     ]
0120     if all_no_fc:
0121         WARN(f"{len(all_no_fc)} raw DB combos (lustre or not) have no FileCatalog entry. Check for corruption?")
0122         for run, daqhost in sorted(all_no_fc)[:5]:
0123             DEBUG(f"  Run {run} {daqhost}")
0124     
0125     INFO(f"Checking for combinations flagged below ratio cut {args.ratio_cut}...")
0126     flagged = []
0127     for runnumber, dsttype, lastevent in rows:
0128         runnumber = int(runnumber)
0129         eventsinrun = eventsinrun_by_run.get(runnumber)
0130 
0131         if not eventsinrun:
0132             WARN(f"Run {runnumber} {dsttype}: eventsinrun=0, cannot compute ratio.")
0133             continue
0134 
0135         ratio = lastevent / eventsinrun
0136         msg = f"Run {runnumber} {dsttype}: lastevent={lastevent}, eventsinrun={eventsinrun}, ratio={ratio:.3f}"
0137         if ratio < args.ratio_cut:
0138             WARN(msg)
0139             flagged.append((runnumber, dsttype))
0140         else:
0141             CHATTY(msg)
0142 
0143     flagged = sorted(set(flagged))
0144     INFO(f"{len(flagged)} (run, dsttype) combinations flagged below ratio cut {args.ratio_cut}.")
0145 
0146     if flagged:
0147         report_and_cleanup(flagged, args, match)
0148 
0149     above_threshold = len(rows) - len(flagged)
0150     n_possible = len(all_combos)
0151     pct = 100.0 * above_threshold / n_possible if n_possible else 0.0
0152     INFO(f"Summary: {above_threshold}/{n_possible} possible combos have FileCatalog entries above threshold ({pct:.1f}%).")
0153 
0154 # ============================================================================================
0155 def report_and_cleanup(flagged, args, match):
0156     """
0157     Handle incomplete (runnumber, dsttype) combinations.
0158     Requires both --delete and --andgo to actually execute deletions.
0159     """
0160     if args.output:
0161         Path(args.output).parent.mkdir(parents=True, exist_ok=True)
0162         Path(args.output).write_text('\n'.join(f"{r} {d}" for r, d in flagged) + '\n')
0163         INFO(f"Flagged combinations written to {args.output}")
0164 
0165     if not getattr(args, 'delete', False):
0166         return
0167 
0168     dryrun = args.dryrun or not args.andgo
0169     if dryrun:
0170         WARN("--delete given without --andgo or --dryrun set: dry run only, no deletions performed.")
0171 
0172     tag = match.outtriplet
0173 
0174     for runnumber, dsttype in flagged:
0175         INFO(f"Deleting run {runnumber} {dsttype} from FileCatalog and production DB.")
0176 
0177         # 1. files (must come before datasets — uses JOIN to identify rows)
0178         delete_files = f"""
0179             DELETE FROM files USING datasets
0180             WHERE lfn=filename
0181               AND tag='{tag}'
0182               AND dsttype='{dsttype}'
0183               AND runnumber={runnumber}
0184         """
0185         # 2. datasets
0186         delete_datasets = f"""
0187             DELETE FROM datasets
0188             WHERE tag='{tag}'
0189               AND dsttype='{dsttype}'
0190               AND runnumber={runnumber}
0191         """
0192         # 3. production_jobs
0193         delete_prodjobs = f"""
0194             DELETE FROM production_jobs
0195             WHERE tag='{tag}'
0196               AND dsttype='{dsttype}'
0197               AND runnumber={runnumber}
0198         """
0199 
0200         CHATTY(delete_files)
0201         curs = dbQuery(cnxn_string_map['fcw'], delete_files, dryrun=dryrun)
0202         if curs: curs.commit()
0203 
0204         CHATTY(delete_datasets)
0205         curs = dbQuery(cnxn_string_map['fcw'], delete_datasets, dryrun=dryrun)
0206         if curs: curs.commit()
0207 
0208         CHATTY(delete_prodjobs)
0209         curs = dbQuery(cnxn_string_map['statw'], delete_prodjobs, dryrun=dryrun)
0210         if curs: curs.commit()
0211 
0212 # ============================================================================================
0213 if __name__ == '__main__':
0214     main()