File indexing completed on 2026-04-25 08:29:08
0001 from typing import Dict, List, Tuple, Set, Any
0002 import itertools
0003 import operator
0004 from dataclasses import dataclass, asdict
0005 from pathlib import Path
0006 import shutil
0007 from datetime import datetime
0008 import pprint
0009 import psutil
0010 import math
0011 from contextlib import nullcontext
0012
0013 from sphenixprodrules import RuleConfig, InputConfig
0014 from sphenixprodrules import pRUNFMT,pSEGFMT
0015 from sphenixdbutils import cnxn_string_map, dbQuery, list_to_condition
0016 from simpleLogger import CHATTY, DEBUG, INFO, WARN, ERROR, CRITICAL
0017 from sphenixjobdicts import inputs_from_output
0018 from sphenixmisc import binary_contains_bisect, shell_command
0019
from collections import namedtuple
# One catalog row: file name, its daq host, run number, segment, and status.
# BUGFIX: typename now matches the bound name ('FileHostRunSeg' before), so
# repr() is not misleading and instances are picklable.
FileHostRunSegStat = namedtuple('FileHostRunSegStat', ['filename', 'daqhost', 'runnumber', 'segment', 'status'])
0022
0023 """ This file contains the classes for matching runs and files to a rule.
0024 MatchConfig is the steering class for db queries to
0025 find appropriate input files and name the output files.
0026 It is constructed from a RuleConfig object.
0027 """
0028
0029
0030
0031
0032
0033
@dataclass( frozen = True )
class MatchConfig:
    """Steering class for db queries that find appropriate input files for a
    production rule and name the corresponding output files.
    Construct via from_rule_config(); frozen so instances are safely shareable.
    """
    dsttype: str             # base output DST type, e.g. contains 'CALOFITTING'/'TRKR_CLUSTER' for special handling
    runlist_int: str         # candidate runs; annotated str but iterated/sorted like a list of ints — TODO confirm
    input_config: InputConfig  # input source description (db, table, quality cuts, query constraints)
    dataset: str             # production dataset name, part of output file names
    outtriplet: str          # output tag triplet, part of output file names
    physicsmode: str         # run type selector (e.g. 'cosmics' relaxes several checks)
    filesystem: Dict         # directory map; 'outdir' and 'finaldir' keys are used here
    rungroup_tmpl: str       # template with {a},{b} placeholders for run-range subdirectories
    job_config: Any          # full job configuration; max_jobs and filesystem are read here

    # Derived fields, filled in by from_rule_config():
    dst_type_template: str   # SQL LIKE pattern matching all leaf types of dsttype
    in_types: Any            # list of input daq hosts / dst types (may be prepended with 'gl1daq')
    input_stem: Any          # inputs_from_output[dsttype]: dict leaf->daqhost, or a list
0050
0051
0052 @classmethod
0053 def from_rule_config(cls, rule_config: RuleConfig):
0054 """
0055 Constructs a MatchConfig object partially from a RuleConfig object.
0056
0057 Args:
0058 rule_config: The RuleConfig object to extract data from.
0059
0060 Returns:
0061 A MatchConfig object with fields pre-populated from the RuleConfig.
0062 """
0063
0064 dsttype = rule_config.dsttype
0065 runlist_int = rule_config.runlist_int
0066 input_config = rule_config.input_config
0067 dataset = rule_config.dataset
0068 outtriplet = rule_config.outtriplet
0069 physicsmode = rule_config.physicsmode
0070 filesystem = rule_config.job_config.filesystem
0071 rungroup_tmpl = rule_config.job_config.rungroup_tmpl
0072 job_config = rule_config.job_config
0073
0074
0075 dst_type_template = f'{dsttype}'
0076
0077 if 'TRIGGERED' in dsttype or 'STREAMING' in dsttype:
0078 dst_type_template += '_%'
0079 dst_type_template += '%'
0080
0081
0082 input_stem = inputs_from_output[dsttype]
0083 CHATTY( f'Input files are of the form:\n{pprint.pformat(input_stem)}')
0084 if isinstance(input_stem, dict):
0085 in_types = list(input_stem.values())
0086 else :
0087 in_types = input_stem
0088 if 'raw' in input_config.db:
0089 in_types.insert(0,'gl1daq')
0090
0091 return cls(
0092 dsttype = dsttype,
0093 runlist_int = runlist_int,
0094 input_config = input_config,
0095 dataset = dataset,
0096 outtriplet = outtriplet,
0097 physicsmode = physicsmode,
0098 filesystem = filesystem,
0099 rungroup_tmpl = rungroup_tmpl,
0100 job_config = job_config,
0101
0102 dst_type_template = dst_type_template,
0103 in_types = in_types,
0104 input_stem = input_stem,
0105 )
0106
0107
0108 def dict(self):
0109 return { k: str(v) for k, v in asdict(self).items() if v is not None }
0110
0111
0112 def good_runlist(self, subset_runlist: List[int] = None) -> List[int]:
0113
0114 CHATTY(f"Resident Memory: {psutil.Process().memory_info().rss / 1024 / 1024:.0f} MB")
0115
0116
0117
0118 runlist_to_check = subset_runlist if subset_runlist is not None else self.runlist_int
0119
0120 INFO("Checking runlist against run quality cuts.")
0121 run_quality_tmpl="""
0122 select distinct(runnumber) from run
0123 where
0124 runnumber>={runmin} and runnumber <= {runmax}
0125 and
0126 runtype='{physicsmode}'
0127 and
0128 eventsinrun >= {min_run_events}
0129 and
0130 EXTRACT(EPOCH FROM (ertimestamp-brtimestamp)) >={min_run_time}
0131 order by runnumber
0132 ;"""
0133 run_quality_query=run_quality_tmpl.format(
0134 runmin=min(runlist_to_check),
0135 runmax=max(runlist_to_check),
0136 physicsmode=self.physicsmode,
0137 min_run_events=self.input_config.min_run_events,
0138 min_run_time=self.input_config.min_run_time,
0139 )
0140 goodruns=[ int(r) for (r,) in dbQuery( cnxn_string_map['daqr'], run_quality_query).fetchall() ]
0141
0142 runlist_int=[ run for run in runlist_to_check if run in goodruns ]
0143 if runlist_int==[]:
0144 return []
0145 INFO(f"{len(runlist_int)} runs pass run quality cuts.")
0146 DEBUG(f"Runlist: {runlist_int}")
0147 return runlist_int
0148
0149
0150 def get_files_in_db(self, runnumbers: Any) :
0151
0152 exist_query = f"""select filename from datasets
0153 where dataset='{self.dataset}'
0154 and tag='{self.outtriplet}'
0155 and dsttype like '{self.dst_type_template}'"""
0156
0157 run_condition=list_to_condition(runnumbers)
0158 if run_condition!="" :
0159 exist_query += f"\n\tand {run_condition}"
0160 existing_output = [ c.filename for c in dbQuery( cnxn_string_map['fcr'], exist_query ) ]
0161 existing_output.sort()
0162 return existing_output
0163
0164
0165 def get_output_files(self, filemask: str = r"\*.root:\*", dstlistname: str=None, dryrun: bool=True) -> List[str]:
0166
0167 find=shutil.which('find')
0168 lfind = shutil.which('lfs')
0169 if lfind is None:
0170 WARN("'lfs find' not found")
0171 lfind = shutil.which('find')
0172 else:
0173 lfind = f'{lfind} find'
0174 INFO(f'Using find={find} and lfind={lfind}.')
0175
0176 if dstlistname:
0177 INFO(f"Piping output to {dstlistname}")
0178 if not dryrun:
0179 Path(dstlistname).unlink(missing_ok=True)
0180 else:
0181 dstlistname="/dev/null"
0182 INFO(f"Dryrun. Piping output to {dstlistname}")
0183
0184 outlocation=self.filesystem['outdir']
0185
0186 finaldir=self.filesystem['finaldir']
0187 if finaldir != outlocation:
0188 ERROR("Found finaldir != outdir. Use/adapt dstlakespider instead." )
0189 print(f"finaldir = {finaldir}")
0190 print(f"outdir = {outlocation}")
0191 exit(1)
0192 INFO(f"Directory tree: {outlocation}")
0193
0194
0195 leafparent=outlocation.split('/{leafdir}')[0]
0196 leafdirs_cmd=rf"{find} {leafparent} -type d -name {self.dsttype}\* -mindepth 1 -a -maxdepth 1"
0197 leafdirs = shell_command(leafdirs_cmd)
0198 CHATTY(f"Leaf directories: \n{pprint.pformat(leafdirs)}")
0199
0200
0201 sorted_runlist = sorted(self.runlist_int)
0202 def rungroup(run):
0203 return self.rungroup_tmpl.format(a=100*math.floor(run/100), b=100*math.ceil((run+1)/100))
0204 desirable_rungroups = { rungroup(run) for run in sorted_runlist }
0205 runs_by_group = { group : [] for group in desirable_rungroups}
0206 outidentifier=f'{self.dataset}_{self.outtriplet}'
0207 for run in sorted_runlist:
0208
0209 runstr=f'{outidentifier}-{run:{pRUNFMT}}'
0210
0211 runs_by_group[rungroup(run)].append(runstr)
0212
0213
0214
0215
0216
0217
0218
0219
0220 ret=[]
0221
0222 tstart=datetime.now()
0223 with open(dstlistname,"w") if dstlistname else nullcontext() as dstlistfile:
0224 for leafdir in leafdirs :
0225 CHATTY(f"Searching {leafdir}")
0226 available_rungroups = shell_command(rf"{find} {leafdir} -name run_\* -type d -mindepth 1 -a -maxdepth 1")
0227 DEBUG(f"Resident Memory: {psutil.Process().memory_info().rss / 1024 / 1024:.0f} MB")
0228
0229
0230 rungroups = {rg for rg in available_rungroups if any( drg in rg for drg in desirable_rungroups) }
0231 DEBUG(f"For {leafdir}, we have {len(rungroups)} run groups to work on")
0232 for rungroup in rungroups:
0233 runs_str=runs_by_group[Path(rungroup).name]
0234 find_command=f"{lfind} {rungroup} -type f -name {filemask}"
0235 CHATTY(find_command)
0236 group_runs = shell_command(find_command)
0237
0238 group_runs = [ run for run in group_runs if any( dr in run for dr in runs_str) ]
0239 if dstlistfile:
0240 for run in group_runs:
0241 dstlistfile.write(f"{run}\n")
0242 else:
0243 ret += group_runs
0244 INFO(f"List creation took {(datetime.now() - tstart).total_seconds():.2f} seconds.")
0245 return ret
0246
0247
0248 def select_matches_for_combination(self, files_for_run: Dict[str, List[FileHostRunSegStat]],
0249 runnumber: int) -> Dict[str, List[FileHostRunSegStat]]:
0250 gl1_files = files_for_run.pop('gl1daq',None)
0251 if gl1_files is None:
0252 WARN(f"No GL1 files found for run {runnumber}. Skipping this run.")
0253 return {}
0254 CHATTY(f'All GL1 files for for run {runnumber}:\n{gl1_files}')
0255
0256
0257 segments=set()
0258 for host in files_for_run:
0259 for f in files_for_run[host]:
0260 if f.status==1:
0261 segments.add(f.segment)
0262 if segments:
0263 CHATTY(f"Run {runnumber} has {len(segments)} segments in the input streams: {sorted(segments)}")
0264
0265
0266
0267 return files_for_run
0268
0269
0270 def get_prod_status(self, runnumbers):
0271
0272 INFO(f'Checking for output already in production for {runnumbers}')
0273 run_condition=list_to_condition(runnumbers)
0274 if run_condition!="" :
0275 run_condition = f"and {run_condition.replace('runnumber','run')}"
0276
0277 status_query = f"""select dstfile,status from production_status
0278 where status!='finished'
0279 {run_condition}
0280 and dstname like '{self.dst_type_template}%{self.outtriplet}' {self.input_config.status_query_constraints}
0281 order by run desc;"""
0282
0283 now=datetime.now()
0284 existing_status = { c.dstfile : c.status for c in dbQuery( cnxn_string_map['statr'], status_query ) }
0285 INFO(f'Query took {(datetime.now() - now).total_seconds():.2f} seconds.')
0286 return existing_status
0287
0288
0289 def daqhosts_for_combining(self, subset_runlist: List[int] = None) -> Dict[int, Set[int]]:
0290
0291
0292
0293 goodruns=self.good_runlist(subset_runlist)
0294 if goodruns==[]:
0295 INFO( "No runs pass run quality cuts.")
0296 return {}
0297 INFO(f"{len(goodruns)} runs pass run quality cuts.")
0298 DEBUG(f"Runlist: {goodruns}")
0299 if goodruns==[]:
0300 return {}
0301 run_condition=list_to_condition(goodruns)
0302
0303
0304 if self.input_config.combine_seg0_only:
0305 INFO("Only combining segment 0. Skipping detailed checks.")
0306
0307
0308 lustre_query = "select runnumber,daqhost from datasets"
0309 lustre_query += f" WHERE {run_condition}"
0310 lustre_query += f" AND segment=0 AND status::int > 0"
0311 lustre_query += f" AND daqhost in {tuple(self.in_types)}"
0312 lustre_result = dbQuery( cnxn_string_map[ self.input_config.db ], lustre_query ).fetchall()
0313 daqhosts_for_combining = {}
0314 for r,h in lustre_result:
0315 if r not in daqhosts_for_combining:
0316 daqhosts_for_combining[r] = set()
0317 daqhosts_for_combining[r].add(h)
0318 for run in daqhosts_for_combining:
0319 CHATTY(f"Available on lustre for run {run}: {daqhosts_for_combining.get(run,set())}")
0320
0321 return daqhosts_for_combining
0322
0323
0324
0325 seg_query= "select runnumber,hostname,count(sequence) from filelist"
0326 seg_query+= f" WHERE {run_condition}"
0327 seg_query+= f" and hostname in {tuple(self.in_types)}"
0328 seg_query+= " group by runnumber,hostname;"
0329 seg_result = dbQuery( cnxn_string_map['daqr'], seg_query ).fetchall()
0330 run_segs = {}
0331 for r,h,s in seg_result:
0332 if r not in run_segs:
0333 run_segs[r] = {}
0334 run_segs[r][h] = s
0335
0336
0337 lustre_query = "select runnumber,daqhost,count(status) from datasets"
0338 lustre_query += f" where {run_condition}"
0339 lustre_query += f" and daqhost in {tuple(self.in_types)}"
0340 lustre_query += f" and status::int > 0"
0341 lustre_query += " group by runnumber,daqhost;"
0342 lustre_result = dbQuery( cnxn_string_map[ self.input_config.db ], lustre_query ).fetchall()
0343 lustre_segs = {}
0344 for r,h,s in lustre_result:
0345 if r not in lustre_segs:
0346 lustre_segs[r] = {}
0347 lustre_segs[r][h] = s
0348
0349
0350
0351 daqhosts_for_combining = {}
0352 for r, hosts in run_segs.items():
0353 CHATTY(f"Produced segments for run {r}: {hosts}")
0354 if r in lustre_segs:
0355 CHATTY(f"Available on lustre for run {r}: {lustre_segs[r]}")
0356 for h, s in hosts.items():
0357 if h in lustre_segs[r]:
0358 if s == lustre_segs[r][h]:
0359 daqhosts_for_combining[r] = daqhosts_for_combining.get(r, set())
0360 daqhosts_for_combining[r].add(h)
0361 CHATTY(f"Run {r} has all {s} segments for host {h}. Using this host.")
0362 else:
0363 CHATTY(f"Run {r} host {h} has only {lustre_segs[r][h]} out of {s} segments on lustre. Not using this host.")
0364
0365 return daqhosts_for_combining
0366
0367
    def devmatches(self, subset_runlist: List[int] = None) :
        """Build the {output file: job description} map for raw-input rules.

        For 'raw' input databases, one job per (run, leaf) is created from the
        hosts returned by daqhosts_for_combining(); segment is always 0 here.
        For all other databases this delegates to matches().

        Returns:
            dict mapping dstfile name ->
            ([segswitch], outbase, logbase, runnumber, 0, daqhost, dsttype_leaf)
        """
        start=datetime.now()
        if 'raw' in self.input_config.db:
            rule_matches = {}
            # Tells the downstream job whether to pick up only segment 0 or all segments
            segswitch="seg0fromdb"
            if not self.input_config.combine_seg0_only:
                segswitch="allsegsfromdb"
            daqhosts_for_combining = self.daqhosts_for_combining(subset_runlist)
            if daqhosts_for_combining=={}:
                WARN("No runs satisfy the segment availability criteria. No jobs to submit.")
                return {}
            INFO(f"{len(daqhosts_for_combining)} runs satisfy the segment availability criteria.")

            # Newest runs first; stop once max_jobs is exceeded
            for runnumber in sorted(daqhosts_for_combining, reverse=True):
                CHATTY(f"Currently to be created: {len(rule_matches)} output files.")
                if self.job_config.max_jobs>0 and len(rule_matches) > self.job_config.max_jobs:
                    INFO(f"Number jobs is {len(rule_matches)}; exceeds max_jobs = {self.job_config.max_jobs}. Return.")
                    break

                # GL1 is mandatory for combining; skip runs without it
                if not 'gl1daq' in daqhosts_for_combining[runnumber]:
                    DEBUG(f"No GL1 file(s) for run {runnumber}")
                    continue

                # Already cataloged output files for this run
                existing_output=self.get_files_in_db(runnumber)
                if existing_output==[]:
                    DEBUG(f"No output files yet for run {runnumber}")
                else:
                    DEBUG(f"Already have {len(existing_output)} output files for run {runnumber}")

                # Output files already tracked (unfinished) in the production db
                existing_status=self.get_prod_status(runnumber)
                if existing_status=={}:
                    DEBUG(f"No output files yet in the production db for run {runnumber}")
                else:
                    DEBUG(f"Already have {len(existing_status)} output files in the production db")

                # One job per leaf whose daq host has complete input for this run
                for leaf, daqhost in self.input_stem.items():
                    if daqhost=='gl1daq':
                        continue
                    if daqhost not in daqhosts_for_combining[runnumber]:
                        CHATTY(f"No inputs from {daqhost} for run {runnumber}.")
                        continue

                    dsttype = f'{self.dsttype}_{leaf}'
                    dsttype += f'_{self.dataset}'
                    outbase=f'{dsttype}_{self.outtriplet}'

                    # Combining jobs always produce segment 0
                    logbase=f'{outbase}-{runnumber:{pRUNFMT}}-{0:{pSEGFMT}}'
                    dstfile=f'{logbase}.root'
                    if dstfile in existing_output:
                        CHATTY(f"Output file {dstfile} already exists. Not submitting.")
                        continue

                    if dstfile in existing_status:
                        WARN(f"Output file {dstfile} already has production status {existing_status[dstfile]}. Not submitting.")
                        continue

                    DEBUG(f"Creating {dstfile} for run {runnumber}.")

                    rule_matches[dstfile] = [segswitch], outbase, logbase, runnumber, 0, daqhost, self.dsttype+'_'+leaf

            INFO(f'[Parsing time ] {(datetime.now() - start).total_seconds():.2f} seconds')
            return rule_matches
        else:
            # Non-raw inputs are handled by the generic matcher
            return self.matches(subset_runlist)
0437
0438
0439 def matches(self, subset_runlist: List[int] = None) :
0440
0441
0442
0443
0444
0445
0446
0447
0448
0449
0450
0451
0452
0453 goodruns=self.good_runlist(subset_runlist)
0454 if goodruns==[]:
0455 INFO( "No runs pass run quality cuts.")
0456 return {}
0457 DEBUG(f"{len(goodruns)} runs pass run quality cuts.")
0458 CHATTY(f"Runlist: {goodruns}")
0459
0460
0461 in_types=self.in_types
0462
0463
0464 in_types_str = f'( QUOTE{"QUOTE,QUOTE".join(in_types)}QUOTE )'
0465 in_types_str = in_types_str.replace("QUOTE","'")
0466
0467
0468
0469 infile_query = f"""select filename,dsttype as daqhost,runnumber,segment,'1' as status
0470 from {self.input_config.table}
0471 where dsttype in {in_types_str}
0472 """
0473 intriplet=self.input_config.intriplet
0474 if intriplet and intriplet!="":
0475 infile_query+=f"\tand tag='{intriplet}'"
0476 infile_query += self.input_config.infile_query_constraints
0477
0478
0479 start=datetime.now()
0480 rule_matches = {}
0481
0482
0483 INFO(f"Resident Memory: {psutil.Process().memory_info().rss / 1024 / 1024} MB")
0484 for runnumber in sorted(goodruns, reverse=True):
0485 CHATTY(f"Currently to be created: {len(rule_matches)} output files.")
0486 if self.job_config.max_jobs>0 and len(rule_matches) > self.job_config.max_jobs:
0487 INFO(f"Number jobs is {len(rule_matches)}; exceeds max_jobs = {self.job_config.max_jobs}. Return.")
0488 break
0489
0490
0491 run_query = infile_query + f"\n\t and runnumber={runnumber} "
0492 qnow=datetime.now()
0493 db_result = dbQuery( cnxn_string_map[ self.input_config.db ], run_query ).fetchall()
0494 INFO(f'Infile query took {(datetime.now() - qnow).total_seconds():.2f} seconds.')
0495 candidates = [ FileHostRunSegStat(c.filename,c.daqhost,c.runnumber,c.segment,c.status) for c in db_result ]
0496 CHATTY(f"Run: {runnumber}, Resident Memory: {psutil.Process().memory_info().rss / 1024 / 1024} MB")
0497 if len(candidates) == 0 :
0498 DEBUG(f"No input files found for run {runnumber}. Skipping run.")
0499 continue
0500 DEBUG(f"Found {len(candidates)} input files for run {runnumber}.")
0501
0502
0503 existing_output=self.get_files_in_db(runnumber)
0504 if existing_output==[]:
0505 DEBUG(f"No output files yet for run {runnumber}")
0506 else:
0507 DEBUG(f"Already have {len(existing_output)} output files for run {runnumber}")
0508
0509 existing_status=self.get_prod_status(runnumber)
0510 if existing_status=={}:
0511 DEBUG(f"No output files yet in the production db for run {runnumber}")
0512 else:
0513 DEBUG(f"Already have {len(existing_status)} output files in the production db")
0514
0515
0516
0517
0518 if 'TRKR_SEED' in self.dsttype:
0519 for infile in candidates:
0520 if infile.segment % self.input_config.cut_segment != 0:
0521 DEBUG(f"Skipping: segment {infile.segment} is not divisible by {self.input_config.cut_segment}")
0522 continue
0523 outbase=f'{self.dsttype}_{self.dataset}_{self.outtriplet}'
0524 logbase= f'{outbase}-{infile.runnumber:{pRUNFMT}}-{infile.segment:{pSEGFMT}}'
0525 dstfile = f'{logbase}.root'
0526 if binary_contains_bisect(existing_output,dstfile):
0527 CHATTY(f"Output file {dstfile} already exists. Not submitting.")
0528 continue
0529 if dstfile in existing_status:
0530 DEBUG(f"Production status of {dstfile} is {existing_status[dstfile]}. Not submitting.")
0531 continue
0532 in_files_for_seg=[infile]
0533 CHATTY(f"Creating {dstfile} from {in_files_for_seg}")
0534 rule_matches[dstfile] = ["dbinput"], outbase, logbase, infile.runnumber, infile.segment, "dummy", self.dsttype
0535 continue
0536
0537
0538
0539
0540 candidates.sort(key=lambda x: (x.runnumber, x.daqhost))
0541 files_for_run = { k : list(g) for
0542 k, g in itertools.groupby(candidates, operator.attrgetter('daqhost')) }
0543
0544
0545 if ( 'gl1daq' in in_types_str ):
0546 ERROR("This should not happen.")
0547 exit(1)
0548
0549
0550
0551
0552
0553
0554 DEBUG("Getting available daq hosts for run {runnumber}")
0555
0556 daqhost_query=f"select hostname,serverid from hostinfo where runnumber={runnumber}"
0557 daqhost_serverid=[ (c.hostname,c.serverid) for c in dbQuery( cnxn_string_map['daqr'], daqhost_query).fetchall() ]
0558 available_tpc=set()
0559 available_tracking=set()
0560 available_seb=set()
0561 for (hostname,serverid) in daqhost_serverid:
0562 if hostname=='ebdc39' :
0563 available_tracking.add(hostname)
0564 continue
0565 if 'ebdc' in hostname:
0566 available_tpc.add(f"{hostname}_{serverid}")
0567 continue
0568 if 'seb' in hostname:
0569 available_seb.add(hostname)
0570 continue
0571
0572 if not 'gl1' in hostname:
0573 available_tracking.add(hostname)
0574
0575 DEBUG (f"Found {len(available_tpc)} TPC hosts in the run db")
0576 CHATTY(f"{available_tpc}")
0577 DEBUG (f"Found {len(available_tracking)} other tracking hosts in the run db")
0578 CHATTY(f"{available_tracking}")
0579 DEBUG (f"Found {len(available_seb)} sebXX hosts in the run db")
0580 CHATTY(f"{available_seb}")
0581
0582
0583
0584 minSEB=20
0585 if 'CALOFITTING' in self.dsttype:
0586
0587 if len(available_seb) < minSEB and not self.physicsmode=='cosmics':
0588 WARN(f"Skip run {runnumber}. Only {len(available_seb)} SEB hosts turned on in the run.")
0589 continue
0590
0591
0592 present_seb_files=set()
0593 for host in files_for_run:
0594 for available in available_seb:
0595 if available in host:
0596 present_seb_files.add(host)
0597 continue
0598 if len(present_seb_files) < minSEB and not self.physicsmode=='cosmics':
0599 WARN(f"Skip run {runnumber}. Only {len(present_seb_files)} SEB detectors actually in the run.")
0600 missing_hosts = [host for host in available_seb if not any(host in present for present in present_seb_files)]
0601 if missing_hosts:
0602 WARN(f"Missing SEB hosts: {missing_hosts}")
0603 continue
0604 DEBUG (f"Found {len(present_seb_files)} SEB files in the catalog")
0605
0606
0607
0608 if 'TRKR_CLUSTER' in self.dsttype:
0609
0610
0611
0612
0613 minNTPC=48
0614 if len(available_tpc) < minNTPC and not self.physicsmode=='cosmics':
0615 WARN(f"Skip run {runnumber}. Only {len(available_tpc)} TPC detectors turned on in the run.")
0616 continue
0617
0618
0619
0620
0621 present_tpc_files=set()
0622 for host in files_for_run:
0623 for available in available_tpc:
0624 if available in host:
0625 present_tpc_files.add(host)
0626 continue
0627 if len(present_tpc_files) < minNTPC and not self.physicsmode=='cosmics':
0628 WARN(f"Skip run {runnumber}. Only {len(present_tpc_files)} TPC detectors actually in the run.")
0629 missing_hosts = [host for host in available_tpc if not any(host in present for present in present_tpc_files)]
0630 if missing_hosts:
0631 CHATTY(f"Missing TPC hosts: {missing_hosts}")
0632 continue
0633 DEBUG (f"Found {len(present_tpc_files)} TPC files in the catalog")
0634
0635
0636 present_tracking=set(files_for_run).symmetric_difference(present_tpc_files)
0637 CHATTY(f"Available non-TPC hosts in the daq db: {present_tracking}")
0638
0639 if len(present_tracking) != len(available_tracking) and not self.physicsmode=='cosmics':
0640 WARN(f"Skip run {runnumber}. Only {len(present_tracking)} non-TPC detectors actually in the run. {len(available_tracking)} possible.")
0641 missing_hosts = [host for host in available_tracking if not any(host in present for present in present_tracking)]
0642 if missing_hosts:
0643 WARN(f"Missing non-TPC hosts: {missing_hosts}")
0644
0645
0646 continue
0647 DEBUG (f"Found {len(present_tracking)} other tracking files in the catalog")
0648
0649
0650 segments = None
0651 rejected = set()
0652 for host in files_for_run:
0653 files_for_run[host].sort(key=lambda x: (x.segment))
0654 new_segments = list(map(lambda x: x.segment, files_for_run[host]))
0655 if segments is None:
0656 segments = new_segments
0657 elif segments != new_segments:
0658 rejected.update( set(segments).symmetric_difference(set(new_segments)) )
0659 segments = list( set(segments).intersection(new_segments))
0660
0661 if len(rejected) > 0 and not self.physicsmode=='cosmics' :
0662 DEBUG(f"Run {runnumber}: Removed {len(rejected)} segments not present in all streams.")
0663 CHATTY(f"Rejected segments: {rejected}")
0664
0665
0666
0667 outbase=f'{self.dsttype}_{self.dataset}_{self.outtriplet}'
0668 for seg in segments:
0669 if seg % self.input_config.cut_segment != 0:
0670 continue
0671 logbase= f'{outbase}-{runnumber:{pRUNFMT}}-{seg:{pSEGFMT}}'
0672 dstfile = f'{logbase}.root'
0673 if dstfile in existing_output:
0674 CHATTY(f"Output file {dstfile} already exists. Not submitting.")
0675 continue
0676 if dstfile in existing_status:
0677 CHATTY(f"Output file {dstfile} already has production status {existing_status[dstfile]}. Not submitting.")
0678 continue
0679 rule_matches[dstfile] = ["dbinput"], outbase, logbase, runnumber, seg, "dummy", self.dsttype
0680
0681 INFO(f'[Parsing time ] {(datetime.now() - start).total_seconds():.2f} seconds')
0682
0683 return rule_matches
0684
# RuleConfig annotation is a string: only rule.dataset / rule.outtriplet are read,
# so any object with those attributes works and no import is needed at runtime.
def parse_lfn(lfn: str, rule: "RuleConfig") -> Tuple[str,...] :
    """Split a logical file name into (dsttype, runnumber, segment, extension).

    Expects names shaped like
        <dsttype>_<dataset>_<outtriplet>-<run>-<segment>.<ext>[:payload]
    where <dataset> and <outtriplet> come from the given rule. Any leading
    directories and any ':'-separated payload are stripped first.

    Raises:
        ValueError: if the name does not match the expected pattern
            (logged with the offending lfn before re-raising).
    """
    try:
        name=lfn.split(':')[0]
        name=Path(name).name

        dsttype,runsegend=name.split(f'_{rule.dataset}_{rule.outtriplet}')
        if runsegend=='.root':
            # Nothing between the triplet and the extension: run/segment missing.
            # BUGFIX: replaced the meaningless "killkillkill" message.
            raise ValueError(f"no run/segment fields in '{name}'")
        _,run,segend=runsegend.split('-')
        seg,end=segend.split('.')
    except ValueError as e:
        print(f"[parse_lfn] Caught error {e}")
        print(f"lfn = {lfn}")
        raise

    return dsttype,int(run),int(seg),end
0705
0706
0707
def parse_spiderstuff(filename: str) -> Tuple[str,...] :
    """Decode a spider-annotated name into its metadata fields.

    Accepts either the long form
        lfn:_:nevents:_:first:_:last:_:md5:_:size:_:ctime:_:dbid
    (recognized by 'size' and 'ctime' appearing in the string) or the short
    form without size/ctime, which then default to -1.

    Returns (lfn, nevents, first, last, md5, size, ctime, dbid);
    exits the process on any malformed input.
    """
    size = -1
    ctime = -1
    try:
        fields = filename.split(':')
        if 'size' in filename and 'ctime' in filename:
            lfn, _, nevents, _, first, _, last, _, md5, _, size, _, ctime, _, dbid = fields
        else:
            # short form: keep the -1 defaults for size and ctime
            lfn, _, nevents, _, first, _, last, _, md5, _, dbid = fields

        lfn = Path(lfn).name
    except Exception as e:
        ERROR(f"Error: {e}")
        print(filename)
        print(filename.split(':'))
        exit(-1)

    return lfn, int(nevents), int(first), int(last), md5, int(size), int(ctime), int(dbid)
0725
0726