Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-25 08:29:08

0001 import yaml
0002 import re
0003 import glob
0004 from typing import Dict, List, Tuple, Any, Optional
0005 from dataclasses import dataclass, asdict
0006 from pathlib import Path
0007 import stat
0008 import subprocess
0009 import pprint # noqa: F401
0010 
0011 from simpleLogger import CHATTY, DEBUG, INFO, WARN, ERROR, CRITICAL  # noqa: F401
0012 from sphenixjobdicts import inputs_from_output
0013 from sphenixcondorjobs import CondorJobConfig,CondorJobConfig_fieldnames,glob_arguments_tmpl
0014 
0015 """ This file contains the dataclasses for the rule configuration and matching.
0016     It encapsulates what is tedious but hopefully easily understood instantiation
0017     from a YAML file, with some values being changed or completed by command line arguments of the caller.
0018     RuleConfig  represents a single rule configuration.
0019     InputConfig represents the input configuration sub-block.
0020     JobConfig   represents the job configuration sub-block.
0021 """
0022 
0023 # Striving to keep Dataclasses immutable (frozen=True)
0024 # All modifications should be done in the constructor
0025 
0026 # ============================================================================
0027 # shared format strings and default filesystem paths
# printf-style patterns for run and segment numbers; VERFMT is a bare format spec
RUNFMT = '%08i'
SEGFMT = '%05i'
VERFMT = '03d'
# The run/segment patterns rewritten as bare format specs ('08d', '05d')
pRUNFMT, pSEGFMT = (fmt.replace('%', '').replace('i', 'd') for fmt in (RUNFMT, SEGFMT))

# Every default location ends in the same sub-path; only the storage root and an
# optional trailing "log"/"hist" directory differ.
# "{leafdir}" needs to stay changeable.  Typical leafdir: DST_STREAMING_EVENT_TPC20 or DST_TRKR_CLUSTER
# "{rungroup}" needs to stay changeable. Typical rungroup: run_00057900_00058000
# Example of a fully substituted output directory:
#   /sphenix/lustre01/sphnxpro/production/run3auau/cosmics/new_nocdbtag_v000/DST_CALOFITTING/run_00057900_00058000
# Plain concatenation (not f-strings) is used on purpose: the braces must survive
# as str.format placeholders.
_PRODUCTION_SUBPATH = "sphnxpro/{prodmode}/{period}/{physicsmode}/{outtriplet}/{leafdir}/{rungroup}"
_default_filesystem = {
    'outdir':   "/sphenix/lustre01/" + _PRODUCTION_SUBPATH,
    'finaldir': "/sphenix/lustre01/" + _PRODUCTION_SUBPATH,
    'logdir':   "/sphenix/data/data02/" + _PRODUCTION_SUBPATH + "/log",
    'histdir':  "/sphenix/data/data02/" + _PRODUCTION_SUBPATH + "/hist",
    'condor':   "/tmp/data02/" + _PRODUCTION_SUBPATH + "/log",
}
0046 
0047 # ============================================================================
def is_executable(file_path):
    """
    Tell whether a path names a regular file with an execute permission bit set.

    Args:
        file_path (str or Path): The path to inspect.

    Returns:
        bool: True if the path is a file and the owner, group or other
              execute bit is set; False otherwise (including when the path
              is missing or is not a file).
    """
    candidate = Path(file_path)
    if not candidate.is_file():
        return False

    # One mask covering all three execute bits; any overlap means "executable"
    any_exec_bit = stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
    return bool(candidate.stat().st_mode & any_exec_bit)
0067 
0068 # ============================================================================
def check_params(params_data: Dict[str, Any], required: List[str], optional: List[str] ) -> bool:
    """
    Validate the keys of a parameter dictionary.

    Every name in `required` must be a key of `params_data`. When `optional`
    is non-empty, keys that are neither optional nor required are reported
    with a warning and deleted from `params_data` in place. When `optional`
    is empty or None, unknown keys are left untouched.

    Returns:
        True if nothing had to be removed, False otherwise.

    Raises:
        ValueError: on the first required name that is missing.
    """
    for name in required:
        if name not in params_data:
            raise ValueError(f"Missing required field '{name}'.")

    if not optional:
        return True

    allowed = optional + required
    # Collect first, delete afterwards: the dictionary must not shrink while it is being iterated
    unexpected = [key for key in params_data if key not in allowed]
    for key in unexpected:
        WARN( f"Unexpected field '{key}' in params. Removing, but you should clean up the yaml")
        del params_data[key]

    return not unexpected
0088 
0089 # ============================================================================
@dataclass( frozen = True )
class InputConfig:
    """Represents the input configuration block in the YAML.

    Only `db` and `table` are mandatory; every other field has a default.
    Instances are frozen, so all values have to be supplied at construction.
    """
    db: str
    table: str
    # Input descriptors. Optional because not needed for event combining
    intriplet:        Optional[str] = None        # ==tag, i.e. new_nocdbtag_v001
    indsttype:        Optional[List[str]] = None  # ['DST_STREAMING_EVENT_epcd01_0','DST_STREAMING_EVENT_epcd01_1'];
    indsttype_str:    Optional[str] = None        # indsttype joined into one string (RuleConfig.from_yaml uses ",".join)
    # Rule name. Sometimes needed to identify input files
    rule_name:        Optional[str] = None
    # Run Quality
    min_run_events:   Optional[int] = None
    min_run_time:     Optional[int] = None
    # For combination jobs, use only segment 0. No effect for downstream jobs.
    # The default here is True; RuleConfig.from_yaml passes False unless the YAML or the caller says otherwise.
    combine_seg0_only:          Optional[bool] = True
    choose20:         Optional[bool] = False  # Randomly choose 20% of available files
    cut_segment:      Optional[int] = 1       # For downstream jobs, submit only if segment % cut_segment == 0
    infile_query_constraints:   Optional[str] = None  # Additional constraints for the input filecatalog query.
    status_query_constraints:   Optional[str] = None  # Additional constraints for the production catalog query
    direct_path:      Optional[str] = None    # Optional direct path to the input
0110 
0111 # ============================================================================
@dataclass( frozen = True )
class RuleConfig:
    """Represents a single rule configuration in the YAML.

    Instances are frozen. They are normally built through `from_yaml` or
    `from_yaml_file`, which merge the YAML rule with caller-supplied overrides
    and derive the inferred fields.
    """

    # Direct input (explicitly provided by yaml file + command line arguments)
    dsttype: str        # DST_CALOFITTING
    period: str         # run3auau
    build: str          # for output; ex. ana.472
    dbtag: str          # for output; ex. 2025p000, nocdbtag
    version: int        # for output; ex. 0, 1, 2. Can separate repeated productions

    # Inferred
    build_string: str   # ana472, new
    version_string: str # v000
    outtriplet: str     # new_2025p000_v000
    runlist_int: List[int] # name chosen to differentiate it from --runlist which points to a text file

    # Nested dataclasses
    input_config: InputConfig
    job_config:   Any  # CondorJobConfig created dynamically via make_dataclass

    ### Fields with default values have to be down here
    physicsmode: str     # cosmics, commissioning, physics (from_yaml falls back to "physics")
    dataset: Optional[str]  # run3cosmics for 'DST_STREAMING_EVENT_%_run3cosmics' in run3auau root directory; from_yaml passes None when the YAML has no "dataset"
    runlist: str = ""    # name of the runlist file, "" if none was given

    # ------------------------------------------------
    def dict(self) -> Dict[str, Any]:
        """Convert to a dictionary, handling nested dataclasses.

        The nested configurations appear under their field names
        ('input_config', 'job_config') and additionally under the shorter
        aliases 'input' and 'job'.
        """
        data = asdict(self)

        data['input'] = asdict(self.input_config)
        data['job']   = asdict(self.job_config)
        return data

    # ------------------------------------------------
    @classmethod
    def from_yaml(cls,
                  yaml_file: str, #  Used for paths
                  yaml_data: Dict[str, Any],
                  rule_name: str,
                  param_overrides=None,
                  ) -> "RuleConfig":
        """
        Constructs a RuleConfig object from a YAML data dictionary.

        Args:
            yaml_file: Path of the YAML file. Payload entries that are not absolute
                       are resolved against the directory containing this file.
            yaml_data: The dictionary loaded from the YAML file.
            rule_name: The name of the rule to extract from the YAML.
            param_overrides: A dictionary (usually originating from argparse) to override the YAML data and fill in placeholders.
                       "runs" and "prodmode" are read with direct indexing and must be present;
                       "nevents" is needed for the template substitution.

        Returns:
            A RuleConfig object.

        Raises:
            ValueError: if the rule, a required field, or 'request_memory' is missing.
            KeyError: if a mandatory entry of param_overrides is missing.
            SystemExit: via exit() for fatal configuration problems (unknown build tag,
                        unreadable runlist, empty run selection, missing or non-executable
                        script, outdated or conflicting yaml fields).
        """
        try:
            rule_data = yaml_data[rule_name]
        except (KeyError, TypeError) as exc:
            # TypeError covers yaml_data being None, which is what an empty YAML file loads as
            raise ValueError(f"Rule '{rule_name}' not found in YAML data.") from exc

        if param_overrides is None:
            WARN("No rule substitutions provided. This may fail.")
            param_overrides = {}

        ### Extract and validate top level rule parameters
        params_data = rule_data.get("params", {})
        check_params(params_data
                    , required=["dsttype", "period","build", "dbtag", "version"]
                    , optional=["dataset", "physicsmode"] )

        ### Verify build tag exists on cvmfs before going further
        build_tag = params_data["build"]
        cvmfs_matches = glob.glob(
            f"/cvmfs/sphenix.sdcc.bnl.gov/alma9.2-gcc-14.2.0/release/release_*/{build_tag}"
        )
        if not cvmfs_matches:
            CRITICAL(
                f"Build tag '{build_tag}' not found under "
                f"/cvmfs/sphenix.sdcc.bnl.gov/alma9.2-gcc-14.2.0/release/release_*/ — "
                f"check spelling or cvmfs availability."
            )
            exit(2)
        INFO(f"Build tag '{build_tag}' found: {cvmfs_matches[0]}")

        ### Fill derived data fields
        build_string=params_data["build"].replace(".","")
        version_string = f'v{params_data["version"]:{VERFMT}}'
        outtriplet = f'{build_string}_{params_data["dbtag"]}_{version_string}'

        ### Which runs to process?
        runs=param_overrides["runs"]
        runlist_filename=param_overrides.get("runlist")
        INFO(f"runs = {runs}")
        INFO(f"runlist = {runlist_filename}")
        runlist_int=None
        ## By default, run over "physics" runs in run3
        default_runmin=66456
        default_runmax=90000
        if runlist_filename: # white-space separated numbers from a file
            INFO(f"Processing runs from file: {runlist_filename}")
            try:
                with open(runlist_filename, 'r') as file:
                    content = file.read()
            except FileNotFoundError:
                ERROR(f"Error: Runlist file not found at {runlist_filename}")
                exit(-1)
            try:
                number_strings = re.findall(r"[-+]?\d+", content)
                runlist_int=[int(runstr) for runstr in number_strings]
            except Exception as e:
                # runlist_int stays None here, so the emptiness check below aborts
                ERROR(f"Error: Exception parsing runlist file {runlist_filename}: {e}")
        else: # Use "--runs". 0 for all default runs; 1, 2 numbers for a single run or a range; 3+ for an explicit list
            INFO(f"Processing runs argument: {runs}")
            if not runs:
                WARN("Processing all runs.")
                runs=['-1','-1']
            nargs=len( runs )
            if  nargs==1:
                runlist_int=[int(runs[0])]
                if runlist_int[0]<=0 :
                    # NOTE(review): this only logs; processing continues with the invalid run number
                    ERROR(f"Can't run on single run {runlist_int[0]}")
            elif nargs==2:
                runmin,runmax=tuple(map(int,runs))
                # A negative bound means "use the default for this end of the range"
                if runmin<0:
                    runmin=default_runmin
                    WARN(f"Using runmin={runmin}")
                if runmax<0:
                    runmax=default_runmax
                    WARN(f"Using runmax={runmax}")
                runlist_int=list(range(runmin, runmax+1))
            else :
                # Unique, sorted run numbers without the non-positive entries
                runlist_int=sorted(r for r in set(map(int,runs)) if r>0)
        if not runlist_int:
            ERROR("Something's wrong parsing the runs to be processed. Maybe runmax < runmin?")
            exit(-1)
        CHATTY(f"Runlist: {runlist_int}")

        ### Optionals
        physicsmode = params_data.get("physicsmode", "physics")
        physicsmode = param_overrides.get("physicsmode", physicsmode)

        ###### Now create InputConfig and CondorJobConfig
        # Extract and validate input_config
        input_data = rule_data.get("input", {})
        check_params(input_data
                    , required=["db", "table"]
                    , optional=["intriplet",
                                "min_run_events","min_run_time",
                                "direct_path", "dataset",
                                "combine_seg0_only","choose20",
                                "cut_segment",
                                "infile_query_constraints",
                                "status_query_constraints","physicsmode"] )

        intriplet=input_data.get("intriplet")
        dsttype=params_data["dsttype"]
        input_stem = inputs_from_output[dsttype]
        CHATTY( f'Input files are of the form:\n{pprint.pformat(input_stem)}')
        if isinstance(input_stem, dict):
            indsttype = list(input_stem.values())
        elif isinstance(input_stem, list):
            indsttype = input_stem
        else:
            ERROR(f"Unrecognized type of input file descriptor {type(input_stem)}")
            exit(1)
        indsttype_str=",".join(indsttype)
        # indsttype_str=f"('{indsttype_str}')" ## Commented out. Adding parens here doesn't play well with handover to condor

        min_run_events=input_data.get("min_run_events",100000)
        min_run_time=input_data.get("min_run_time",300)

        combine_seg0_only=input_data.get("combine_seg0_only",False) # Default is false
        # If explicitly specified, argv overrides
        argv_combine_seg0_only=param_overrides.get("combine_seg0_only")
        if argv_combine_seg0_only is not None:
            combine_seg0_only=argv_combine_seg0_only

        choose20=input_data.get("choose20",False)
        argv_choose20=param_overrides.get("choose20")
        if argv_choose20 :
            choose20=True
        if choose20:
            CRITICAL("Option choose20 shouldn't be used.")
            exit(11)

        cut_segment = input_data.get("cut_segment", 1)
        argv_cut_segment = param_overrides.get("cut_segment")
        if argv_cut_segment is not None:
            cut_segment = argv_cut_segment

        ### Use choose20 only for combination jobs.
        # NOTE(review): unreachable while the exit(11) above is in place; kept for when choose20 is re-enabled
        if choose20 :
            if 'raw' in input_data["db"]:
                WARN ("Selecting only 20% of good runs.")
            else:
                WARN ("Option 'choose20' ignored for downstream production.")
                choose20=False

        # Substitutions in direct input path, if given
        input_direct_path = input_data.get("direct_path")
        if input_direct_path is not None:
            input_direct_path = input_direct_path.format(mode=physicsmode)
            DEBUG (f"Using direct path {input_direct_path}")

        # Allow arbitrary query constraints to be added.
        # "or ''" guards against keys that are present but None (empty yaml value, unset argparse option)
        infile_query_constraints  = input_data.get("infile_query_constraints") or ""
        infile_query_constraints += param_overrides.get("infile_query_constraints") or ""
        status_query_constraints  = input_data.get("status_query_constraints") or ""
        status_query_constraints += param_overrides.get("status_query_constraints") or ""
        DEBUG(f"Input query constraints: {infile_query_constraints}" )
        DEBUG(f"Status query constraints: {status_query_constraints}" )

        input_config=InputConfig(
            db=input_data["db"],
            table=input_data["table"],
            intriplet=intriplet,
            indsttype=indsttype,
            indsttype_str=indsttype_str,
            rule_name=rule_name,
            min_run_events=min_run_events,
            min_run_time=min_run_time,
            combine_seg0_only=combine_seg0_only,
            choose20=choose20,
            cut_segment=cut_segment,
            infile_query_constraints=infile_query_constraints,
            status_query_constraints=status_query_constraints,
            direct_path=input_direct_path,
        )

        # Extract and validate job_config
        job_data = rule_data.get("job", {})
        check_params(job_data
                    , required=[
                        "script", "payload", "neventsper","log","priority",
                        # "request_memory",  ## request_memory should be required; but we'll check on it later because "mem" is a deprecated synonym
                    ],
                     optional=None
                    )

        ### Some yaml parameters don't directly correspond to condor ones. Treat those first.
        # These are just named differently to reflect their template character
        job_data["log_tmpl"]=job_data.pop("log")
        arguments_tmpl=job_data.pop("arguments",None)
        if arguments_tmpl:
            # WARN("Using 'arguments' from the yaml file.")
            ERROR("Yaml rule contains 'arguments' field. That almost certainly means the file is outdated.")
            exit(1)
        else:
            arguments_tmpl=glob_arguments_tmpl
        job_data["arguments_tmpl"]=arguments_tmpl

        # Payload code etc.
        payload_list  = job_data.pop("payload")
        payload_list += param_overrides.get("payload_list") or []   # tolerate an explicit None
        # Prepend by the yaml file's path unless they are direct
        yaml_path = Path(yaml_file).parent.resolve()
        for i,loc in enumerate(payload_list):
            if not loc.startswith("/"):
                payload_list[i]= f'{yaml_path}/{loc}'
        DEBUG(f'List of payload items is {payload_list}')

        # Filesystem paths
        filesystem = _default_filesystem.copy()
        custom_fs = job_data.get("filesystem",None)
        if custom_fs:
            INFO("Updating default filesystem paths with custom paths from YAML file")
            filesystem.update(custom_fs)
        else:
            INFO("Using default filesystem paths")

        # Partially substitute placeholders.
        # {leafdir} and {rungroup} are replaced by themselves so they survive for a later substitution
        for key in filesystem:
            filesystem[key]=filesystem[key].format( prodmode=param_overrides["prodmode"],
                                                    period=params_data["period"],
                                                    physicsmode=physicsmode,
                                                    outtriplet=outtriplet,
                                                    leafdir='{leafdir}',
                                                    rungroup='{rungroup}',
                                                    )
            DEBUG(f"{key}:\t {filesystem[key]}")
        job_data["filesystem"]=filesystem

        # The executable
        script = job_data.pop("script")
        # Adjust the executable's path
        if not script.startswith("/"): # Search in the payload unless script has an absolute path
            p = subprocess.Popen(f'/usr/bin/find {" ".join(payload_list)} -type f',
                                 shell=True, # needed to expand "*"
                                 stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            stdout, stderr = p.communicate()
            errfiles = stderr.decode(errors='ignore').splitlines()
            if errfiles:
                WARN("The following errors occurred while searching the payload:")
                for errf in errfiles:
                    WARN(errf)
            allfiles = stdout.decode(errors='ignore').split()
            # First file whose basename equals the script name wins
            for f in allfiles:
                if script == Path(f).name:
                    script = f
                    break
        INFO(f'Full path to script is {script}')
        if not Path(script).exists() :
            ERROR(f"Executable {script} does not exist")
            exit(1)
        if not is_executable(Path(script)):
            ERROR(f"{script} is not executable")
            exit(1)
        job_data["executable"]=script

        # Some tedium to deal with a now deprecated field.
        mem            = job_data.pop("mem",None)
        request_memory = job_data.get("request_memory",None)
        if mem:
            WARN("'mem' is deprecated, use 'request_memory' instead.")
            if not request_memory:
                job_data["request_memory"]=mem
            elif request_memory != mem:
                ERROR("Conflicting 'mem' (deprecated) and  'request_memory' fields.")
                exit(1)

        # Partially fill param_overrides into the job data
        ## This isn't particularly elegant since it's self-referential.
        ## And you can't pass **job_data, which would be ideal, because of name clashes
        input_fields = asdict(input_config)
        for tmpl_key in 'batch_name', 'arguments_tmpl','log_tmpl':
            subsval = job_data.get(tmpl_key)
            if not isinstance(subsval, str): # don't try changing None or dictionaries
                continue
            subsval = subsval.format(
                nevents=param_overrides["nevents"],
                **params_data,
                **filesystem,
                **input_fields,
                payload=",".join(payload_list),
                comment=job_data.get("comment",None),
                neventsper=job_data.get("neventsper"),
                buildarg=params_data["build"],
                tag=params_data["dbtag"],
                outtriplet=outtriplet,
                # pass remaining per-job parameters forward to be replaced later
                outbase='{outbase}',
                logbase='{logbase}',
                inbase='{inbase}',
                run='{run}',
                seg='{seg}',
                daqhost='{daqhost}',
                inputs='{inputs}',
            )
            job_data[tmpl_key] = subsval
            DEBUG(f"After substitution, {tmpl_key} is {subsval}")
        environment=f'SPHENIXPROD_SCRIPT_PATH={param_overrides.get("script_path","None")}'
        job_data["environment"]=environment

        # Catch different production branches: the batch name is always prefixed
        # with the git branch of this source tree ("main" if it cannot be determined).
        branch_name="main"
        try:
            # Argument list without a shell, so a checkout path containing spaces still works
            result = subprocess.run(
                ["git", "-C", str(Path(__file__).parent), "rev-parse", "--abbrev-ref", "HEAD"],
                capture_output=True,
                text=True,
                check=True
            )
            branch_name = result.stdout.strip()
            CHATTY(f"Current Git branch: {branch_name}")
        except (subprocess.SubprocessError, OSError) as e:
            WARN(f"Could not determine the git branch, using '{branch_name}': {e}")
        batch_name=job_data.pop("batch_name")
        job_data["batch_name"]=f"{branch_name}.{batch_name}"

        # Fill in all class fields.
        condor_job_dict={}
        for param in job_data:
            if param not in CondorJobConfig_fieldnames:
                WARN( f"Unexpected field '{param}' in params. Removing, but you should clean up the yaml")
                # raise ValueError(f"Unexpected field '{param}'.")
                continue
            condor_job_dict[param] = job_data[param]
        del job_data         # Kill job_data - it's stale now and easily used accidentally

        ## Any remaining overrides
        priority=param_overrides.get("priority",None)
        if priority:
            condor_job_dict["priority"]=priority

        if param_overrides.get("request_memory",None):
            condor_job_dict["request_memory"]=param_overrides["request_memory"]

        condor_job_dict["max_jobs"]=param_overrides.get("max_jobs",0)

        # max_queued_jobs caps max_jobs, and supplies it when max_jobs is 0
        if param_overrides.get("max_queued_jobs",0):
            condor_job_dict["max_queued_jobs"]=param_overrides["max_queued_jobs"]
            if condor_job_dict["max_jobs"] ==0:
                condor_job_dict["max_jobs"]=condor_job_dict["max_queued_jobs"]
            if condor_job_dict["max_jobs"] > condor_job_dict["max_queued_jobs"]:
                WARN("max_jobs exceeds max_queued_jobs. Adjusting max_jobs to match max_queued_jobs.")
                condor_job_dict["max_jobs"]=condor_job_dict["max_queued_jobs"]

        request_memory=condor_job_dict.get("request_memory",None)         # Ensure sanity after the mem juggling act
        if not request_memory:
            raise ValueError( "Missing required field 'request_memory'.")

        #####  Now instantiate the main condor config object for all jobs
        job_config=CondorJobConfig(**condor_job_dict) # Do NOT forget the ** for Dictionary Unpacking
        DebugString="CondorJobConfig:\n"
        for k,v in asdict(job_config).items():
            DebugString += f"{k}:\t {v} \n"
        DEBUG(DebugString)

        ### With all preparations done, construct the constant RuleConfig object
        return cls(
            dsttype=dsttype,
            period=params_data["period"],
            physicsmode=physicsmode,
            dataset=params_data.get("dataset"),
            build=params_data["build"],
            dbtag=params_data["dbtag"],
            version=params_data["version"],
            build_string=build_string,
            version_string=version_string,
            outtriplet=outtriplet,
            runlist_int=runlist_int,
            runlist=runlist_filename or "",
            input_config=input_config,
            job_config=job_config,
        )

    # ------------------------------------------------
    @classmethod
    def from_yaml_file(cls, yaml_file: str, rule_name: str, param_overrides=None ) -> "RuleConfig":
        """
        Constructs a single RuleConfig object from a rule in a YAML file.

        Args:
            yaml_file: The path to the YAML file.
            rule_name: The name of the rule to extract from the YAML.
            param_overrides: Passed through unchanged to `from_yaml`.

        Returns:
            The RuleConfig object for `rule_name`.

        Raises:
            ValueError: if the file is not valid YAML (plus whatever `from_yaml` raises).
            FileNotFoundError: if the file does not exist.
        """
        try:
            with open(yaml_file, "r") as yamlstream:
                yaml_data = yaml.safe_load(yamlstream)
        except yaml.YAMLError as exc:
            raise ValueError(f"Error parsing YAML file: {exc}") from exc
        except FileNotFoundError as exc:
            raise FileNotFoundError(f"YAML file not found: {yaml_file}") from exc

        return cls.from_yaml(yaml_file=yaml_file,
                             yaml_data=yaml_data,
                             rule_name=rule_name,
                             param_overrides=param_overrides,
                            )
0568 
0569 # ============================================================================
def parse_spiderstuff(filename: str) -> Tuple[str, int, int, int, str, int, int, int]:
    """Split a colon-separated file descriptor into its values.

    The descriptor alternates values and label fields; the labels are ignored.
    Two layouts are accepted:
        path:_:nevents:_:first:_:last:_:md5:_:dbid                     (11 fields)
        path:_:nevents:_:first:_:last:_:md5:_:size:_:ctime:_:dbid      (15 fields)
    The layout is chosen by the number of fields, so a path that merely
    contains the words 'size' and 'ctime' cannot select the wrong one.

    Args:
        filename: The descriptor string.

    Returns:
        (lfn, nevents, first, last, md5, size, ctime, dbid), where lfn is the
        basename of the path and size/ctime are -1 for the short layout.

    Raises:
        SystemExit: via exit(-1) if the descriptor has an unexpected shape.
        ValueError: if a numeric field cannot be converted to int.
    """
    size = -1
    ctime = -1
    try:
        parts = filename.split(':')
        values = parts[0::2]   # every second field is a label; keep only the values
        if len(parts) == 15:
            lfn, nevents, first, last, md5, size, ctime, dbid = values
        elif len(parts) == 11:
            lfn, nevents, first, last, md5, dbid = values
        else:
            raise ValueError(f"expected 11 or 15 ':'-separated fields, found {len(parts)}")
        lfn = Path(lfn).name
    except (AttributeError, TypeError, ValueError) as e:
        ERROR(f"Error: {e}")
        ERROR(f"Cannot parse file descriptor: {filename!r}")
        exit(-1)

    return lfn,int(nevents),int(first),int(last),md5,int(size),int(ctime),int(dbid)
0587 
0588 # ============================================================================