File indexing completed on 2026-04-25 08:29:08
0001 import yaml
0002 import re
0003 import glob
0004 from typing import Dict, List, Tuple, Any, Optional
0005 from dataclasses import dataclass, asdict
0006 from pathlib import Path
0007 import stat
0008 import subprocess
0009 import pprint
0010
0011 from simpleLogger import CHATTY, DEBUG, INFO, WARN, ERROR, CRITICAL
0012 from sphenixjobdicts import inputs_from_output
0013 from sphenixcondorjobs import CondorJobConfig,CondorJobConfig_fieldnames,glob_arguments_tmpl
0014
0015 """ This file contains the dataclasses for the rule configuration and matching.
0016 It encapsulates what is tedious but hopefully easily understood instantiation
0017 from a YAML file, with some values being changed or completed by command line arguments of the caller.
0018 RuleConfig represents a single rule configuration.
0019 InputConfig represents the input configuration sub-block.
0020 JobConfig represents the job configuration sub-block.
0021 """
0022
0023
0024
0025
0026
0027
# printf-style formats for run and segment numbers (8 resp. 5 zero-padded digits)
RUNFMT = '%08i'
SEGFMT = '%05i'
# str.format-style spec for the version string, e.g. f'v{3:03d}' -> 'v003'
VERFMT = '03d'
# str.format-style twins of RUNFMT/SEGFMT: drop the leading '%', use 'd' for 'i'
pRUNFMT = RUNFMT[1:-1] + 'd'
pSEGFMT = SEGFMT[1:-1] + 'd'
0033
0034
0035
0036
0037
0038
# Default output/log directory layout templates.
# {prodmode}/{period}/{physicsmode}/{outtriplet} are substituted once per rule
# (see RuleConfig.from_yaml); {leafdir} and {rungroup} are deliberately re-emitted
# as literal placeholders there, to be filled in per job later.
# NOTE(review): 'condor' living under /tmp looks intentional (node-local scratch
# for condor logs) but is unverifiable from here — confirm before relying on it.
_default_filesystem = {
    'outdir'   : "/sphenix/lustre01/sphnxpro/{prodmode}/{period}/{physicsmode}/{outtriplet}/{leafdir}/{rungroup}",
    'finaldir' : "/sphenix/lustre01/sphnxpro/{prodmode}/{period}/{physicsmode}/{outtriplet}/{leafdir}/{rungroup}",
    'logdir'   : "/sphenix/data/data02/sphnxpro/{prodmode}/{period}/{physicsmode}/{outtriplet}/{leafdir}/{rungroup}/log",
    'histdir'  : "/sphenix/data/data02/sphnxpro/{prodmode}/{period}/{physicsmode}/{outtriplet}/{leafdir}/{rungroup}/hist",
    'condor'   : "/tmp/data02/sphnxpro/{prodmode}/{period}/{physicsmode}/{outtriplet}/{leafdir}/{rungroup}/log",
}
0046
0047
def is_executable(file_path):
    """
    Tell whether a path points to an executable regular file.

    Args:
        file_path (str or Path): The path to the file.

    Returns:
        bool: True if the file exists, is a regular file, and has any of the
        user/group/other execute permission bits set; False otherwise.
    """
    candidate = Path(file_path)

    # Directories and missing paths are never "executable files"
    if not candidate.is_file():
        return False

    # Any execute bit (owner, group, or other) counts
    exec_bits = stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
    return bool(candidate.stat().st_mode & exec_bits)
0067
0068
def check_params(params_data: Dict[str, Any], required: List[str], optional: List[str] ) -> bool:
    """
    Validate a parameter dictionary against required and optional key lists.

    Unexpected keys (present in neither list) are removed from params_data
    in place, with a warning. When `optional` is falsy (e.g. None), the
    unexpected-key cleanup is skipped entirely.

    Args:
        params_data: the dictionary to validate (may be mutated).
        required: keys that must be present.
        optional: keys that are allowed but not mandatory; falsy disables cleanup.

    Returns:
        True when nothing unexpected was found, False otherwise.

    Raises:
        ValueError: on the first missing required key.
    """
    clean = True
    for name in required:
        if name not in params_data:
            clean = False
            raise ValueError(f"Missing required field '{name}'.")

    if optional:
        allowed = optional + required
        # Snapshot the keys so we can delete while iterating
        for name in list(params_data):
            if name in allowed:
                continue
            WARN( f"Unexpected field '{name}' in params. Removing, but you should clean up the yaml")
            clean = False
            del params_data[name]

    return clean
0088
0089
@dataclass( frozen = True )
class InputConfig:
    """Represents the input configuration block in the YAML."""
    # Database and table name the input files are queried from
    db: str
    table: str

    # Input dataset triplet (presumably build_dbtag_version, like outtriplet) — TODO confirm
    intriplet: Optional[str] = None
    # Input DST types, derived from inputs_from_output[dsttype] in RuleConfig.from_yaml
    indsttype: Optional[List[str]] = None
    # Comma-joined rendering of indsttype
    indsttype_str: Optional[str] = None

    # Name of the rule this input block belongs to
    rule_name: Optional[str] = None

    # Run-quality thresholds (from_yaml fills in 100000 / 300 when absent)
    min_run_events: Optional[int] = None
    min_run_time: Optional[int] = None
    # NOTE(review): dataclass default is True, but from_yaml defaults the YAML
    # lookup to False — confirm which default is intended
    combine_seg0_only: Optional[bool] = True
    # Deprecated; from_yaml aborts if set
    choose20: Optional[bool] = False
    cut_segment: Optional[int] = 1
    # Extra constraints appended to the input/status queries
    infile_query_constraints: Optional[str] = None
    status_query_constraints: Optional[str] = None
    # Optional direct filesystem path for inputs; "{mode}" is format-filled with physicsmode
    direct_path: Optional[str] = None
0110
0111
@dataclass( frozen = True )
class RuleConfig:
    """Represents a single rule configuration in the YAML.

    Instances are normally built through from_yaml_file()/from_yaml(), which
    validate the YAML blocks and fold in command line overrides.
    """

    # Fields taken verbatim from the YAML "params" block
    dsttype: str
    period: str
    build: str
    dbtag: str
    version: int

    # Fields derived from the params block
    build_string: str        # build with '.' removed, e.g. "ana.472" -> "ana472"
    version_string: str      # e.g. "v003"
    outtriplet: str          # "{build_string}_{dbtag}_{version_string}"
    runlist_int: List[int]   # explicit list of run numbers to process

    # Nested configuration blocks
    input_config: InputConfig
    job_config: Any          # CondorJobConfig

    physicsmode: str
    dataset: str
    runlist: str = ""        # name of the runlist file, if one was used

    def dict(self) -> Dict[str, Any]:
        """Convert to a dictionary, handling nested dataclasses."""
        data = asdict(self)
        # Also expose the nested blocks under their YAML block names
        data['input'] = asdict(self.input_config)
        data['job'] = asdict(self.job_config)
        return data

    @classmethod
    def from_yaml(cls,
                  yaml_file: str,
                  yaml_data: Dict[str, Any],
                  rule_name: str,
                  param_overrides=None,
                  ) -> "RuleConfig":
        """
        Constructs a RuleConfig object from a YAML data dictionary.

        Args:
            yaml_file: Path of the YAML file; its directory anchors relative payload entries.
            yaml_data: The dictionary loaded from the YAML file.
            rule_name: The name of the rule to extract from the YAML.
            param_overrides: A dictionary (usually originating from argparse) to override the YAML data and fill in placeholders.

        Returns:
            A RuleConfig object.

        Raises:
            ValueError: if rule_name is absent from yaml_data, or request_memory
                cannot be determined.

        Note:
            Exits the process on fatal configuration problems (missing build
            on cvmfs, bad runlist, missing executable, ...), CLI-style.
        """
        try:
            rule_data = yaml_data[rule_name]
        except KeyError:
            raise ValueError(f"Rule '{rule_name}' not found in YAML data.")

        if param_overrides is None:
            WARN("No rule substitutions provided. This may fail.")
            param_overrides = {}

        ### "params" block: production identifiers
        params_data = rule_data.get("params", {})
        check_params(params_data
                     , required=["dsttype", "period", "build", "dbtag", "version"]
                     , optional=["dataset", "physicsmode"] )

        # The requested software build must exist on cvmfs
        build_tag = params_data["build"]
        cvmfs_matches = glob.glob(
            f"/cvmfs/sphenix.sdcc.bnl.gov/alma9.2-gcc-14.2.0/release/release_*/{build_tag}"
        )
        if not cvmfs_matches:
            CRITICAL(
                f"Build tag '{build_tag}' not found under "
                f"/cvmfs/sphenix.sdcc.bnl.gov/alma9.2-gcc-14.2.0/release/release_*/ — "
                f"check spelling or cvmfs availability."
            )
            exit(2)
        INFO(f"Build tag '{build_tag}' found: {cvmfs_matches[0]}")

        # Derived identifiers, e.g. build "ana.472", dbtag "2024p012", version 3
        # --> outtriplet "ana472_2024p012_v003"
        build_string = params_data["build"].replace(".", "")
        version_string = f'v{params_data["version"]:{VERFMT}}'
        outtriplet = f'{build_string}_{params_data["dbtag"]}_{version_string}'

        ### Which runs to process: a runlist file wins over the "runs" argument
        runs = param_overrides["runs"]
        runlist_filename = param_overrides.get("runlist")
        INFO(f"runs = {runs}")
        INFO(f"runlist = {runlist_filename}")
        runlist_int = None

        default_runmin = 66456
        default_runmax = 90000
        if runlist_filename:
            INFO(f"Processing runs from file: {runlist_filename}")
            try:
                with open(runlist_filename, 'r') as file:
                    content = file.read()
            except FileNotFoundError:
                ERROR(f"Error: Runlist file not found at {runlist_filename}")
                exit(-1)
            try:
                # Grab every integer in the file, regardless of separators
                number_strings = re.findall(r"[-+]?\d+", content)
                runlist_int = [int(runstr) for runstr in number_strings]
            except Exception as e:
                ERROR(f"Error: Exception parsing runlist file {runlist_filename}: {e}")
        else:
            INFO(f"Processing runs argument: {runs}")
            if not runs:
                WARN("Processing all runs.")
                runs = ['-1', '-1']
            nargs = len(runs)
            if nargs == 1:
                # A single (positive) run number
                runlist_int = [int(runs[0])]
                if runlist_int[0] <= 0:
                    ERROR(f"Can't run on single run {runlist_int[0]}")
            elif nargs == 2:
                # Inclusive [runmin, runmax] range; negatives mean "use default"
                runmin, runmax = tuple(map(int, runs))
                if runmin < 0:
                    runmin = default_runmin
                    WARN(f"Using runmin={runmin}")
                if runmax < 0:
                    runmax = default_runmax
                    WARN(f"Using runmax={runmax}")
                runlist_int = list(range(runmin, runmax + 1))
            else:
                # Three or more: treat as an explicit list of runs
                runlist_int = sorted(set(map(int, runs)))

        # Weed out negatives; ending up empty (or None) is fatal
        runlist_int = [r for r in runlist_int if r >= 0]
        if not runlist_int:
            ERROR("Something's wrong parsing the runs to be processed. Maybe runmax < runmin?")
            exit(-1)
        CHATTY(f"Runlist: {runlist_int}")

        # physicsmode: YAML default "physics"; command line override wins
        physicsmode = params_data.get("physicsmode", "physics")
        physicsmode = param_overrides.get("physicsmode", physicsmode)

        ### "input" block: where input files come from
        input_data = rule_data.get("input", {})
        check_params(input_data
                     , required=["db", "table"]
                     , optional=["intriplet",
                                 "min_run_events", "min_run_time",
                                 "direct_path", "dataset",
                                 "combine_seg0_only", "choose20",
                                 "cut_segment",
                                 "infile_query_constraints",
                                 "status_query_constraints", "physicsmode"] )

        intriplet = input_data.get("intriplet")
        dsttype = params_data["dsttype"]
        # Input DST types come from the project-wide output->input map
        input_stem = inputs_from_output[dsttype]
        CHATTY( f'Input files are of the form:\n{pprint.pformat(input_stem)}')
        if isinstance(input_stem, dict):
            indsttype = list(input_stem.values())
        elif isinstance(input_stem, list):
            indsttype = input_stem
        else:
            # Fixed: was missing the f-prefix, which printed the placeholder verbatim
            ERROR(f"Unrecognized type of input file descriptor {type(input_stem)}")
            exit(1)
        indsttype_str = ",".join(indsttype)

        # Run-quality thresholds
        min_run_events = input_data.get("min_run_events", 100000)
        min_run_time = input_data.get("min_run_time", 300)

        # NOTE(review): default False here, while the InputConfig dataclass
        # default is True — confirm which is intended
        combine_seg0_only = input_data.get("combine_seg0_only", False)
        argv_combine_seg0_only = param_overrides.get("combine_seg0_only")
        if argv_combine_seg0_only is not None:
            combine_seg0_only = argv_combine_seg0_only

        # choose20 is deprecated and currently fatal if requested
        choose20 = input_data.get("choose20", False)
        argv_choose20 = param_overrides.get("choose20")
        if argv_choose20:
            choose20 = True
        if choose20:
            CRITICAL("Option choose20 shouldn't be used.")
            exit(11)

        cut_segment = input_data.get("cut_segment", 1)
        argv_cut_segment = param_overrides.get("cut_segment")
        if argv_cut_segment is not None:
            cut_segment = argv_cut_segment

        # NOTE(review): unreachable while the exit(11) guard above is active;
        # kept in place for when choose20 support returns.
        if choose20:
            if 'raw' in input_data["db"]:
                WARN ("Selecting only 20% of good runs.")
            else:
                WARN ("Option 'choose20' ignored for downstream production.")
                choose20 = False

        # Optional direct filesystem path; "{mode}" is filled with the physics mode
        input_direct_path = input_data.get("direct_path")
        if input_direct_path is not None:
            input_direct_path = input_direct_path.format(mode=physicsmode)
            DEBUG (f"Using direct path {input_direct_path}")

        # Query constraints: YAML values and command line additions concatenate
        infile_query_constraints = input_data.get("infile_query_constraints", "")
        infile_query_constraints += param_overrides.get("infile_query_constraints", "")
        status_query_constraints = input_data.get("status_query_constraints", "")
        status_query_constraints += param_overrides.get("status_query_constraints", "")
        DEBUG(f"Input query constraints: {infile_query_constraints}" )
        DEBUG(f"Status query constraints: {status_query_constraints}" )

        input_config = InputConfig(
            db=input_data["db"],
            table=input_data["table"],
            intriplet=intriplet,
            indsttype=indsttype,
            indsttype_str=indsttype_str,
            rule_name=rule_name,
            min_run_events=min_run_events,
            min_run_time=min_run_time,
            combine_seg0_only=combine_seg0_only,
            choose20=choose20,
            cut_segment=cut_segment,
            infile_query_constraints=infile_query_constraints,
            status_query_constraints=status_query_constraints,
            direct_path=input_direct_path,
        )

        ### "job" block: condor job description
        job_data = rule_data.get("job", {})
        check_params(job_data
                     , required=[
                         "script", "payload", "neventsper", "log", "priority",
                     ],
                     # None: leave unlisted fields in place; they are filtered
                     # against CondorJobConfig_fieldnames further down
                     optional=None
                     )

        # "log" in the YAML is a template; rename it accordingly
        job_data["log_tmpl"] = job_data.pop("log")
        arguments_tmpl = job_data.pop("arguments", None)
        if arguments_tmpl:
            # Per-rule argument templates were retired in favor of the global template
            ERROR("Yaml rule contains 'arguments' field. That almost certainly means the file is outdated.")
            exit(1)
        else:
            arguments_tmpl = glob_arguments_tmpl
        job_data["arguments_tmpl"] = arguments_tmpl

        # Payload: YAML entries plus command line additions; relative paths are
        # resolved with respect to the YAML file's directory
        payload_list = job_data.pop("payload")
        payload_list += param_overrides.get("payload_list", [])
        yaml_path = Path(yaml_file).parent.resolve()
        for i, loc in enumerate(payload_list):
            if not loc.startswith("/"):
                payload_list[i] = f'{yaml_path}/{loc}'
        DEBUG(f'List of payload items is {payload_list}')

        # Filesystem layout: defaults, optionally overridden from the YAML
        filesystem = _default_filesystem.copy()
        custom_fs = job_data.get("filesystem", None)
        if custom_fs:
            INFO("Updating default filesystem paths with custom paths from YAML file")
            filesystem.update(custom_fs)
        else:
            INFO("Using default filesystem paths")

        # First substitution pass; {leafdir} and {rungroup} are re-emitted as
        # literal placeholders to be filled per job later
        for key in filesystem:
            filesystem[key] = filesystem[key].format( prodmode=param_overrides["prodmode"],
                                                      period=params_data["period"],
                                                      physicsmode=physicsmode,
                                                      outtriplet=outtriplet,
                                                      leafdir='{leafdir}',
                                                      rungroup='{rungroup}',
                                                      )
            DEBUG(f"{key}:\t {filesystem[key]}")
        job_data["filesystem"] = filesystem

        # Locate the executable: a relative script name is searched for in the payload
        script = job_data.pop("script")
        if not script.startswith("/"):
            p = subprocess.Popen(f'/usr/bin/find {" ".join(payload_list)} -type f',
                                 shell=True,
                                 stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            stdout, stderr = p.communicate()
            errfiles = stderr.decode(errors='ignore').splitlines()
            if errfiles:
                WARN("The following errors occurred while searching the payload:")
                for errf in errfiles:
                    WARN(errf)
            allfiles = stdout.decode(errors='ignore').split()
            for f in allfiles:
                if script == Path(f).name:
                    script = f
                    break
        INFO(f'Full path to script is {script}')
        if not Path(script).exists() :
            ERROR(f"Executable {script} does not exist")
            exit(1)
        if not is_executable(Path(script)):
            ERROR(f"{script} is not executable")
            exit(1)
        job_data["executable"] = script

        # Migrate deprecated "mem" to "request_memory"
        mem = job_data.pop("mem", None)
        request_memory = job_data.get("request_memory", None)
        if mem:
            WARN("'mem' is deprecated, use 'request_memory' instead.")
            if not request_memory:
                job_data["request_memory"] = mem
            elif request_memory != mem:
                ERROR("Conflicting 'mem' (deprecated) and 'request_memory' fields.")
                exit(1)

        # Fill in template fields resolvable now; job-level placeholders
        # ({run}, {seg}, ...) are passed through verbatim for a later pass
        for field in 'batch_name', 'arguments_tmpl', 'log_tmpl':
            subsval = job_data.get(field)
            if not isinstance(subsval, str):
                continue
            subsval = subsval.format(
                nevents=param_overrides["nevents"],
                **params_data,
                **filesystem,
                **asdict(input_config),
                payload=",".join(payload_list),
                comment=job_data.get("comment", None),
                neventsper=job_data.get("neventsper"),
                buildarg=params_data["build"],
                tag=params_data["dbtag"],
                outtriplet=outtriplet,
                # deferred, job-level placeholders:
                outbase='{outbase}',
                logbase='{logbase}',
                inbase='{inbase}',
                run='{run}',
                seg='{seg}',
                daqhost='{daqhost}',
                inputs='{inputs}',
            )
            job_data[field] = subsval
            DEBUG(f"After substitution, {field} is {subsval}")
        environment = f'SPHENIXPROD_SCRIPT_PATH={param_overrides.get("script_path","None")}'
        job_data["environment"] = environment

        # Prefix the condor batch name with this package's current git branch
        branch_name = "main"
        try:
            result = subprocess.run(
                [f"git -C {Path(__file__).parent} rev-parse --abbrev-ref HEAD"],
                shell=True,
                capture_output=True,
                text=True,
                check=True
            )
            branch_name = result.stdout.strip()
            CHATTY(f"Current Git branch: {branch_name}")
        except Exception as e:
            print(f"An error occurred: {e}")
        # NOTE(review): "batch_name" is not in the required list above; a YAML
        # without it raises KeyError here — confirm that is intended
        batch_name = job_data.pop("batch_name")
        job_data["batch_name"] = f"{branch_name}.{batch_name}"

        # Keep only fields that CondorJobConfig knows about
        condor_job_dict = {}
        for param in job_data:
            if param not in CondorJobConfig_fieldnames:
                WARN( f"Unexpected field '{param}' in params. Removing, but you should clean up the yaml")
                continue
            condor_job_dict[param] = job_data[param]
        del job_data

        # Command line overrides for selected condor fields
        priority = param_overrides.get("priority", None)
        if priority:
            condor_job_dict["priority"] = priority

        if param_overrides.get("request_memory", None):
            condor_job_dict["request_memory"] = param_overrides["request_memory"]

        condor_job_dict["max_jobs"] = param_overrides.get("max_jobs", 0)

        # max_queued_jobs caps max_jobs (0 means "unset")
        if param_overrides.get("max_queued_jobs", 0):
            condor_job_dict["max_queued_jobs"] = param_overrides["max_queued_jobs"]
            if condor_job_dict["max_jobs"] == 0:
                condor_job_dict["max_jobs"] = condor_job_dict["max_queued_jobs"]
            if condor_job_dict["max_jobs"] > condor_job_dict["max_queued_jobs"]:
                WARN("max_jobs exceeds max_queued_jobs. Adjusting max_jobs to match max_queued_jobs.")
                condor_job_dict["max_jobs"] = condor_job_dict["max_queued_jobs"]

        # request_memory must be known by now, from YAML, "mem", or the command line
        request_memory = condor_job_dict.get("request_memory", None)
        if not request_memory:
            raise ValueError( "Missing required field 'request_memory'.")

        job_config = CondorJobConfig(**condor_job_dict)
        DebugString = "CondorJobConfig:\n"
        for k, v in asdict(job_config).items():
            DebugString += f"{k}:\t {v} \n"
        DEBUG(DebugString)

        return cls(
            dsttype=dsttype,
            period=params_data["period"],
            physicsmode=physicsmode,
            dataset=params_data.get("dataset"),
            build=params_data["build"],
            dbtag=params_data["dbtag"],
            version=params_data["version"],
            build_string=build_string,
            version_string=version_string,
            outtriplet=outtriplet,
            runlist_int=runlist_int,
            runlist=runlist_filename or "",
            input_config=input_config,
            job_config=job_config,
        )

    @classmethod
    def from_yaml_file(cls, yaml_file: str, rule_name: str, param_overrides=None ) -> "RuleConfig":
        """
        Constructs a RuleConfig object from a YAML file.

        Args:
            yaml_file: The path to the YAML file.
            rule_name: The name of the rule to extract from the YAML.
            param_overrides: Optional command line overrides, passed to from_yaml.

        Returns:
            A RuleConfig object for the requested rule.

        Raises:
            ValueError: if the YAML cannot be parsed.
            FileNotFoundError: if the YAML file does not exist.
        """
        try:
            with open(yaml_file, "r") as yamlstream:
                yaml_data = yaml.safe_load(yamlstream)
        except yaml.YAMLError as exc:
            raise ValueError(f"Error parsing YAML file: {exc}")
        except FileNotFoundError:
            raise FileNotFoundError(f"YAML file not found: {yaml_file}")

        return cls.from_yaml(yaml_file=yaml_file,
                             yaml_data=yaml_data,
                             rule_name=rule_name,
                             param_overrides=param_overrides,
                             )
0568
0569
def parse_spiderstuff(filename: str) -> Tuple[str, int, int, int, str, int, int, int]:
    """
    Parse a ':'-separated spider metadata string of the form
        <lfn>:nevents:<n>:first:<n>:last:<n>:md5:<hash>[:size:<n>:ctime:<n>]:dbid:<n>

    The size/ctime pair is optional; -1 is returned for both when absent.
    A leading directory path on <lfn> is stripped, leaving the bare file name.

    Args:
        filename: the encoded string to split.

    Returns:
        (lfn, nevents, first, last, md5, size, ctime, dbid).

    Note:
        On any parse failure the offending string is dumped and the process
        exits with -1, CLI-style.
    """
    try:
        size = -1
        ctime = -1
        # NOTE(review): substring test — assumes 'size'/'ctime' only ever occur
        # as field tags, never inside the lfn itself. Confirm with producers.
        if 'size' in filename and 'ctime' in filename:
            lfn, _, nevents, _, first, _, last, _, md5, _, size, _, ctime, _, dbid = filename.split(':')
        else:
            lfn, _, nevents, _, first, _, last, _, md5, _, dbid = filename.split(':')

        # Reduce a full path to the bare file name
        lfn = Path(lfn).name
    except Exception as e:
        ERROR(f"Error: {e}")
        print(filename)
        print(filename.split(':'))
        exit(-1)

    return lfn, int(nevents), int(first), int(last), md5, int(size), int(ctime), int(dbid)
0588