File indexing completed on 2026-06-26 08:40:20
0001
0002 """
0003 check_eventcombiner.py
0004
0005 For each run that already has output in the FileCatalog, compare
0006 max(lastevent) from those output files against eventsinrun from the
0007 DAQ run-quality DB. Runs whose ratio falls below --ratio-cut are
0008 flagged and written to an output list.
0009 """
0010
0011 import sys
0012 from pathlib import Path
0013
0014 from argparsing import submission_args
0015 from simpleLogger import CHATTY, DEBUG, INFO, WARN, ERROR, CRITICAL
0016 from sphenixprodrules import RuleConfig
0017 from sphenixmatching import MatchConfig
0018 from sphenixdbutils import cnxn_string_map, dbQuery, list_to_condition
0019
0020
def _sql_quoted_list(values):
    """
    Render *values* as a parenthesised, single-quoted SQL list for an IN clause.
    """
    # Interpolating tuple(values) directly is wrong for short lists: a
    # one-element tuple prints as "('a',)", which is not valid SQL.
    quoted = ", ".join("'" + str(v).replace("'", "''") + "'" for v in values)
    return "(" + quoted + ")"


def _flag_low_ratio(rows, eventsinrun_by_run, ratio_cut):
    """
    Return (sorted flagged (run, dsttype) pairs, number of rows at or above *ratio_cut*).

    Rows whose ratio cannot be computed (no event count for the run, or a
    NULL lastevent) are reported and belong to neither group.
    """
    flagged = set()
    n_above = 0
    for runnumber, dsttype, lastevent in rows:
        runnumber = int(runnumber)
        eventsinrun = eventsinrun_by_run.get(runnumber)

        if not eventsinrun:
            # Covers both a missing run (None) and a genuine zero.
            WARN(f"Run {runnumber} {dsttype}: eventsinrun={eventsinrun}, cannot compute ratio.")
            continue
        if lastevent is None:
            # max(lastevent) is NULL when no output row carries a lastevent.
            WARN(f"Run {runnumber} {dsttype}: lastevent is NULL, cannot compute ratio.")
            continue

        ratio = lastevent / eventsinrun
        msg = f"Run {runnumber} {dsttype}: lastevent={lastevent}, eventsinrun={eventsinrun}, ratio={ratio:.3f}"
        if ratio < ratio_cut:
            WARN(msg)
            flagged.add((runnumber, dsttype))
        else:
            CHATTY(msg)
            n_above += 1

    return sorted(flagged), n_above


def main():
    """
    Compare existing FileCatalog output against the expected event counts.

    Steps, in order:
      1. Build the rule and match configuration from the command line.
      2. Get the run -> daqhosts and run -> eventsinrun maps from
         match.daqhosts_for_combining().
      3. Report (run, daqhost) combinations that are in the raw DB but
         missing from the daqhosts map, and runs without gl1daq.
      4. Query the FileCatalog for max(lastevent) per (run, dsttype).
      5. Flag every (run, dsttype) whose lastevent / eventsinrun is below
         args.ratio_cut and pass the flagged list to report_and_cleanup().

    Exits with status 2 if the rule is not a combining rule, and with
    status 0 if there are no runs to check.
    """
    args = submission_args()

    from simpleLogger import slogger
    import logging
    slogger.setLevel(logging.getLevelName(args.loglevel))

    param_overrides = {
        "runs": args.runs,
        "runlist": args.runlist,
        "nevents": args.nevents,
        # args.mangle_dirpath, when set, replaces the default production mode.
        "prodmode": args.mangle_dirpath or "production",
    }
    if args.physicsmode:
        param_overrides["physicsmode"] = args.physicsmode

    rule = RuleConfig.from_yaml_file(
        yaml_file=args.config,
        rule_name=args.rulename,
        param_overrides=param_overrides,
    )

    match = MatchConfig.from_rule_config(rule)

    if 'raw' not in match.input_config.db:
        ERROR(f"Rule '{args.rulename}' is not a combining rule (db={match.input_config.db}). Exiting.")
        sys.exit(2)

    daqhosts_dict, eventsinrun_by_run = match.daqhosts_for_combining()
    if not eventsinrun_by_run:
        INFO("No runs with event counts to check. Nothing to do.")
        sys.exit(0)

    # gl1daq is handled separately below; every other input type is
    # checked per daqhost.
    seb_types = [h for h in match.in_types if h != 'gl1daq']

    n_ideal = sum(sum(1 for h in hosts if h != 'gl1daq') for hosts in daqhosts_dict.values())
    INFO(f"{n_ideal} (run, daqhost) combinations have all segments on lustre.")

    run_condition = list_to_condition(list(eventsinrun_by_run))

    if seb_types:
        total_query = f"""
        SELECT DISTINCT runnumber, daqhost FROM datasets
        WHERE {run_condition}
        AND daqhost IN {_sql_quoted_list(seb_types)}
        ORDER BY runnumber, daqhost
        """
        all_combos = dbQuery(cnxn_string_map['rawr'], total_query).fetchall()
    else:
        # An empty IN () list is a SQL syntax error, so skip the query.
        WARN("Rule has no input types besides gl1daq; skipping the raw DB lookup.")
        all_combos = []
    INFO(f"{len(all_combos)} (run, daqhost) combinations found in the raw DB.")

    not_on_lustre = [(int(r), h) for r, h in all_combos if h not in daqhosts_dict.get(int(r), set())]
    INFO(f"{len(not_on_lustre)} (run, daqhost) combinations are in the DB but not fully on lustre.")
    # Only a small sample is printed to keep the log readable.
    for run, daqhost in not_on_lustre[:5]:
        DEBUG(f"  Not fully on lustre: Run {run} {daqhost}")

    runs_without_gl1daq = [run for run, hosts in daqhosts_dict.items() if 'gl1daq' not in hosts]
    for run in sorted(runs_without_gl1daq):
        WARN(f"Run {run}: gl1daq not complete on lustre — run will not be submitted.")

    # Only runs that have gl1daq contribute (run, daqhost) combinations.
    lustre_combos = [
        (run, daqhost)
        for run, hosts in daqhosts_dict.items()
        if 'gl1daq' in hosts
        for daqhost in hosts
        if daqhost != 'gl1daq'
    ]

    lastevent_query = f"""
    SELECT runnumber, dsttype, max(lastevent)
    FROM datasets
    WHERE dataset='{match.dataset}'
    AND tag='{match.outtriplet}'
    AND dsttype like '{match.dst_type_template}'
    AND {run_condition}
    GROUP BY runnumber, dsttype
    ORDER BY runnumber, dsttype
    """
    rows = dbQuery(cnxn_string_map['fcr'], lastevent_query).fetchall()
    INFO(f"{len(rows)} (run, dsttype) combinations have existing output in the FileCatalog.")

    fc_dsttypes_by_run = {}
    for r, d, _ in rows:
        fc_dsttypes_by_run.setdefault(int(r), []).append(d)

    def has_fc_entry(run, daqhost):
        # NOTE(review): substring match; a daqhost whose name is contained
        # in another one's would match both. Confirm the naming scheme
        # makes this safe.
        return any(daqhost in dsttype for dsttype in fc_dsttypes_by_run.get(run, []))

    lustre_no_fc = [(run, daqhost) for run, daqhost in lustre_combos if not has_fc_entry(run, daqhost)]
    INFO(f"{len(lustre_no_fc)} lustre combos have no FileCatalog entry.")
    for run, daqhost in sorted(lustre_no_fc)[:5]:
        DEBUG(f"  Run {run} {daqhost}")

    all_no_fc = [(int(r), h) for r, h in all_combos if not has_fc_entry(int(r), h)]
    if all_no_fc:
        WARN(f"{len(all_no_fc)} raw DB combos (lustre or not) have no FileCatalog entry. Check for corruption?")
        for run, daqhost in sorted(all_no_fc)[:5]:
            DEBUG(f"  Run {run} {daqhost}")

    INFO(f"Checking for combinations flagged below ratio cut {args.ratio_cut}...")
    flagged, above_threshold = _flag_low_ratio(rows, eventsinrun_by_run, args.ratio_cut)
    INFO(f"{len(flagged)} (run, dsttype) combinations flagged below ratio cut {args.ratio_cut}.")

    if flagged:
        report_and_cleanup(flagged, args, match)

    # above_threshold counts only rows whose ratio was actually computed
    # and passed; rows skipped for missing data are not counted as good.
    n_possible = len(all_combos)
    pct = 100.0 * above_threshold / n_possible if n_possible else 0.0
    INFO(f"Summary: {above_threshold}/{n_possible} possible combos have FileCatalog entries above threshold ({pct:.1f}%).")
0153
0154
def report_and_cleanup(flagged, args, match):
    """
    Handle incomplete (runnumber, dsttype) combinations.

    When args.output is set, the flagged pairs are written there, one
    "runnumber dsttype" per line (parent directories are created).

    Database rows are only touched when args.delete is set. Real deletions
    additionally require args.andgo and no args.dryrun; otherwise every
    statement is handed to dbQuery with dryrun=True.

    Args:
        flagged: sequence of (runnumber, dsttype) pairs.
        args: parsed command-line namespace; reads output, delete, dryrun
            and andgo.
        match: object whose outtriplet is used as the tag in the DELETE
            statements.
    """
    if args.output:
        outpath = Path(args.output)
        outpath.parent.mkdir(parents=True, exist_ok=True)
        outpath.write_text('\n'.join(f"{r} {d}" for r, d in flagged) + '\n')
        INFO(f"Flagged combinations written to {args.output}")

    if not getattr(args, 'delete', False):
        return

    dryrun = args.dryrun or not args.andgo
    if dryrun:
        WARN("--delete given without --andgo or --dryrun set: dry run only, no deletions performed.")

    tag = match.outtriplet
    # Keep the log honest: nothing is deleted in a dry run.
    action = "Would delete" if dryrun else "Deleting"

    for runnumber, dsttype in flagged:
        INFO(f"{action} run {runnumber} {dsttype} from FileCatalog and production DB.")

        delete_files = f"""
        DELETE FROM files USING datasets
        WHERE lfn=filename
        AND tag='{tag}'
        AND dsttype='{dsttype}'
        AND runnumber={runnumber}
        """

        delete_datasets = f"""
        DELETE FROM datasets
        WHERE tag='{tag}'
        AND dsttype='{dsttype}'
        AND runnumber={runnumber}
        """

        delete_prodjobs = f"""
        DELETE FROM production_jobs
        WHERE tag='{tag}'
        AND dsttype='{dsttype}'
        AND runnumber={runnumber}
        """

        # Order matters: the files DELETE joins against datasets, so it
        # has to run before the matching datasets rows are removed.
        statements = (
            ('fcw', delete_files),
            ('fcw', delete_datasets),
            ('statw', delete_prodjobs),
        )
        for db_key, statement in statements:
            CHATTY(statement)
            curs = dbQuery(cnxn_string_map[db_key], statement, dryrun=dryrun)
            # dbQuery may hand back a falsy value (presumably in dry-run
            # mode); commit only when there is a cursor.
            if curs:
                curs.commit()
0211
0212
# Run the check only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()