File indexing completed on 2026-06-26 08:40:20
0001
0002 """
0003 check_downstream.py
0004
0005 For downstream productions, compare required input DSTs against the primary
0006 output DST at (run, segment) granularity. The checker is read-only: it reports
0007 missing or short primary outputs and leaves cleanup/resubmission to other tools.
0008 """
0009
0010 import sys
0011 from collections import Counter, defaultdict
0012 from dataclasses import dataclass
0013 from pathlib import Path
0014 from typing import Any, Dict, Iterable, List, Set, Tuple
0015
0016 from argparsing import submission_args
0017 from simpleLogger import CHATTY, DEBUG, INFO, WARN, ERROR
0018 from sphenixjobdicts import required_seb_hosts
0019 from sphenixprodrules import pRUNFMT, pSEGFMT
0020
0021
@dataclass(frozen=True)
class DatasetInfo:
    """Immutable, typed view of one catalog row (one DST file).

    Instances are built by ``_dataset_info_from_row`` from rows selected as
    ``dsttype, runnumber, segment, events, filename``.
    """

    dsttype: str  # value of the row's ``dsttype`` column
    runnumber: int  # run the file belongs to
    segment: int  # segment within the run; (runnumber, segment) is the work-unit key
    events: int  # value of the row's ``events`` column, compared as an event count
    filename: str = ""  # value of the row's ``filename`` column
0029
0030
@dataclass(frozen=True)
class FlaggedWorkUnit:
    """A (run, segment) work unit that the checker reported, with the reasons.

    ``find_flagged_units`` stores ``-1`` in ``input_events`` when the inputs
    disagree on their event count, and ``-1`` in ``output_events`` when no
    output row exists for the unit.
    """

    runnumber: int
    segment: int
    reasons: Tuple[str, ...]
    input_events: int
    output_events: int

    def report_line(self) -> str:
        """Return the unit as one space-separated report line.

        Columns: run (formatted with ``pRUNFMT``), segment (formatted with
        ``pSEGFMT``), comma-joined reasons, input events, output events.
        """
        columns = (
            format(self.runnumber, pRUNFMT),
            format(self.segment, pSEGFMT),
            ",".join(self.reasons),
            format(self.input_events),
            format(self.output_events),
        )
        return " ".join(columns)
0044
0045
0046 def _row_value(row: Any, name: str, index: int) -> Any:
0047 if hasattr(row, name):
0048 return getattr(row, name)
0049 return row[index]
0050
0051
def _dataset_info_from_row(row: Any) -> DatasetInfo:
    """Convert a query row into a ``DatasetInfo``.

    Each column is looked up through ``_row_value`` (attribute name first,
    positional index as fallback) and coerced to the field's type.
    """
    # Position in this tuple is the positional index used for the fallback lookup.
    columns = (
        ("dsttype", str),
        ("runnumber", int),
        ("segment", int),
        ("events", int),
        ("filename", str),
    )
    fields = {
        name: convert(_row_value(row, name, position))
        for position, (name, convert) in enumerate(columns)
    }
    return DatasetInfo(**fields)
0060
0061
0062 def _sql_in(values: Iterable[str]) -> str:
0063 quoted = ",".join(f"'{value}'" for value in values)
0064 return f"({quoted})"
0065
0066
def build_eligible_units(
    input_rows: Iterable[Any],
    required_input_types: Iterable[str],
    cut_segment: int = 1,
) -> Dict[Tuple[int, int], List[DatasetInfo]]:
    """Collect the (run, segment) units for which every required dsttype exists.

    Rows whose segment is not a multiple of *cut_segment* (when it is truthy)
    or whose dsttype is not required are ignored. When a unit has several rows
    of the same dsttype, the one with the most events is kept.

    Returns:
        Mapping of (runnumber, segment) to that unit's inputs, ordered by
        sorted dsttype name.
    """
    wanted = set(required_input_types)
    best: Dict[Tuple[int, int], Dict[str, DatasetInfo]] = defaultdict(dict)

    for raw in input_rows:
        candidate = _dataset_info_from_row(raw)
        off_grid = bool(cut_segment) and candidate.segment % cut_segment != 0
        if off_grid or candidate.dsttype not in wanted:
            continue

        slot = best[(candidate.runnumber, candidate.segment)]
        current = slot.get(candidate.dsttype)
        if current is None:
            slot[candidate.dsttype] = candidate
        elif candidate.events > current.events:
            slot[candidate.dsttype] = candidate

    return {
        unit: [found[dsttype] for dsttype in sorted(wanted)]
        for unit, found in best.items()
        if set(found) == wanted
    }
0093
0094
def filter_runs_by_required_seb(
    input_rows: Iterable[Any],
    required_seb: Set[str],
    min_seb: int,
    raw_daqhosts_by_run: Dict[int, Set[str]],
) -> Set[int]:
    """Return the runs that have enough required SEB hosts in both sources.

    A run passes when at least *min_seb* required hosts appear in
    *raw_daqhosts_by_run* and at least *min_seb* required hosts appear in
    *input_rows* as ``DST_TRIGGERED_EVENT_<host>`` dsttypes. Up to five
    failing runs are logged at DEBUG level. An empty *required_seb* returns
    an empty set.
    """
    if not required_seb:
        return set()

    marker = "DST_TRIGGERED_EVENT_"
    catalog_hosts: Dict[int, Set[str]] = defaultdict(set)
    for raw in input_rows:
        entry = _dataset_info_from_row(raw)
        if entry.dsttype.startswith(marker):
            seb = entry.dsttype[len(marker):]
            if seb in required_seb:
                catalog_hosts[entry.runnumber].add(seb)

    passing = set()
    rejected = 0
    for runnumber in set(raw_daqhosts_by_run) | set(catalog_hosts):
        raw_hits = raw_daqhosts_by_run.get(runnumber, set()).intersection(required_seb)
        catalog_hits = catalog_hosts.get(runnumber, set())
        if len(raw_hits) >= min_seb and len(catalog_hits) >= min_seb:
            passing.add(runnumber)
            continue

        rejected += 1
        if rejected <= 5:
            DEBUG(
                f"Run {runnumber}: required SEB availability failed "
                f"(raw={len(raw_hits)}, catalog={len(catalog_hits)}, min={min_seb})."
            )
        elif rejected == 6:
            DEBUG("Additional required SEB availability failures suppressed after 5 examples.")
    return passing
0133
0134
def find_flagged_units(
    eligible_units: Dict[Tuple[int, int], List[DatasetInfo]],
    output_rows: Iterable[Any],
    ratio_cut: float,
) -> List[FlaggedWorkUnit]:
    """Flag eligible units whose output is missing, short, or has inconsistent inputs.

    Reasons, in the order they are recorded:
      * ``input_mismatch``    - the unit's inputs do not share one event count
      * ``missing_output``    - no output row exists for the unit
      * ``low_output_events`` - output/input event ratio is below *ratio_cut*
        (only evaluated when the common input count is positive)

    Returns:
        Flagged units sorted by (runnumber, segment).
    """
    # Keep, per unit, the output row reporting the most events.
    best_output: Dict[Tuple[int, int], DatasetInfo] = {}
    for raw in output_rows:
        candidate = _dataset_info_from_row(raw)
        key = (candidate.runnumber, candidate.segment)
        holder = best_output.get(key)
        if holder is None or candidate.events > holder.events:
            best_output[key] = candidate

    results = []
    for unit in sorted(eligible_units):
        members = eligible_units[unit]
        consistent = len({member.events for member in members}) == 1
        input_events = members[0].events if consistent else -1
        produced = best_output.get(unit)

        problems = []
        if not consistent:
            problems.append("input_mismatch")
        if produced is None:
            output_events = -1
            problems.append("missing_output")
        else:
            output_events = produced.events
            if input_events > 0 and produced.events / input_events < ratio_cut:
                problems.append("low_output_events")

        if not problems:
            continue
        runnumber, segment = unit
        results.append(
            FlaggedWorkUnit(
                runnumber=runnumber,
                segment=segment,
                reasons=tuple(problems),
                input_events=input_events,
                output_events=output_events,
            )
        )
    return results
0174
0175
def check_run_level_coverage(
    eligible_units: Dict[Tuple[int, int], List[DatasetInfo]],
    output_rows: Iterable[Any],
    ratio_cut: float,
) -> Set[int]:
    """Compare per-run output event totals against per-run input event totals.

    The input total of a run adds, for each eligible unit, the largest event
    count among that unit's inputs. The output total adds the best (most
    events) output row of every (run, segment) present in *output_rows*.
    Runs without a positive input total are skipped.

    Returns:
        The runnumbers whose output/input ratio is below *ratio_cut*. At most
        five failures (and five skipped runs) are logged individually.
    """
    best_output: Dict[Tuple[int, int], DatasetInfo] = {}
    for raw in output_rows:
        candidate = _dataset_info_from_row(raw)
        key = (candidate.runnumber, candidate.segment)
        holder = best_output.get(key)
        if holder is None or candidate.events > holder.events:
            best_output[key] = candidate

    wanted_events: Dict[int, int] = defaultdict(int)
    for (run, _segment), members in eligible_units.items():
        wanted_events[run] += max(member.events for member in members)

    produced_events: Dict[int, int] = defaultdict(int)
    for (run, _segment), best in best_output.items():
        produced_events[run] += best.events

    failing_runs: Set[int] = set()
    reported = 0
    skipped = 0
    for run in sorted(wanted_events.keys() | produced_events.keys()):
        input_sum = wanted_events.get(run, 0)
        output_sum = produced_events.get(run, 0)

        if input_sum <= 0:
            skipped += 1
            if skipped <= 5:
                DEBUG(f"Run {run}: no input events to check run-level coverage.")
            elif skipped == 6:
                DEBUG("Additional no-input coverage examples suppressed after 5 examples.")
            continue

        # input_sum is strictly positive here, so the division is safe.
        ratio = output_sum / input_sum
        if ratio < ratio_cut:
            failing_runs.add(run)
            reported += 1
            if reported <= 5:
                WARN(
                    f"Run {run}: run-level coverage failed (output={output_sum}, input={input_sum}, ratio={ratio:.3f}, min={ratio_cut})."
                )
            elif reported == 6:
                WARN("Additional run-level coverage failures suppressed after 5 examples.")

    INFO(f"{len(failing_runs)} runs fail run-level coverage (ratio < {ratio_cut}).")
    return failing_runs
0229
0230
def check_coverage_against_raw(
    output_rows: Iterable[Any], raw_input_by_run: Dict[int, int], ratio_cut: float
) -> Set[int]:
    """Compare per-run output event totals against the raw input events per run.

    For every run in *raw_input_by_run* with a positive raw count, the summed
    events of the best output row per (run, segment) are divided by the raw
    count; runs below *ratio_cut* fail.

    Returns:
        The failing runnumbers. At most five failures are logged individually.
    """
    best_output: Dict[Tuple[int, int], DatasetInfo] = {}
    for raw in output_rows:
        candidate = _dataset_info_from_row(raw)
        key = (candidate.runnumber, candidate.segment)
        holder = best_output.get(key)
        if holder is None or candidate.events > holder.events:
            best_output[key] = candidate

    produced_events: Dict[int, int] = defaultdict(int)
    for (runnumber, _segment), best in best_output.items():
        produced_events[runnumber] += best.events

    failing = set()
    reported = 0
    for run, raw_events in sorted(raw_input_by_run.items()):
        out_events = produced_events.get(run, 0)
        if raw_events <= 0:
            CHATTY(f"Run {run}: raw input events=0, skipping coverage check.")
            continue

        ratio = out_events / raw_events
        if ratio < ratio_cut:
            failing.add(run)
            reported += 1
            if reported <= 5:
                WARN(
                    f"Run {run}: files DB covers {out_events}/{raw_events} events "
                    f"({ratio:.3f} < {ratio_cut})"
                )
            elif reported == 6:
                WARN("Additional files-vs-raw coverage failures suppressed after 5 examples.")

    INFO(f"{len(failing)} runs fail files-vs-raw coverage (ratio < {ratio_cut}).")
    return failing
0271
0272
def write_report(flagged: List[FlaggedWorkUnit], output: str = None) -> None:
    """Write one report line per flagged unit to the file at *output*.

    Parent directories are created as needed. When *output* is falsy nothing
    is written.
    """
    # One newline-terminated line per unit; empty string when nothing is flagged.
    text = "".join(unit.report_line() + "\n" for unit in flagged)

    if not output:
        return

    target = Path(output)
    target.parent.mkdir(parents=True, exist_ok=True)
    target.write_text(text)
    INFO(f"Flagged downstream work units written to {output}")
0283
0284
def _load_rule_and_match(args) -> Tuple[Any, Any]:
    """Build the rule configuration from the command-line arguments.

    Returns:
        ``(rule, match)``: the ``RuleConfig`` loaded from ``args.config`` for
        ``args.rulename``, and the ``MatchConfig`` derived from it.
    """
    from sphenixmatching import MatchConfig
    from sphenixprodrules import RuleConfig

    overrides = {
        "runs": args.runs,
        "runlist": args.runlist,
        "nevents": args.nevents,
        # --mangle_dirpath, when given, replaces the default production mode.
        "prodmode": args.mangle_dirpath if args.mangle_dirpath else "production",
        "check_legacy": args.check_legacy,
        "cut_segment": args.cut_segment,
    }
    if args.physicsmode:
        overrides["physicsmode"] = args.physicsmode

    rule_config = RuleConfig.from_yaml_file(
        yaml_file=args.config,
        rule_name=args.rulename,
        param_overrides=overrides,
    )
    match_config = MatchConfig.from_rule_config(rule_config)
    return rule_config, match_config
0308
0309
def _query_raw_daqhosts(runnumbers: Iterable[int]) -> Dict[int, Set[str]]:
    """Look up which DAQ hosts the raw database lists for each run.

    Returns:
        Mapping of runnumber to the set of ``daqhost`` values found for it;
        an empty dict when ``list_to_condition`` yields no run condition.
    """
    from sphenixdbutils import cnxn_string_map, dbQuery, list_to_condition

    condition = list_to_condition(list(runnumbers))
    if not condition:
        return {}

    sql = f"""
        SELECT DISTINCT runnumber, daqhost
        FROM datasets
        WHERE {condition}
        """
    cursor = dbQuery(cnxn_string_map["rawr"], sql)

    hosts_by_run: Dict[int, Set[str]] = defaultdict(set)
    for record in cursor.fetchall():
        run = int(_row_value(record, "runnumber", 0))
        host = str(_row_value(record, "daqhost", 1))
        hosts_by_run[run].add(host)
    return hosts_by_run
0326
def _query_raw_input_events(match: Any, runnumbers: Iterable[int]) -> Dict[int, int]:
    """Query the raw DB for per-run available input events for trigger-SEB hosts.

    The hosts are taken from the ``DST_TRIGGERED_EVENT_<host>`` entries of
    ``match.in_types``. For downstream combining the target per run is the
    maximum events across DAQ hosts (not the sum).

    Args:
        match: Match configuration; only ``in_types`` is read here.
        runnumbers: Runs to look up.

    Returns:
        Mapping runnumber -> max_events. Empty when there is no run condition
        or no trigger-SEB input type. A run whose ``MAX(events)`` is NULL is
        reported with 0 events instead of raising ``TypeError``.
    """
    from sphenixdbutils import cnxn_string_map, dbQuery, list_to_condition

    run_condition = list_to_condition(list(runnumbers))
    if not run_condition:
        return {}

    prefix = "DST_TRIGGERED_EVENT_"
    hosts = sorted(
        {t[len(prefix):] for t in match.in_types if isinstance(t, str) and t.startswith(prefix)}
    )
    if not hosts:
        return {}

    host_list = ",".join(f"'{h}'" for h in hosts)
    query = f"""
        SELECT runnumber, MAX(events) as max_events
        FROM datasets
        WHERE {run_condition}
        AND daqhost IN ({host_list})
        GROUP BY runnumber
        """
    rows = dbQuery(cnxn_string_map["rawr"], query).fetchall()
    # MAX() over a group with only NULL event counts is NULL; treat that as 0 so
    # the caller's "raw_events <= 0" guard skips the run rather than crashing here.
    return {int(run): int(max_events or 0) for run, max_events in rows}
0354
def _query_inputs(match: Any, runnumbers: Iterable[int]) -> List[Any]:
    """Fetch the catalog rows of the rule's input dsttypes for the given runs.

    The table, database, tag and extra constraints come from
    ``match.input_config``. Rows are ordered by runnumber, segment, dsttype.

    Returns:
        Rows of ``(dsttype, runnumber, segment, events, filename)``; an empty
        list when ``list_to_condition`` yields no run condition.
    """
    from sphenixdbutils import cnxn_string_map, dbQuery, list_to_condition

    condition = list_to_condition(list(runnumbers))
    if not condition:
        return []

    source = match.input_config
    dsttype_list = _sql_in(match.in_types)
    sql = f"""
        SELECT dsttype, runnumber, segment, events, filename
        FROM {source.table}
        WHERE dataset='{match.dataset}'
        AND tag='{source.intriplet}'
        AND dsttype IN {dsttype_list}
        AND {condition}
        {source.infile_query_constraints}
        ORDER BY runnumber, segment, dsttype
        """
    cursor = dbQuery(cnxn_string_map[source.db], sql)
    return cursor.fetchall()
0373
0374
def _query_outputs(match: Any, runnumbers: Iterable[int]) -> List[Any]:
    """Fetch the catalog rows of the rule's primary output dsttype for the given runs.

    Reads the ``datasets`` table through the ``fcr`` connection, restricted to
    ``match.dataset``, ``match.outtriplet`` and ``match.dsttype``.

    Returns:
        Rows of ``(dsttype, runnumber, segment, events, filename)`` ordered by
        runnumber and segment; an empty list when ``list_to_condition`` yields
        no run condition.
    """
    from sphenixdbutils import cnxn_string_map, dbQuery, list_to_condition

    condition = list_to_condition(list(runnumbers))
    if not condition:
        return []

    sql = f"""
        SELECT dsttype, runnumber, segment, events, filename
        FROM datasets
        WHERE dataset='{match.dataset}'
        AND tag='{match.outtriplet}'
        AND dsttype='{match.dsttype}'
        AND {condition}
        ORDER BY runnumber, segment
        """
    cursor = dbQuery(cnxn_string_map["fcr"], sql)
    return cursor.fetchall()
0392
0393
def _log_expected_output_count(match: Any, goodruns: Any, runnumbers: List[int]) -> None:
    """Log how many output files the good runs should produce, if computable.

    Nothing is logged when ``match.job_config.neventsper`` is missing, not
    convertible to int, or zero.
    """
    neventsper = getattr(match.job_config, "neventsper", None)
    if neventsper is not None:
        try:
            neventsper = int(neventsper)
        except (TypeError, ValueError):
            neventsper = None
    if not neventsper:
        return

    # Ceiling division: a partially filled last file still counts as one output.
    total_expected_outputs = sum(
        (goodruns[run] + neventsper - 1) // neventsper
        for run in runnumbers
        if goodruns.get(run, 0) > 0
    )
    INFO(
        f"{total_expected_outputs} expected downstream output files "
        f"from events/neventsper={neventsper}."
    )


def _apply_required_seb_filter(match: Any, input_rows: List[Any], runnumbers: List[int]) -> List[Any]:
    """Drop input rows of runs that fail the required-SEB availability check.

    Returns *input_rows* unchanged when the output dsttype has no required SEB
    hosts; in that case the raw database is not queried.
    """
    required_seb = required_seb_hosts(match.dsttype)
    if not required_seb:
        return input_rows

    raw_daqhosts_by_run = _query_raw_daqhosts(runnumbers)
    allowed_runs = filter_runs_by_required_seb(
        input_rows=input_rows,
        required_seb=required_seb,
        min_seb=match.input_config.min_seb,
        raw_daqhosts_by_run=raw_daqhosts_by_run,
    )
    kept_rows = [
        row for row in input_rows
        if int(_row_value(row, "runnumber", 1)) in allowed_runs
    ]
    INFO(f"{len(allowed_runs)} runs pass required SEB availability checks.")
    return kept_rows


def _raw_coverage_summary(match: Any, runnumbers: List[int], output_rows: List[Any], ratio_cut: float):
    """Run the files-vs-raw coverage check and return its summary sentence.

    Returns ``None`` when the raw database yields no per-run event counts, so
    the caller can fall back to the per-unit summary.
    """
    raw_input_by_run = _query_raw_input_events(match, runnumbers)
    if not raw_input_by_run:
        return None

    failing_raw = check_coverage_against_raw(
        output_rows=output_rows,
        raw_input_by_run=raw_input_by_run,
        ratio_cut=ratio_cut,
    )
    INFO(f"{len(failing_raw)} runs fail coverage against raw input.")

    raw_runs = len(raw_input_by_run)  # at least 1: the empty case returned above
    complete_runs = raw_runs - len(failing_raw)
    pct_raw = 100.0 * complete_runs / raw_runs
    return (
        f"Summary: {complete_runs}/{raw_runs} runs have downstream FileCatalog coverage "
        f"above threshold relative to raw input events ({pct_raw:.1f}%)."
    )


def main():
    """Command-line entry point: check a downstream rule's outputs against its inputs.

    Steps, in order: load the rule; refuse raw/combining rules (exit code 2);
    stop with exit code 0 if no run passes the quality cuts; query inputs,
    apply the required-SEB filter, build eligible (run, segment) units; query
    outputs; log run-level and raw-level coverage; flag individual units,
    optionally write them to ``args.output``; log a final summary.
    """
    args = submission_args()

    from simpleLogger import slogger
    import logging
    slogger.setLevel(logging.getLevelName(args.loglevel))

    _rule, match = _load_rule_and_match(args)

    if "raw" in match.input_config.db:
        ERROR(
            f"Rule '{args.rulename}' is a raw/combining rule "
            f"(db={match.input_config.db}). Use check_eventcombiner.py instead."
        )
        sys.exit(2)

    # NOTE(review): goodruns is indexed and .get()-queried by run below, so it
    # is assumed to be a mapping runnumber -> event count - confirm.
    goodruns = match.good_runlist()
    if not goodruns:
        INFO("No runs pass run quality cuts.")
        sys.exit(0)
    runnumbers = sorted(goodruns)

    _log_expected_output_count(match, goodruns, runnumbers)

    input_rows = _query_inputs(match, runnumbers)
    INFO(f"{len(input_rows)} available input FileCatalog rows found.")
    input_rows = _apply_required_seb_filter(match, input_rows, runnumbers)

    eligible_units = build_eligible_units(
        input_rows=input_rows,
        required_input_types=match.in_types,
        cut_segment=match.input_config.cut_segment,
    )
    INFO(f"{len(eligible_units)} available input combinations found.")

    output_rows = _query_outputs(match, runnumbers)
    INFO(f"{len(output_rows)} primary output FileCatalog rows found.")

    # Called for its log output only; the returned set of failing runs is not used.
    check_run_level_coverage(
        eligible_units=eligible_units,
        output_rows=output_rows,
        ratio_cut=args.ratio_cut,
    )

    raw_coverage_summary = _raw_coverage_summary(match, runnumbers, output_rows, args.ratio_cut)

    flagged = find_flagged_units(
        eligible_units=eligible_units,
        output_rows=output_rows,
        ratio_cut=args.ratio_cut,
    )

    reason_counts = Counter(reason for unit in flagged for reason in unit.reasons)
    INFO(f"{len(flagged)} downstream work units flagged below ratio cut {args.ratio_cut}.")
    for reason, count in sorted(reason_counts.items()):
        INFO(f"{reason}: {count}")

    if flagged:
        write_report(flagged, args.output)

    if args.delete:
        WARN("--delete is ignored by check_downstream.py; this checker is read-only.")

    if raw_coverage_summary is not None:
        INFO(raw_coverage_summary)
    else:
        n_eligible = len(eligible_units)
        above_threshold = n_eligible - len(flagged)
        pct = 100.0 * above_threshold / n_eligible if n_eligible else 0.0
        INFO(f"Summary: {above_threshold}/{n_eligible} eligible units have output above threshold ({pct:.1f}%).")
0510
0511
# Run the checker only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()