Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-06-26 08:40:20

0001 #!/usr/bin/env python3
0002 """
0003 check_downstream.py
0004 
0005 For downstream productions, compare required input DSTs against the primary
0006 output DST at (run, segment) granularity.  The checker is read-only: it reports
0007 missing or short primary outputs and leaves cleanup/resubmission to other tools.
0008 """
0009 
0010 import sys
0011 from collections import Counter, defaultdict
0012 from dataclasses import dataclass
0013 from pathlib import Path
0014 from typing import Any, Dict, Iterable, List, Set, Tuple
0015 
0016 from argparsing import submission_args
0017 from simpleLogger import CHATTY, DEBUG, INFO, WARN, ERROR  # noqa: F401
0018 from sphenixjobdicts import required_seb_hosts
0019 from sphenixprodrules import pRUNFMT, pSEGFMT
0020 
0021 
@dataclass(frozen=True)
class DatasetInfo:
    """One catalog row reduced to the fields the downstream checks use.

    Built from query rows by ``_dataset_info_from_row``. Frozen, so instances
    are immutable and hashable.
    """

    dsttype: str  # dataset type name of the row
    runnumber: int
    segment: int
    events: int  # event count stored for the row
    filename: str = ""  # optional; defaults to empty
0029 
0030 
@dataclass(frozen=True)
class FlaggedWorkUnit:
    """A (run, segment) work unit that the downstream check flagged.

    ``reasons`` carries the labels assigned by ``find_flagged_units``. The
    event-count fields use -1 as a "not available" marker.
    """

    runnumber: int
    segment: int
    reasons: Tuple[str, ...]
    input_events: int  # -1 when the unit's inputs disagree on event count
    output_events: int  # -1 when no primary output row exists

    def report_line(self) -> str:
        """Render the unit as one space-separated report line.

        Columns: run (``pRUNFMT``), segment (``pSEGFMT``), comma-joined
        reasons, input events, output events.
        """
        columns = (
            format(self.runnumber, pRUNFMT),
            format(self.segment, pSEGFMT),
            ",".join(self.reasons),
            str(self.input_events),
            str(self.output_events),
        )
        return " ".join(columns)
0044 
0045 
0046 def _row_value(row: Any, name: str, index: int) -> Any:
0047     if hasattr(row, name):
0048         return getattr(row, name)
0049     return row[index]
0050 
0051 
def _dataset_info_from_row(row: Any) -> DatasetInfo:
    """Convert a catalog query row into a ``DatasetInfo``.

    Expects the column order (dsttype, runnumber, segment, events, filename)
    when the row has no named attributes; each value is coerced to the
    field's type.
    """
    def pick(column: str, position: int) -> Any:
        return _row_value(row, column, position)

    return DatasetInfo(
        dsttype=str(pick("dsttype", 0)),
        runnumber=int(pick("runnumber", 1)),
        segment=int(pick("segment", 2)),
        events=int(pick("events", 3)),
        filename=str(pick("filename", 4)),
    )
0060 
0061 
0062 def _sql_in(values: Iterable[str]) -> str:
0063     quoted = ",".join(f"'{value}'" for value in values)
0064     return f"({quoted})"
0065 
0066 
def build_eligible_units(
    input_rows: Iterable[Any],
    required_input_types: Iterable[str],
    cut_segment: int = 1,
) -> Dict[Tuple[int, int], List[DatasetInfo]]:
    """Group input rows by (run, segment) and keep units with every required type.

    Rows are skipped when *cut_segment* is truthy and the segment is not a
    multiple of it, or when their dsttype is not required. If several rows
    share a (run, segment, dsttype), the one with the most events is kept so
    duplicates never change eligibility. Each eligible unit maps to its
    rows ordered by dsttype name.
    """
    wanted = set(required_input_types)
    grouped: Dict[Tuple[int, int], Dict[str, DatasetInfo]] = defaultdict(dict)

    for row in input_rows:
        info = _dataset_info_from_row(row)
        off_cut = bool(cut_segment) and info.segment % cut_segment != 0
        if off_cut or info.dsttype not in wanted:
            continue
        per_type = grouped[(info.runnumber, info.segment)]
        kept = per_type.get(info.dsttype)
        if kept is None or info.events > kept.events:
            per_type[info.dsttype] = info

    return {
        unit: [per_type[dsttype] for dsttype in sorted(wanted)]
        for unit, per_type in grouped.items()
        if set(per_type) == wanted
    }
0093 
0094 
def filter_runs_by_required_seb(
    input_rows: Iterable[Any],
    required_seb: Set[str],
    min_seb: int,
    raw_daqhosts_by_run: Dict[int, Set[str]],
) -> Set[int]:
    """Return the runs that pass the calo required-SEB availability check.

    A run passes when at least *min_seb* of the *required_seb* hosts are
    listed for it in *raw_daqhosts_by_run* AND at least *min_seb* of them
    appear as ``DST_TRIGGERED_EVENT_<host>`` rows in *input_rows*. Runs seen
    in either source are considered. The first five failing runs (in run
    order) are logged at DEBUG, followed by one suppression notice.

    Returns an empty set when *required_seb* is empty (``main`` only calls
    this when a requirement exists).
    """
    if not required_seb:
        return set()

    prefix = "DST_TRIGGERED_EVENT_"
    present_by_run: Dict[int, Set[str]] = defaultdict(set)
    for row in input_rows:
        info = _dataset_info_from_row(row)
        if not info.dsttype.startswith(prefix):
            continue
        host = info.dsttype[len(prefix):]
        if host in required_seb:
            present_by_run[info.runnumber].add(host)

    allowed: Set[int] = set()
    failure_examples = 0
    # Iterate in run order so the logged failure examples are reproducible and
    # consistent with the run-ordered coverage checks.
    for runnumber in sorted(set(raw_daqhosts_by_run) | set(present_by_run)):
        available_required = raw_daqhosts_by_run.get(runnumber, set()).intersection(required_seb)
        present_required = present_by_run.get(runnumber, set())
        if len(available_required) >= min_seb and len(present_required) >= min_seb:
            allowed.add(runnumber)
            continue
        failure_examples += 1
        if failure_examples <= 5:
            DEBUG(
                f"Run {runnumber}: required SEB availability failed "
                f"(raw={len(available_required)}, catalog={len(present_required)}, min={min_seb})."
            )
        elif failure_examples == 6:
            DEBUG("Additional required SEB availability failures suppressed after 5 examples.")
    return allowed
0133 
0134 
def find_flagged_units(
    eligible_units: Dict[Tuple[int, int], List[DatasetInfo]],
    output_rows: Iterable[Any],
    ratio_cut: float,
) -> List[FlaggedWorkUnit]:
    """Return eligible work units that need attention, in (run, segment) order.

    Output rows are reduced to one per (run, segment), keeping the one with
    the most events. Each eligible unit can collect these reasons:

    * ``input_mismatch``: its input rows do not share one event count
      (input events are then reported as -1);
    * ``missing_output``: no primary output row exists (output events -1);
    * ``low_output_events``: output/input events is below *ratio_cut*,
      evaluated only when the input count is consistent and positive.

    Units without any reason are omitted.
    """
    best_output: Dict[Tuple[int, int], DatasetInfo] = {}
    for row in output_rows:
        candidate = _dataset_info_from_row(row)
        key = (candidate.runnumber, candidate.segment)
        kept = best_output.get(key)
        if kept is None or candidate.events > kept.events:
            best_output[key] = candidate

    flagged: List[FlaggedWorkUnit] = []
    for unit in sorted(eligible_units):
        runnumber, segment = unit
        inputs = eligible_units[unit]
        consistent = len({item.events for item in inputs}) == 1
        input_events = inputs[0].events if consistent else -1
        output = best_output.get(unit)
        output_events = -1 if output is None else output.events

        reasons = [] if consistent else ["input_mismatch"]
        if output is None:
            reasons.append("missing_output")
        elif input_events > 0 and output.events / input_events < ratio_cut:
            reasons.append("low_output_events")

        if not reasons:
            continue
        flagged.append(
            FlaggedWorkUnit(
                runnumber=runnumber,
                segment=segment,
                reasons=tuple(reasons),
                input_events=input_events,
                output_events=output_events,
            )
        )
    return flagged
0174 
0175 
def check_run_level_coverage(
    eligible_units: Dict[Tuple[int, int], List[DatasetInfo]],
    output_rows: Iterable[Any],
    ratio_cut: float,
) -> Set[int]:
    """Compare per-run output event totals with per-run input event totals.

    Input total: for each eligible (run, segment) unit, the largest event
    count among its inputs, summed over the run. Output total: output rows
    reduced to one per (run, segment) (largest event count), summed per run.

    Runs whose input total is not positive are skipped with a DEBUG note
    (first five, then one suppression notice). Any other run with
    ``output_total / input_total < ratio_cut`` fails; the first five failures
    are logged with WARN, then one suppression notice. The failure count is
    always logged at INFO.

    Returns the set of failing runnumbers.
    """
    best_output: Dict[Tuple[int, int], DatasetInfo] = {}
    for row in output_rows:
        candidate = _dataset_info_from_row(row)
        key = (candidate.runnumber, candidate.segment)
        kept = best_output.get(key)
        if kept is None or candidate.events > kept.events:
            best_output[key] = candidate

    # Conservative: each unit contributes its largest input event count.
    input_totals: Dict[int, int] = defaultdict(int)
    for (run, _segment), unit_inputs in eligible_units.items():
        input_totals[run] += max(item.events for item in unit_inputs)

    output_totals: Dict[int, int] = defaultdict(int)
    for (run, _segment), kept in best_output.items():
        output_totals[run] += kept.events

    failing_runs: Set[int] = set()
    warned = 0
    skipped = 0
    for run in sorted(input_totals.keys() | output_totals.keys()):
        input_sum = input_totals.get(run, 0)
        output_sum = output_totals.get(run, 0)
        if input_sum <= 0:
            skipped += 1
            if skipped <= 5:
                DEBUG(f"Run {run}: no input events to check run-level coverage.")
            elif skipped == 6:
                DEBUG("Additional no-input coverage examples suppressed after 5 examples.")
            continue

        # input_sum is positive here, so the division is always defined.
        ratio = output_sum / input_sum
        if ratio < ratio_cut:
            failing_runs.add(run)
            warned += 1
            if warned <= 5:
                WARN(
                    f"Run {run}: run-level coverage failed (output={output_sum}, input={input_sum}, ratio={ratio:.3f}, min={ratio_cut})."
                )
            elif warned == 6:
                WARN("Additional run-level coverage failures suppressed after 5 examples.")

    INFO(f"{len(failing_runs)} runs fail run-level coverage (ratio < {ratio_cut}).")
    return failing_runs
0229 
0230 
def check_coverage_against_raw(
    output_rows: Iterable[Any], raw_input_by_run: Dict[int, int], ratio_cut: float
) -> Set[int]:
    """Flag runs whose primary-output events fall short of a raw-input target.

    Output rows are reduced to one per (run, segment), keeping the one with
    the most events, and summed per run. Each run in *raw_input_by_run* is
    compared against its target value: runs with a non-positive target are
    skipped (CHATTY note), and runs with ``output_sum / target < ratio_cut``
    fail. The first five failures are logged with WARN, then one
    suppression notice; the failure count is logged at INFO.

    Returns the set of failing runnumbers.
    """
    best_output: Dict[Tuple[int, int], DatasetInfo] = {}
    for row in output_rows:
        candidate = _dataset_info_from_row(row)
        key = (candidate.runnumber, candidate.segment)
        kept = best_output.get(key)
        if kept is None or candidate.events > kept.events:
            best_output[key] = candidate

    produced: Dict[int, int] = defaultdict(int)
    for (run, _segment), kept in best_output.items():
        produced[run] += kept.events

    failing: Set[int] = set()
    reported = 0
    for run, raw_events in sorted(raw_input_by_run.items()):
        out_events = produced.get(run, 0)
        if raw_events <= 0:
            CHATTY(f"Run {run}: raw input events=0, skipping coverage check.")
            continue
        ratio = out_events / raw_events
        if ratio < ratio_cut:
            failing.add(run)
            reported += 1
            if reported <= 5:
                WARN(
                    f"Run {run}: files DB covers {out_events}/{raw_events} events "
                    f"({ratio:.3f} < {ratio_cut})"
                )
            elif reported == 6:
                WARN("Additional files-vs-raw coverage failures suppressed after 5 examples.")

    INFO(f"{len(failing)} runs fail files-vs-raw coverage (ratio < {ratio_cut}).")
    return failing
0271 
0272 
def write_report(flagged: "List[FlaggedWorkUnit]", output: str = None) -> None:
    """Emit one report line per flagged work unit.

    Each line is ``unit.report_line()`` followed by a newline, in the order
    given.

    Args:
        flagged: Work units to report.
        output: Destination file path. Parent directories are created as
            needed and the file is overwritten. When falsy (``None`` or
            ``""``) the report is written to standard output.
    """
    text = "".join(f"{unit.report_line()}\n" for unit in flagged)

    if not output:
        # Without a destination, print rather than discard: this report is the
        # checker's only per-unit output.
        sys.stdout.write(text)
        return

    path = Path(output)
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(text)
    INFO(f"Flagged downstream work units written to {output}")
0283 
0284 
def _load_rule_and_match(args) -> Tuple[Any, Any]:
    """Build the RuleConfig for ``args.rulename`` and its derived MatchConfig.

    Values from *args* are passed as ``param_overrides``: runs, runlist,
    nevents, check_legacy and cut_segment always; prodmode is
    ``args.mangle_dirpath`` when truthy, otherwise "production"; physicsmode
    only when truthy.

    Returns ``(rule, match)``.
    """
    from sphenixmatching import MatchConfig
    from sphenixprodrules import RuleConfig

    overrides = dict(
        runs=args.runs,
        runlist=args.runlist,
        nevents=args.nevents,
        prodmode=args.mangle_dirpath or "production",
        check_legacy=args.check_legacy,
        cut_segment=args.cut_segment,
    )
    if args.physicsmode:
        overrides["physicsmode"] = args.physicsmode

    rule = RuleConfig.from_yaml_file(
        yaml_file=args.config,
        rule_name=args.rulename,
        param_overrides=overrides,
    )
    match = MatchConfig.from_rule_config(rule)
    return rule, match
0308 
0309 
def _query_raw_daqhosts(runnumbers: Iterable[int]) -> Dict[int, Set[str]]:
    """Map each run to the DAQ hosts listed for it in the raw ("rawr") DB.

    Returns ``{}`` without querying when ``list_to_condition`` yields an
    empty condition. Runs with no raw rows are absent from the result.
    """
    from sphenixdbutils import cnxn_string_map, dbQuery, list_to_condition

    condition = list_to_condition(list(runnumbers))
    if not condition:
        return {}
    query = f"""
        SELECT DISTINCT runnumber, daqhost
        FROM datasets
        WHERE {condition}
    """
    result: Dict[int, Set[str]] = defaultdict(set)
    for row in dbQuery(cnxn_string_map["rawr"], query).fetchall():
        run = int(_row_value(row, "runnumber", 0))
        result[run].add(str(_row_value(row, "daqhost", 1)))
    return result
0326 
def _query_raw_input_events(match: Any, runnumbers: Iterable[int]) -> Dict[int, int]:
    """Query the raw DB for the per-run input-event target of the trigger-SEB hosts.

    Hosts are taken from the rule's ``DST_TRIGGERED_EVENT_<host>`` input
    types. Raw ``datasets`` rows are first summed per (run, daqhost). The
    target for a run is the largest of those per-host totals: for downstream
    combining the target is the maximum across DAQ hosts, not the sum.

    Returns a mapping runnumber -> target events. Returns ``{}`` without
    querying when ``list_to_condition`` yields an empty condition or no
    trigger-SEB input types are configured. Runs without matching raw rows
    are absent from the result.
    """
    from sphenixdbutils import cnxn_string_map, dbQuery, list_to_condition

    run_condition = list_to_condition(list(runnumbers))
    if not run_condition:
        return {}

    prefix = "DST_TRIGGERED_EVENT_"
    hosts = sorted(
        {
            dsttype[len(prefix):]
            for dsttype in match.in_types
            if isinstance(dsttype, str) and dsttype.startswith(prefix)
        }
    )
    if not hosts:
        return {}

    # A bare MAX(events) over raw rows returns the largest single row. The raw
    # catalog can hold several rows per (run, daqhost) -- which is why
    # _query_raw_daqhosts needs DISTINCT -- presumably one per file segment,
    # so sum per host first. With one row per host this equals MAX(events).
    # COALESCE keeps an all-NULL host from turning into int(None).
    query = f"""
        SELECT runnumber, MAX(host_events) AS max_events
        FROM (
            SELECT runnumber, daqhost, COALESCE(SUM(events), 0) AS host_events
            FROM datasets
            WHERE {run_condition}
              AND daqhost IN {_sql_in(hosts)}
            GROUP BY runnumber, daqhost
        ) AS per_host
        GROUP BY runnumber
    """
    rows = dbQuery(cnxn_string_map["rawr"], query).fetchall()
    return {int(runnumber): int(events) for runnumber, events in rows}
0354 
def _query_inputs(match: Any, runnumbers: Iterable[int]) -> List[Any]:
    """Fetch input FileCatalog rows for the rule's required input dsttypes.

    Reads ``match.input_config.table`` on the ``match.input_config.db``
    connection, restricted to ``match.dataset``, the input tag
    (``match.input_config.intriplet``), the ``match.in_types`` dsttypes and
    *runnumbers*. Rows are ordered by run, segment and dsttype. Returns
    ``[]`` without querying when ``list_to_condition`` yields an empty
    condition.
    """
    from sphenixdbutils import cnxn_string_map, dbQuery, list_to_condition

    condition = list_to_condition(list(runnumbers))
    if not condition:
        return []

    config = match.input_config
    # NOTE(review): infile_query_constraints is spliced in as raw SQL; assumed
    # to come from trusted rule configuration -- confirm.
    sql = f"""
        SELECT dsttype, runnumber, segment, events, filename
        FROM {config.table}
        WHERE dataset='{match.dataset}'
          AND tag='{config.intriplet}'
          AND dsttype IN {_sql_in(match.in_types)}
          AND {condition}
          {config.infile_query_constraints}
        ORDER BY runnumber, segment, dsttype
    """
    return dbQuery(cnxn_string_map[config.db], sql).fetchall()
0373 
0374 
def _query_outputs(match: Any, runnumbers: Iterable[int]) -> List[Any]:
    """Fetch primary-output FileCatalog rows for *runnumbers*.

    Selects rows of ``match.dsttype`` with tag ``match.outtriplet`` in
    ``match.dataset`` from ``datasets`` on the "fcr" connection, ordered by
    run then segment. Returns ``[]`` without querying when
    ``list_to_condition`` yields an empty condition.
    """
    from sphenixdbutils import cnxn_string_map, dbQuery, list_to_condition

    condition = list_to_condition(list(runnumbers))
    if not condition:
        return []

    sql = f"""
        SELECT dsttype, runnumber, segment, events, filename
        FROM datasets
        WHERE dataset='{match.dataset}'
          AND tag='{match.outtriplet}'
          AND dsttype='{match.dsttype}'
          AND {condition}
        ORDER BY runnumber, segment
    """
    cursor = dbQuery(cnxn_string_map["fcr"], sql)
    return cursor.fetchall()
0392 
0393 
def main():
    """Run the read-only downstream check for the rule selected by the arguments.

    Loads the rule/match configuration, refuses raw/combining rules (exit 2)
    and exits 0 when no run passes the run-quality cuts. It then queries
    input and primary-output FileCatalog rows for the good runs. When the
    output type has required SEB hosts, input rows are first restricted to
    runs passing the required-SEB check. Next it runs three checks:

    * the run-level coverage check;
    * the files-vs-raw coverage check (when raw targets are available);
    * the per-unit check.

    Finally it passes flagged units to ``write_report`` and logs a summary.
    ``--delete`` is accepted but ignored.
    """
    args = submission_args()

    from simpleLogger import slogger
    import logging
    slogger.setLevel(logging.getLevelName(args.loglevel))

    # The rule object is only needed to derive the match configuration.
    _rule, match = _load_rule_and_match(args)

    if "raw" in match.input_config.db:
        ERROR(
            f"Rule '{args.rulename}' is a raw/combining rule "
            f"(db={match.input_config.db}). Use check_eventcombiner.py instead."
        )
        sys.exit(2)

    goodruns = match.good_runlist()
    if not goodruns:
        INFO("No runs pass run quality cuts.")
        sys.exit(0)
    runnumbers = sorted(goodruns)

    # Informational only. goodruns appears to map run -> event count (TODO
    # confirm); a non-integer neventsper disables the estimate.
    neventsper = getattr(match.job_config, "neventsper", None)
    if neventsper is not None:
        try:
            neventsper = int(neventsper)
        except (TypeError, ValueError):
            neventsper = None
    if neventsper:
        # Ceiling division: a partial chunk still yields one output file.
        total_expected_outputs = sum(
            (goodruns[run] + neventsper - 1) // neventsper
            for run in runnumbers
            if goodruns.get(run, 0) > 0
        )
        INFO(
            f"{total_expected_outputs} expected downstream output files "
            f"from events/neventsper={neventsper}."
        )

    input_rows = _query_inputs(match, runnumbers)
    INFO(f"{len(input_rows)} available input FileCatalog rows found.")

    required_seb = required_seb_hosts(match.dsttype)
    if required_seb:
        raw_daqhosts_by_run = _query_raw_daqhosts(runnumbers)
        allowed_runs = filter_runs_by_required_seb(
            input_rows=input_rows,
            required_seb=required_seb,
            min_seb=match.input_config.min_seb,
            raw_daqhosts_by_run=raw_daqhosts_by_run,
        )
        input_rows = [
            row for row in input_rows
            if int(_row_value(row, "runnumber", 1)) in allowed_runs
        ]
        INFO(f"{len(allowed_runs)} runs pass required SEB availability checks.")

    eligible_units = build_eligible_units(
        input_rows=input_rows,
        required_input_types=match.in_types,
        cut_segment=match.input_config.cut_segment,
    )
    INFO(f"{len(eligible_units)} available input combinations found.")

    output_rows = _query_outputs(match, runnumbers)
    INFO(f"{len(output_rows)} primary output FileCatalog rows found.")

    # Run-level coverage: summed outputs vs summed eligible inputs. The check
    # logs its own examples and failure count; its result is not used here.
    check_run_level_coverage(
        eligible_units=eligible_units,
        output_rows=output_rows,
        ratio_cut=args.ratio_cut,
    )

    # Files DB outputs vs raw-DB input targets, when the raw DB provides them.
    raw_input_by_run = _query_raw_input_events(match, runnumbers)
    raw_coverage_summary = None
    if raw_input_by_run:
        # check_coverage_against_raw already logs the failing-run count.
        failing_raw = check_coverage_against_raw(
            output_rows=output_rows,
            raw_input_by_run=raw_input_by_run,
            ratio_cut=args.ratio_cut,
        )
        raw_runs = len(raw_input_by_run)
        complete_runs = raw_runs - len(failing_raw)
        pct_raw = 100.0 * complete_runs / raw_runs if raw_runs else 0.0
        raw_coverage_summary = (
            f"Summary: {complete_runs}/{raw_runs} runs have downstream FileCatalog coverage "
            f"above threshold relative to raw input events ({pct_raw:.1f}%)."
        )

    flagged = find_flagged_units(
        eligible_units=eligible_units,
        output_rows=output_rows,
        ratio_cut=args.ratio_cut,
    )

    reason_counts = Counter(reason for unit in flagged for reason in unit.reasons)
    INFO(f"{len(flagged)} downstream work units flagged below ratio cut {args.ratio_cut}.")
    for reason, count in sorted(reason_counts.items()):
        INFO(f"{reason}: {count}")

    if flagged:
        write_report(flagged, args.output)

    if args.delete:
        WARN("--delete is ignored by check_downstream.py; this checker is read-only.")

    if raw_coverage_summary is not None:
        INFO(raw_coverage_summary)
    else:
        n_eligible = len(eligible_units)
        above_threshold = n_eligible - len(flagged)
        pct = 100.0 * above_threshold / n_eligible if n_eligible else 0.0
        INFO(f"Summary: {above_threshold}/{n_eligible} eligible units have output above threshold ({pct:.1f}%).")
0510 
0511 
if __name__ == "__main__":
    # Run the checker only when executed as a script, not when imported.
    main()