Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-25 08:29:08

0001 from typing import Dict, List, Tuple, Set, Any
0002 import itertools
0003 import operator
0004 from dataclasses import dataclass, asdict
0005 from pathlib import Path
0006 import shutil
0007 from datetime import datetime
0008 import pprint # noqa: F401
0009 import psutil
0010 import math
0011 from contextlib import nullcontext # For optional file writing
0012 
0013 from sphenixprodrules import RuleConfig, InputConfig
0014 from sphenixprodrules import pRUNFMT,pSEGFMT
0015 from sphenixdbutils import cnxn_string_map, dbQuery, list_to_condition
0016 from simpleLogger import CHATTY, DEBUG, INFO, WARN, ERROR, CRITICAL  # noqa: F401
0017 from sphenixjobdicts import inputs_from_output
0018 from sphenixmisc import binary_contains_bisect, shell_command
0019 
0020 from collections import namedtuple
0021 FileHostRunSegStat = namedtuple('FileHostRunSeg',['filename','daqhost','runnumber','segment','status'])
0022 
0023 """ This file contains the classes for matching runs and files to a rule.
0024     MatchConfig is the steering class for db queries to
0025     find appropriate input files and name the output files.
0026     It is constructed from a RuleConfig object.
0027 """
0028 
0029 # Striving to keep Dataclasses immutable (frozen=True)
0030 # All modifications should be done in the constructor
0031 
0032 # ============================================================================
0033 
0034 @dataclass( frozen = True )
0035 class MatchConfig:
0036     dsttype:        str
0037     runlist_int:    str
0038     input_config:   InputConfig
0039     dataset:        str
0040     outtriplet:     str
0041     physicsmode:    str
0042     filesystem:     Dict
0043     rungroup_tmpl:  str
0044     job_config:     Any
0045 
0046     # Internal, derived variables
0047     dst_type_template: str
0048     in_types:          Any # Fixme, should always be List[str]
0049     input_stem:        Any
0050 
0051     # ------------------------------------------------
0052     @classmethod
0053     def from_rule_config(cls, rule_config: RuleConfig):
0054         """
0055         Constructs a MatchConfig object partially from a RuleConfig object.
0056 
0057         Args:
0058             rule_config: The RuleConfig object to extract data from.
0059 
0060         Returns:
0061             A MatchConfig object with fields pre-populated from the RuleConfig.
0062         """
0063 
0064         dsttype       = rule_config.dsttype
0065         runlist_int   = rule_config.runlist_int
0066         input_config  = rule_config.input_config
0067         dataset       = rule_config.dataset
0068         outtriplet    = rule_config.outtriplet
0069         physicsmode   = rule_config.physicsmode
0070         filesystem    = rule_config.job_config.filesystem
0071         rungroup_tmpl = rule_config.job_config.rungroup_tmpl
0072         job_config    = rule_config.job_config
0073         
0074         ## derived
0075         dst_type_template = f'{dsttype}'
0076         # This test should be equivalent to if 'raw' in input_config.db
0077         if 'TRIGGERED' in dsttype or 'STREAMING' in dsttype:
0078             dst_type_template += '_%'
0079             dst_type_template += '%'
0080 
0081         ### Assemble leafs, where needed
0082         input_stem = inputs_from_output[dsttype]
0083         CHATTY( f'Input files are of the form:\n{pprint.pformat(input_stem)}')
0084         if isinstance(input_stem, dict):
0085             in_types = list(input_stem.values())
0086         else :
0087             in_types = input_stem
0088         if 'raw' in input_config.db:
0089             in_types.insert(0,'gl1daq') # all raw daq files need an extra GL1 file
0090 
0091         return cls(
0092             dsttype       = dsttype,
0093             runlist_int   = runlist_int,
0094             input_config  = input_config,
0095             dataset       = dataset,
0096             outtriplet    = outtriplet,
0097             physicsmode   = physicsmode,
0098             filesystem    = filesystem,
0099             rungroup_tmpl = rungroup_tmpl,
0100             job_config    = job_config,
0101             ## derived
0102             dst_type_template = dst_type_template,
0103             in_types = in_types,
0104             input_stem = input_stem,
0105         )
0106 
0107     # ------------------------------------------------
0108     def dict(self):
0109         return { k: str(v) for k, v in asdict(self).items() if v is not None }
0110 
0111     # ------------------------------------------------
0112     def good_runlist(self, subset_runlist: List[int] = None) -> List[int]:
0113         ### Run quality
0114         CHATTY(f"Resident Memory: {psutil.Process().memory_info().rss / 1024 / 1024:.0f} MB")
0115         # Here would be a  good spot to check against golden or bad runlists and to enforce quality cuts on the runs
0116 
0117         # Use subset if provided, otherwise use full runlist
0118         runlist_to_check = subset_runlist if subset_runlist is not None else self.runlist_int
0119 
0120         INFO("Checking runlist against run quality cuts.")
0121         run_quality_tmpl="""
0122 select distinct(runnumber) from run
0123  where
0124 runnumber>={runmin} and runnumber <= {runmax}
0125  and
0126 runtype='{physicsmode}'
0127  and
0128 eventsinrun >= {min_run_events}
0129  and
0130 EXTRACT(EPOCH FROM (ertimestamp-brtimestamp)) >={min_run_time}
0131 order by runnumber
0132 ;"""
0133         run_quality_query=run_quality_tmpl.format(
0134             runmin=min(runlist_to_check),
0135             runmax=max(runlist_to_check),
0136             physicsmode=self.physicsmode,
0137             min_run_events=self.input_config.min_run_events,
0138             min_run_time=self.input_config.min_run_time,
0139         )
0140         goodruns=[ int(r) for (r,) in dbQuery( cnxn_string_map['daqr'], run_quality_query).fetchall() ]
0141         # tighten run condition now
0142         runlist_int=[ run for run in runlist_to_check if run in goodruns ]
0143         if runlist_int==[]:
0144             return []
0145         INFO(f"{len(runlist_int)} runs pass run quality cuts.")
0146         DEBUG(f"Runlist: {runlist_int}")
0147         return runlist_int
0148 
0149     # ------------------------------------------------
0150     def get_files_in_db(self, runnumbers: Any) :
0151         ## Note: Not all constraints are needed, but they may speed up the query
0152         exist_query  = f"""select filename from datasets
0153         where dataset='{self.dataset}'
0154         and tag='{self.outtriplet}'
0155         and dsttype like '{self.dst_type_template}'"""
0156 
0157         run_condition=list_to_condition(runnumbers)
0158         if run_condition!="" :
0159             exist_query += f"\n\tand {run_condition}"
0160         existing_output = [ c.filename for c in dbQuery( cnxn_string_map['fcr'], exist_query ) ]
0161         existing_output.sort()
0162         return existing_output
0163 
0164     # ------------------------------------------------
0165     def get_output_files(self, filemask: str = r"\*.root:\*", dstlistname: str=None, dryrun: bool=True) -> List[str]:
0166         ### Which find command to use for lustre?
0167         find=shutil.which('find')
0168         lfind = shutil.which('lfs')
0169         if lfind is None:
0170             WARN("'lfs find' not found")
0171             lfind = shutil.which('find')
0172         else:
0173             lfind = f'{lfind} find'
0174             INFO(f'Using find={find} and lfind={lfind}.')
0175 
0176         if dstlistname:
0177             INFO(f"Piping output to {dstlistname}")
0178             if not dryrun:
0179                 Path(dstlistname).unlink(missing_ok=True)
0180             else:
0181                 dstlistname="/dev/null"
0182                 INFO(f"Dryrun. Piping output to {dstlistname}")
0183 
0184         outlocation=self.filesystem['outdir']
0185         # Further down, we will simplify by assuming finaldir == outdir, otherwise this script shouldn't be used.
0186         finaldir=self.filesystem['finaldir']
0187         if finaldir != outlocation:
0188             ERROR("Found finaldir != outdir. Use/adapt dstlakespider instead." )
0189             print(f"finaldir = {finaldir}")
0190             print(f"outdir = {outlocation}")
0191             exit(1)
0192         INFO(f"Directory tree: {outlocation}")
0193 
0194         # All leafs:
0195         leafparent=outlocation.split('/{leafdir}')[0]
0196         leafdirs_cmd=rf"{find} {leafparent} -type d -name {self.dsttype}\* -mindepth 1 -a -maxdepth 1"
0197         leafdirs = shell_command(leafdirs_cmd)
0198         CHATTY(f"Leaf directories: \n{pprint.pformat(leafdirs)}")
0199 
0200         # Run groups that we're interested in
0201         sorted_runlist = sorted(self.runlist_int)
0202         def rungroup(run):
0203             return self.rungroup_tmpl.format(a=100*math.floor(run/100), b=100*math.ceil((run+1)/100))
0204         desirable_rungroups = { rungroup(run) for run in sorted_runlist }
0205         runs_by_group = { group : [] for group in desirable_rungroups}
0206         outidentifier=f'{self.dataset}_{self.outtriplet}'
0207         for run in sorted_runlist:
0208             # runs_by_group[rungroup(run)].append(str(run))
0209             runstr=f'{outidentifier}-{run:{pRUNFMT}}'
0210             ## could also add segment, runstr+=f'-{segment:{pSEGFMT}}'
0211             runs_by_group[rungroup(run)].append(runstr)
0212 
0213         # INFO(f"Size of the filter dictionary is {sys.getsizeof(runs_by_group)} bytes")
0214         # INFO(f"Length of the filter dictionary is {len(runs_by_group.keys())}")
0215         # INFO(f"Size of one entry is {sys.getsizeof(runs_by_group['run_00072000_00072100'])} bytes")
0216         # INFO(f"Size of one string is {sys.getsizeof(runs_by_group['run_00072000_00072100'][0])} bytes")
0217         ## --> Negligible. << 1MB
0218 
0219         ### Walk through leafs - assume rungroups may change between run groups
0220         ret=[]
0221 
0222         tstart=datetime.now()
0223         with open(dstlistname,"w") if dstlistname else nullcontext() as dstlistfile:
0224             for leafdir in leafdirs :
0225                 CHATTY(f"Searching {leafdir}")
0226                 available_rungroups = shell_command(rf"{find} {leafdir} -name run_\* -type d -mindepth 1 -a -maxdepth 1")
0227                 DEBUG(f"Resident Memory: {psutil.Process().memory_info().rss / 1024 / 1024:.0f} MB")
0228                 
0229                 # Want to have the subset of available rungroups where a desirable rungroup is a substring (cause the former have the full path)
0230                 rungroups = {rg for rg in available_rungroups if any( drg in rg for drg in desirable_rungroups) }
0231                 DEBUG(f"For {leafdir}, we have {len(rungroups)} run groups to work on")                
0232                 for rungroup in rungroups:
0233                     runs_str=runs_by_group[Path(rungroup).name]
0234                     find_command=f"{lfind} {rungroup} -type f -name {filemask}"
0235                     CHATTY(find_command)
0236                     group_runs = shell_command(find_command)
0237                     # Enforce run number constraint
0238                     group_runs = [ run for run in group_runs if any( dr in run for dr in runs_str) ]
0239                     if dstlistfile:
0240                         for run in group_runs:
0241                             dstlistfile.write(f"{run}\n")
0242                     else:
0243                         ret += group_runs
0244         INFO(f"List creation took {(datetime.now() - tstart).total_seconds():.2f} seconds.")
0245         return ret
0246 
0247     # ------------------------------------------------
0248     def select_matches_for_combination(self, files_for_run: Dict[str, List[FileHostRunSegStat]],
0249                                        runnumber: int) -> Dict[str, List[FileHostRunSegStat]]:
0250         gl1_files = files_for_run.pop('gl1daq',None)
0251         if gl1_files is None:
0252             WARN(f"No GL1 files found for run {runnumber}. Skipping this run.")
0253             return {}
0254         CHATTY(f'All GL1 files for for run {runnumber}:\n{gl1_files}')
0255 
0256         # We need to determine which segments are present
0257         segments=set()
0258         for host in files_for_run:
0259             for f in files_for_run[host]:
0260                 if f.status==1:
0261                     segments.add(f.segment)
0262         if segments:
0263             CHATTY(f"Run {runnumber} has {len(segments)} segments in the input streams: {sorted(segments)}")
0264 
0265         #segswitch="seg0fromdb"
0266 
0267         return files_for_run
0268 
0269     # ------------------------------------------------
0270     def get_prod_status(self, runnumbers):
0271         ### Check production status
0272         INFO(f'Checking for output already in production for {runnumbers}')
0273         run_condition=list_to_condition(runnumbers)
0274         if run_condition!="" :
0275             run_condition = f"and {run_condition.replace('runnumber','run')}"
0276 
0277         status_query  = f"""select dstfile,status from production_status
0278         where status!='finished'
0279          {run_condition} 
0280          and dstname like '{self.dst_type_template}%{self.outtriplet}' {self.input_config.status_query_constraints}
0281         order by run desc;"""
0282 
0283         now=datetime.now()        
0284         existing_status = { c.dstfile : c.status for c in dbQuery( cnxn_string_map['statr'], status_query ) }
0285         INFO(f'Query took {(datetime.now() - now).total_seconds():.2f} seconds.')
0286         return existing_status
0287 
0288     # ------------------------------------------------
0289     def daqhosts_for_combining(self, subset_runlist: List[int] = None) -> Dict[int, Set[int]]:
0290         ### Which DAQ hosts have all required segments present in the file catalog for a given run?
0291 
0292         # Run quality:
0293         goodruns=self.good_runlist(subset_runlist)
0294         if goodruns==[]:
0295             INFO( "No runs pass run quality cuts.")
0296             return {}
0297         INFO(f"{len(goodruns)} runs pass run quality cuts.")
0298         DEBUG(f"Runlist: {goodruns}")
0299         if goodruns==[]:
0300             return {}
0301         run_condition=list_to_condition(goodruns)
0302 
0303         # If we only care about segment 0, we can skip a lot of the checks
0304         if self.input_config.combine_seg0_only:
0305             INFO("Only combining segment 0. Skipping detailed checks.")
0306             
0307             # Which hosts have a segment 0 in the file catalog?
0308             lustre_query =   "select runnumber,daqhost from datasets"
0309             lustre_query += f" WHERE {run_condition}"
0310             lustre_query += f" AND segment=0 AND status::int > 0"
0311             lustre_query += f" AND daqhost in {tuple(self.in_types)}"
0312             lustre_result = dbQuery( cnxn_string_map[ self.input_config.db ], lustre_query ).fetchall()
0313             daqhosts_for_combining = {}
0314             for r,h in lustre_result:
0315                 if r not in daqhosts_for_combining:
0316                     daqhosts_for_combining[r] = set()
0317                 daqhosts_for_combining[r].add(h)
0318             for run in daqhosts_for_combining:
0319                 CHATTY(f"Available on lustre for run {run}: {daqhosts_for_combining.get(run,set())}")
0320 
0321             return daqhosts_for_combining
0322 
0323         ### More general case, need to check all segments
0324         # How many segments were produced per daqhost?
0325         seg_query=   "select runnumber,hostname,count(sequence) from filelist"
0326         seg_query+= f" WHERE {run_condition}"
0327         seg_query+= f" and hostname in {tuple(self.in_types)}"
0328         seg_query+=  " group by runnumber,hostname;"
0329         seg_result = dbQuery( cnxn_string_map['daqr'], seg_query ).fetchall()
0330         run_segs = {}
0331         for r,h,s in seg_result:
0332             if r not in run_segs:
0333                 run_segs[r] = {}
0334             run_segs[r][h] = s
0335 
0336         ### How many segments are actually present in the file catalog?
0337         lustre_query =   "select runnumber,daqhost,count(status) from datasets"
0338         lustre_query += f" where {run_condition}"
0339         lustre_query += f" and daqhost in {tuple(self.in_types)}"
0340         lustre_query += f" and status::int > 0"
0341         lustre_query +=  " group by runnumber,daqhost;"
0342         lustre_result = dbQuery( cnxn_string_map[ self.input_config.db ], lustre_query ).fetchall()
0343         lustre_segs = {}
0344         for r,h,s in lustre_result:
0345             if r not in lustre_segs:
0346                 lustre_segs[r] = {}
0347             lustre_segs[r][h] = s
0348         
0349         ## Now compare the two and decide which runs to use
0350         ## For a given host, all segments must be present
0351         daqhosts_for_combining = {}
0352         for r, hosts in run_segs.items():
0353             CHATTY(f"Produced segments for run {r}: {hosts}")
0354             if r in lustre_segs:
0355                 CHATTY(f"Available on lustre for run {r}: {lustre_segs[r]}")
0356                 for h, s in hosts.items():
0357                     if h in lustre_segs[r]:
0358                         if s == lustre_segs[r][h]:
0359                             daqhosts_for_combining[r] = daqhosts_for_combining.get(r, set())
0360                             daqhosts_for_combining[r].add(h)
0361                             CHATTY(f"Run {r} has all {s} segments for host {h}. Using this host.")
0362                         else:
0363                             CHATTY(f"Run {r} host {h} has only {lustre_segs[r][h]} out of {s} segments on lustre. Not using this host.")
0364 
0365         return daqhosts_for_combining
0366 
0367     # ------------------------------------------------
0368     def devmatches(self, subset_runlist: List[int] = None) :
0369         ### Match parameters are set, now build up the list of inputs and construct corresponding output file names
0370         # The logic for combination and downstream jobs is sufficiently different to warrant separate functions
0371         start=datetime.now()
0372         if 'raw' in self.input_config.db:
0373             rule_matches = {}
0374             segswitch="seg0fromdb"
0375             if not self.input_config.combine_seg0_only:
0376                 segswitch="allsegsfromdb"
0377             daqhosts_for_combining = self.daqhosts_for_combining(subset_runlist)
0378             if daqhosts_for_combining=={}:
0379                 WARN("No runs satisfy the segment availability criteria. No jobs to submit.")
0380                 return {}
0381             INFO(f"{len(daqhosts_for_combining)} runs satisfy the segment availability criteria.")
0382             
0383             for runnumber in sorted(daqhosts_for_combining, reverse=True):
0384                 CHATTY(f"Currently to be created: {len(rule_matches)} output files.")
0385                 if self.job_config.max_jobs>0 and len(rule_matches) > self.job_config.max_jobs:
0386                     INFO(f"Number jobs is {len(rule_matches)}; exceeds max_jobs = {self.job_config.max_jobs}. Return.")
0387                     break
0388 
0389                 # GL1 is a must
0390                 if not 'gl1daq' in daqhosts_for_combining[runnumber]:
0391                     DEBUG(f"No GL1 file(s) for run {runnumber}")
0392                     continue
0393                 
0394                 ## Now check against production status and existing files
0395                 existing_output=self.get_files_in_db(runnumber)
0396                 if existing_output==[]:
0397                     DEBUG(f"No output files yet for run {runnumber}")
0398                 else:
0399                     DEBUG(f"Already have {len(existing_output)} output files for run {runnumber}")
0400 
0401                 existing_status=self.get_prod_status(runnumber)
0402                 if existing_status=={}:
0403                     DEBUG(f"No output files yet in the production db for run {runnumber}")
0404                 else:   
0405                     DEBUG(f"Already have {len(existing_status)} output files in the production db")
0406 
0407                 for leaf, daqhost in self.input_stem.items():
0408                     if daqhost=='gl1daq': # It needs to exist, but it doesn't need a separate job                        
0409                         continue
0410                     if daqhost not in daqhosts_for_combining[runnumber]:
0411                         CHATTY(f"No inputs from {daqhost} for run {runnumber}.")
0412                         continue
0413                     # We still could explicitly query the input files from the db here, but we already know that all segments are present
0414                     dsttype  = f'{self.dsttype}_{leaf}'
0415                     dsttype += f'_{self.dataset}'
0416                     outbase=f'{dsttype}_{self.outtriplet}'
0417                     # For combining, use segment 0 as key for logs and for existing output
0418                     logbase=f'{outbase}-{runnumber:{pRUNFMT}}-{0:{pSEGFMT}}'
0419                     dstfile=f'{logbase}.root'
0420                     if dstfile in existing_output:
0421                         CHATTY(f"Output file {dstfile} already exists. Not submitting.")
0422                         continue
0423 
0424                     if dstfile in existing_status:
0425                         WARN(f"Output file {dstfile} already has production status {existing_status[dstfile]}. Not submitting.")
0426                         continue
0427 
0428                     # DEBUG(f"Creating {dstfile} for run {runnumber} with {len(files_for_run[daqhost])} input segments")
0429                     DEBUG(f"Creating {dstfile} for run {runnumber}.")
0430 
0431                     rule_matches[dstfile] = [segswitch], outbase, logbase, runnumber, 0, daqhost, self.dsttype+'_'+leaf
0432 
0433             INFO(f'[Parsing time ] {(datetime.now() - start).total_seconds():.2f} seconds')
0434             return rule_matches
0435         else:
0436             return self.matches(subset_runlist)
0437 
0438     # ------------------------------------------------
0439     def matches(self, subset_runlist: List[int] = None) :
0440         ### Match parameters are set, now build up the list of inputs and construct corresponding output file names
0441         # Despite the "like" clause, this is a fast query. Extra cuts or substitute cuts like
0442         # 'and runnumber>={self.runMin} and runnumber<={self.runMax}'
0443         # can be added if the need arises.
0444         # Note: If the file database is not up to date, we can use a filesystem search in the output directory
0445         # Note: The db field in the yaml is for input queries only, all output queries go to the FileCatalog
0446 
0447         ####################################################################################
0448         ###### Now get all existing input files
0449         ####################################################################################
0450         # TODO: Support rule.printquery
0451 
0452         # Return early if there are no viable runs
0453         goodruns=self.good_runlist(subset_runlist)
0454         if goodruns==[]:
0455             INFO( "No runs pass run quality cuts.")
0456             return {}
0457         DEBUG(f"{len(goodruns)} runs pass run quality cuts.")
0458         CHATTY(f"Runlist: {goodruns}")
0459 
0460         # Manipulate the input types to match the database
0461         in_types=self.in_types # local copy, member is frozen
0462         
0463         # Transform list to ('<v1>','<v2>', ...) format. (one-liner doesn't work in python 3.9)
0464         in_types_str = f'( QUOTE{"QUOTE,QUOTE".join(in_types)}QUOTE )'
0465         in_types_str = in_types_str.replace("QUOTE","'")
0466 
0467         # Need status==1 for all files in a given run,host combination
0468         # Easier to check that after the SQL query
0469         infile_query = f"""select filename,dsttype as daqhost,runnumber,segment,'1' as status
0470         from {self.input_config.table}
0471         where dsttype in {in_types_str}
0472         """
0473         intriplet=self.input_config.intriplet
0474         if intriplet and intriplet!="":
0475             infile_query+=f"\tand tag='{intriplet}'"
0476         infile_query += self.input_config.infile_query_constraints
0477 
0478         #### Now build up potential output files from what's available
0479         start=datetime.now()
0480         rule_matches = {}
0481 
0482         ### Runnumber is the prime differentiator
0483         INFO(f"Resident Memory: {psutil.Process().memory_info().rss / 1024 / 1024} MB")
0484         for runnumber in sorted(goodruns, reverse=True):
0485             CHATTY(f"Currently to be created: {len(rule_matches)} output files.")
0486             if self.job_config.max_jobs>0 and len(rule_matches) > self.job_config.max_jobs:
0487                 INFO(f"Number jobs is {len(rule_matches)}; exceeds max_jobs = {self.job_config.max_jobs}. Return.")
0488                 break
0489 
0490             # Potential input files for this run
0491             run_query = infile_query + f"\n\t and runnumber={runnumber} "
0492             qnow=datetime.now()
0493             db_result = dbQuery( cnxn_string_map[ self.input_config.db ], run_query ).fetchall()
0494             INFO(f'Infile query took {(datetime.now() - qnow).total_seconds():.2f} seconds.')
0495             candidates = [ FileHostRunSegStat(c.filename,c.daqhost,c.runnumber,c.segment,c.status) for c in db_result ]
0496             CHATTY(f"Run: {runnumber}, Resident Memory: {psutil.Process().memory_info().rss / 1024 / 1024} MB")
0497             if len(candidates) == 0 :
0498                 DEBUG(f"No input files found for run {runnumber}. Skipping run.")
0499                 continue
0500             DEBUG(f"Found {len(candidates)} input files for run {runnumber}.")
0501 
0502             # Files to be created are checked against this list. Could use various attributes but most straightforward is just the filename
0503             existing_output=self.get_files_in_db(runnumber)
0504             if existing_output==[]:
0505                 DEBUG(f"No output files yet for run {runnumber}")
0506             else:
0507                 DEBUG(f"Already have {len(existing_output)} output files for run {runnumber}")
0508 
0509             existing_status=self.get_prod_status(runnumber)
0510             if existing_status=={}:
0511                 DEBUG(f"No output files yet in the production db for run {runnumber}")
0512             else:   
0513                 DEBUG(f"Already have {len(existing_status)} output files in the production db")
0514 
0515             ### Simplest case, 1-to-1:For every segment, there is exactly one output file, and exactly one input file from the previous step
0516             # If the output doesn't exist yet, use input files to create the job
0517             # TODO: or 'CALOFITTING' or many other job types
0518             if 'TRKR_SEED' in self.dsttype:
0519                 for infile in candidates:
0520                     if infile.segment % self.input_config.cut_segment != 0:
0521                         DEBUG(f"Skipping: segment {infile.segment} is not divisible by {self.input_config.cut_segment}")
0522                         continue
0523                     outbase=f'{self.dsttype}_{self.dataset}_{self.outtriplet}'
0524                     logbase= f'{outbase}-{infile.runnumber:{pRUNFMT}}-{infile.segment:{pSEGFMT}}'
0525                     dstfile = f'{logbase}.root'
0526                     if binary_contains_bisect(existing_output,dstfile):
0527                         CHATTY(f"Output file {dstfile} already exists. Not submitting.")
0528                         continue
0529                     if dstfile in existing_status:
0530                         DEBUG(f"Production status of {dstfile} is {existing_status[dstfile]}. Not submitting.")
0531                         continue
0532                     in_files_for_seg=[infile]
0533                     CHATTY(f"Creating {dstfile} from {in_files_for_seg}")
0534                     rule_matches[dstfile] = ["dbinput"], outbase, logbase, infile.runnumber, infile.segment, "dummy", self.dsttype
0535                 continue
0536 
0537             ####### NOT 1-1, requires more work:
0538             # For every segment, there is exactly one output file, and exactly one input file _from each stream_ OR from the previous step
0539             ######## Cut up the candidates into streams/daqhost≈ƒs
0540             candidates.sort(key=lambda x: (x.runnumber, x.daqhost)) # itertools.groupby depends on data being sorted
0541             files_for_run = { k : list(g) for
0542                               k, g in itertools.groupby(candidates, operator.attrgetter('daqhost')) }
0543             
0544             # daq file lists all need GL1 files. Pull them out and add them to the others
0545             if ( 'gl1daq' in in_types_str ):
0546                 ERROR("This should not happen.")
0547                 exit(1)
0548 
0549             ####### "Easy" case. One way to identify this case is to see if gl1 is not needed
0550             #  If the input has a segment number, then the output will have the same segment number
0551             #  - These are downstream objects (input is already a DST)
0552             #  - This can be 1-1 or many-to-1 (usually 2-1 for SEED + CLUSTER --> TRACKS)
0553             ### Get available input
0554             DEBUG("Getting available daq hosts for run {runnumber}")
0555 
0556             daqhost_query=f"select hostname,serverid from hostinfo where runnumber={runnumber}"
0557             daqhost_serverid=[ (c.hostname,c.serverid) for c in dbQuery( cnxn_string_map['daqr'], daqhost_query).fetchall() ]
0558             available_tpc=set()
0559             available_tracking=set()
0560             available_seb=set()
0561             for (hostname,serverid) in daqhost_serverid:
0562                 if hostname=='ebdc39' : # special case for TPOT
0563                     available_tracking.add(hostname)
0564                     continue
0565                 if 'ebdc' in hostname:
0566                     available_tpc.add(f"{hostname}_{serverid}")
0567                     continue
0568                 if 'seb' in hostname:
0569                     available_seb.add(hostname)
0570                     continue
0571                 # remainder is other tracking detectors (and gl1)
0572                 if not 'gl1' in hostname:
0573                     available_tracking.add(hostname)
0574                 
0575             DEBUG (f"Found {len(available_tpc)} TPC hosts in the run db")
0576             CHATTY(f"{available_tpc}")
0577             DEBUG (f"Found {len(available_tracking)} other tracking hosts in the run db")
0578             CHATTY(f"{available_tracking}")
0579             DEBUG (f"Found {len(available_seb)} sebXX hosts in the run db")
0580             CHATTY(f"{available_seb}")
0581             ### Here we could enforce both mandatory and masked hosts
0582 
0583             # Calo hardcoding
0584             minSEB=20
0585             if 'CALOFITTING' in self.dsttype:
0586                 # 1. How many SEB hosts are turned on in this run according to the daq db?
0587                 if len(available_seb) < minSEB and not self.physicsmode=='cosmics':
0588                     WARN(f"Skip run {runnumber}. Only {len(available_seb)} SEB hosts turned on in the run.")
0589                     continue
0590                 
0591                 # 2. How many are SEB host files have been produced and are currently available in this run.
0592                 present_seb_files=set()
0593                 for host in files_for_run:
0594                     for available in available_seb:
0595                         if available in host:
0596                             present_seb_files.add(host)
0597                             continue
0598                 if len(present_seb_files) < minSEB and not self.physicsmode=='cosmics':
0599                     WARN(f"Skip run {runnumber}. Only {len(present_seb_files)} SEB detectors actually in the run.")
0600                     missing_hosts = [host for host in available_seb if not any(host in present for present in present_seb_files)]
0601                     if missing_hosts:
0602                         WARN(f"Missing SEB hosts: {missing_hosts}")
0603                     continue
0604                 DEBUG (f"Found {len(present_seb_files)} SEB files in the catalog")
0605 
0606 
0607             # TPC hardcoding
0608             if 'TRKR_CLUSTER' in self.dsttype:
0609                 # 1. require at least N=30 out of the 48 ebdc_[0-24]_[01] to be turned on in the run
0610                 #    This is an early breakpoint to see if the run can be used for tracking
0611                 #    CHANGE 08/21/2025: On request from jdosbo, change back to requiring all ebdcs.
0612                 ### Important note: Requirement is NOT enforced for cosmics.
0613                 minNTPC=48
0614                 if len(available_tpc) < minNTPC and not self.physicsmode=='cosmics':
0615                     WARN(f"Skip run {runnumber}. Only {len(available_tpc)} TPC detectors turned on in the run.")
0616                     continue
0617                 
0618                 # 2. How many are TPC hosts are actually there in this run.
0619                 #    Not necessarily the same as above, if input DSTs aren't completely produced yet.
0620                 #    Other reason could be if the daq db is wrong.
0621                 present_tpc_files=set()
0622                 for host in files_for_run:
0623                     for available in available_tpc:
0624                         if available in host:
0625                             present_tpc_files.add(host)
0626                             continue                
0627                 if len(present_tpc_files) < minNTPC and not self.physicsmode=='cosmics':
0628                     WARN(f"Skip run {runnumber}. Only {len(present_tpc_files)} TPC detectors actually in the run.")
0629                     missing_hosts = [host for host in available_tpc if not any(host in present for present in present_tpc_files)]
0630                     if missing_hosts:
0631                         CHATTY(f"Missing TPC hosts: {missing_hosts}")
0632                     continue
0633                 DEBUG (f"Found {len(present_tpc_files)} TPC files in the catalog")
0634 
0635                 # 3. For INTT, MVTX, enforce that they're all available if possible
0636                 present_tracking=set(files_for_run).symmetric_difference(present_tpc_files)
0637                 CHATTY(f"Available non-TPC hosts in the daq db: {present_tracking}")
0638                 ### TODO: Only checking length here. Probably okay forever though.
0639                 if len(present_tracking) != len(available_tracking) and not self.physicsmode=='cosmics':
0640                     WARN(f"Skip run {runnumber}. Only {len(present_tracking)} non-TPC detectors actually in the run. {len(available_tracking)} possible.")
0641                     missing_hosts = [host for host in available_tracking if not any(host in present for present in present_tracking)]
0642                     if missing_hosts:
0643                         WARN(f"Missing non-TPC hosts: {missing_hosts}")
0644                     # WARN(f"Available non-TPC hosts in the daq db: {sorted(available_tracking)}")
0645                     # WARN(f"Present non-TPC leafs: {sorted(present_tracking)}")
0646                     continue
0647                 DEBUG (f"Found {len(present_tracking)} other tracking files in the catalog")
0648 
0649             # Sort and group the input files by segment. Reject if not all hosts are present in the segment yet
0650             segments = None
0651             rejected = set()
0652             for host in files_for_run:
0653                 files_for_run[host].sort(key=lambda x: (x.segment))
0654                 new_segments = list(map(lambda x: x.segment, files_for_run[host]))
0655                 if segments is None:
0656                     segments = new_segments
0657                 elif segments != new_segments:
0658                     rejected.update( set(segments).symmetric_difference(set(new_segments)) )
0659                     segments = list( set(segments).intersection(new_segments))
0660 
0661             if len(rejected) > 0  and not self.physicsmode=='cosmics' :
0662                 DEBUG(f"Run {runnumber}: Removed {len(rejected)} segments not present in all streams.")
0663                 CHATTY(f"Rejected segments: {rejected}")
0664 
0665             # If the output doesn't exist yet, use input files to create the job
0666             # outbase=f'{self.dsttype}_{self.outtriplet}_{self.outdataset}'
0667             outbase=f'{self.dsttype}_{self.dataset}_{self.outtriplet}'
0668             for seg in segments:
0669                 if seg % self.input_config.cut_segment != 0:
0670                     continue
0671                 logbase= f'{outbase}-{runnumber:{pRUNFMT}}-{seg:{pSEGFMT}}'
0672                 dstfile = f'{logbase}.root'
0673                 if dstfile in existing_output:
0674                     CHATTY(f"Output file {dstfile} already exists. Not submitting.")
0675                     continue
0676                 if dstfile in existing_status:
0677                     CHATTY(f"Output file {dstfile} already has production status {existing_status[dstfile]}. Not submitting.")
0678                     continue
0679                 rule_matches[dstfile] = ["dbinput"], outbase, logbase, runnumber, seg, "dummy", self.dsttype
0680             # \for run
0681         INFO(f'[Parsing time ] {(datetime.now() - start).total_seconds():.2f} seconds')
0682 
0683         return rule_matches
0684 # ============================================================================
0685 def parse_lfn(lfn: str, rule: RuleConfig) -> Tuple[str,...] :
0686     # Notably, input is not necessarily a true lfn, but:
0687     # If there's a colon, throw everything away after the first one; that's another parser's problem
0688     try:
0689         name=lfn.split(':')[0]
0690         name=Path(name).name # could throw an error instead if we're handed a full path.
0691         #  split at, and remove, run3auau_new_nocbdtag_v001, remainder is 'DST_...', '-00066582-00000.root' (or .finished)
0692         # dsttype,runsegend=name.split(f'_{rule.outtriplet}_{rule.dataset}')
0693         dsttype,runsegend=name.split(f'_{rule.dataset}_{rule.outtriplet}')
0694         if runsegend=='.root':
0695             raise ValueError("killkillkill")
0696         _,run,segend=runsegend.split('-')
0697         seg,end=segend.split('.')
0698     except ValueError as e:
0699         print(f"[parse_lfn] Caught error {e}")
0700         print(f"lfn = {lfn}")
0701         raise
0702         # else:
0703         #     exit(-1)
0704     return dsttype,int(run),int(seg),end
0705 
0706 
0707 # ============================================================================
0708 def parse_spiderstuff(filename: str) -> Tuple[str,...] :
0709     try:
0710         size=-1
0711         ctime=-1
0712         if 'size' in filename and 'ctime'in filename:
0713             lfn,_,nevents,_,first,_,last,_,md5,_,size,_,ctime,_,dbid = filename.split(':')
0714         else:
0715             lfn,_,nevents,_,first,_,last,_,md5,_,dbid = filename.split(':')
0716 
0717         lfn=Path(lfn).name
0718     except Exception as e:
0719         ERROR(f"Error: {e}")
0720         print(filename)
0721         print(filename.split(':'))
0722         exit(-1)
0723 
0724     return lfn,int(nevents),int(first),int(last),md5,int(size),int(ctime),int(dbid)
0725 
0726 # ============================================================================