Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-25 08:29:07

0001 #!/usr/bin/env python
0002 
0003 from pathlib import Path
0004 from datetime import datetime
0005 import yaml
0006 import cProfile
0007 import pstats
0008 import subprocess
0009 import sys
0010 import shutil
0011 import os
0012 import math
0013 from typing import List
0014 
0015 # from dataclasses import fields
0016 import pprint # noqa F401
0017 
0018 from argparsing import submission_args
0019 from sphenixmisc import setup_rot_handler, should_I_quit, make_chunks
0020 from simpleLogger import slogger, CustomFormatter, CHATTY, DEBUG, INFO, WARN, ERROR, CRITICAL  # noqa: F401
0021 from sphenixprodrules import RuleConfig,inputs_from_output
0022 from sphenixprodrules import parse_lfn,parse_spiderstuff
0023 from sphenixdbutils import test_mode as dbutils_test_mode
0024 from sphenixdbutils import long_filedb_info, filedb_info, full_db_info, upsert_filecatalog, update_proddb  # noqa: F401
0025 from sphenixmisc import binary_contains_bisect
0026 
0027 # ============================================================================================
def shell_command(command: str) -> List[str]:
    """Run ``command`` through the shell and return its stdout as a token list.

    Minimal wrapper to hide away subprocess tedium.

    Args:
        command: Complete command line. It is executed with ``shell=True``, so
            pipes, redirections and ``;`` are interpreted by the shell.
            NOTE(review): callers build this string from configuration values;
            it must never contain untrusted input.

    Returns:
        The decoded (UTF-8) standard output split on whitespace. An empty list
        is returned when the command exits with a non-zero status; the failure
        is logged as a warning, not raised.
    """
    DEBUG(f"[shell_command] Command: {command}")
    ret = []
    try:
        completed = subprocess.run(command, shell=True, check=True, capture_output=True)
    except subprocess.CalledProcessError as e:
        # Single pre-formatted message, like every other logging call in this
        # file; the exit code used to be passed as a stray positional argument.
        WARN(f"[shell_command] Command failed with exit code: {e.returncode}")
    else:
        ret = completed.stdout.decode('utf-8').split()

    DEBUG(f"[shell_command] Return value length is {len(ret)}.")
    return ret
0041 
0042 # ============================================================================================
0043 
def main():
    """Move finished DST files from the lake to their final location and register them.

    The steps, in order:
      1. Digest the command line, set up logging, and stop quietly if
         ``should_I_quit`` says another instance is already running.
      2. Load the requested rule from the yaml file, with CLI overrides.
      3. Use, or create with a ``find`` command, a list file of DSTs in the
         lake; take the first ``nfiles_to_process`` entries and write the
         remainder back. This step is guarded by a lock file.
      4. Keep the files whose run number is in the rule's run list.
      5. In chunks: upsert the file catalog, then rename each file into its
         final directory.

    With ``args.dryrun`` set, no lock is taken, the list file is not shortened,
    and no DST file is moved or deleted. The list file may still be created.

    Exits the process via ``sys.exit`` on configuration errors, on a fresh
    lock file, on a non-positive dbid and on unrecognized file names.
    """
    ### digest arguments
    args = submission_args()

    #################### Test mode?
    test_mode = (
            dbutils_test_mode
            or args.test_mode
            # or ( hasattr(rule, 'test_mode') and rule.test_mode ) ## allow in the yaml file?
        )

    # Set up submission logging before going any further
    sublogdir = setup_rot_handler(args)
    slogger.setLevel(args.loglevel)

    # Exit without fuss if we are already running
    if should_I_quit(args=args, myname=sys.argv[0]):
        DEBUG("Stop.")
        sys.exit(0)
    INFO(f"Logging to {sublogdir}, level {args.loglevel}")

    if args.profile:
        DEBUG("Profiling is ENABLED.")
        profiler = cProfile.Profile()
        profiler.enable()

    if test_mode:
        INFO("Running in testbed mode.")
        args.mangle_dirpath = 'production-testbed'
    else:
        INFO("Running in production mode.")

    #################### Rule has steering parameters and two subclasses for input and job specifics
    # Rule is instantiated via the yaml reader.

    ### Parse command line arguments into a substitution dictionary
    # This dictionary is passed to the ctor to override/customize yaml file parameters
    # Note: The following could all be hidden away in the RuleConfig ctor
    # but this way, CLI arguments are used by the function that received them and
    # constraint constructions are visibly handled away from the RuleConfig class
    param_overrides = {
        "runs": args.runs,
        "runlist": args.runlist,
        "nevents": 0,  # Not relevant, but needed for the RuleConfig ctor
    }

    # Rest of the input substitutions
    if args.physicsmode is not None:
        param_overrides["physicsmode"] = args.physicsmode  # e.g. physics

    # filesystem is the base for all output, allow for mangling here
    # "production" (in the default filesystem) is replaced
    param_overrides["prodmode"] = "production"
    if args.mangle_dirpath:
        param_overrides["prodmode"] = args.mangle_dirpath

    CHATTY(f"Rule substitutions: {param_overrides}")
    INFO("Now loading and building rule configuration.")

    #################### Load specific rule from the given yaml file.
    try:
        rule = RuleConfig.from_yaml_file(yaml_file=args.config, rule_name=args.rulename, param_overrides=param_overrides)
        INFO(f"Successfully loaded rule configuration: {args.rulename}")
    except (ValueError, FileNotFoundError) as e:
        ERROR(f"Error: {e}")
        sys.exit(1)

    CHATTY("Rule configuration:")
    CHATTY(yaml.dump(rule.dict))

    filesystem = rule.job_config.filesystem
    DEBUG(f"Filesystem: {filesystem}")

    ### Which find command to use for lustre?
    # Lustre's robin hood, rbh-find, doesn't offer advantages for our usecase, and it is more cumbersome to use.
    # But "lfs find" is preferable to the regular kind.
    lfind = shutil.which('lfs')
    if lfind is None:
        WARN("'lfs find' not found")
        lfind = shutil.which('find')
    else:
        lfind = f'{lfind} find'
    INFO(f'Using "{lfind}".')

    ##################### DSTs, from lustre to lustre
    # Original output directory, the final destination, and the file name trunk.
    # The backslash is doubled so the shell receives a literal "\*" (an escaped
    # glob that find expands itself) without an invalid Python escape sequence.
    dstbase = f'{rule.dsttype}\\*{rule.dataset}_{rule.outtriplet}\\*'
    # dstbase = f'{rule.dsttype}\\*{rule.outtriplet}_{rule.dataset}\\*' ## WRONG
    INFO(f'DST files filtered as {dstbase}')
    lakelocation = filesystem['outdir']
    INFO(f"Original output directory: {lakelocation}")

    ### Use or create a list file containing all the existing lake files to work on.
    ### This reduces memory footprint and repeated slow `find` commands for large amounts of files
    # The list lives in the log directory, truncated before the first template placeholder
    lakelistname = filesystem['logdir'].split("{")[0].rstrip("/")
    lakelistname = f"{lakelistname}/{rule.dsttype}_lakelist"
    lakelistlock = lakelistname + ".lock"
    # First, lock. This way multiple spiders can work on a file without stepping on each others' (8) toes
    if Path(lakelistlock).exists():
        WARN(f"Lock file {lakelistlock} already exists, indicating another spider is running over the same rule.")
        # Safety valve. If it's old, we assume some job didn't end gracefully and proceed anyway.
        mod_timestamp = Path(lakelistlock).stat().st_mtime
        mod_datetime = datetime.fromtimestamp(mod_timestamp)
        time_difference = datetime.now() - mod_datetime
        threshold = 8 * 60 * 60
        if time_difference.total_seconds() > threshold:
            WARN(f"lock file is already {time_difference.total_seconds()} seconds old. Overriding.")
        else:
            sys.exit(0)
    if not args.dryrun:
        Path(lakelistlock).parent.mkdir(parents=True, exist_ok=True)
        Path(lakelistlock).touch()

    nfiles_to_process = 500000
    exhausted = False
    lakefiles = []
    tmpname = f"{lakelistname}.tmp"
    try:
        INFO(f"Looking for existing filelist {lakelistname}")
        if not Path(lakelistname).exists():
            INFO(" ... not found. Creating a new one.")
            findcommand = f"{lfind} {lakelocation} -type f -name {dstbase}\\*.root\\* > {lakelistname}; wc -l {lakelistname}"
            DEBUG(f"Using:\n{findcommand}")
            ret = shell_command(findcommand)
            # shell_command returns [] on failure; don't crash on the log line
            if len(ret) >= 2:
                INFO(f"Found {ret[0]} matching dsts without cuts in the lake, piped into {ret[1]}")
            else:
                WARN(f"Could not determine the number of entries in {lakelistname}.")
        else:
            wccommand = f"wc -l {lakelistname}"
            ret = shell_command(wccommand)
            if ret:
                INFO(f" ... found. List contains {ret[0]} files.")
            else:
                WARN(f"Could not determine the number of entries in {lakelistname}.")

        ### Grab the first N files and work on those.
        with open(lakelistname, "r") as infile, open(tmpname, "w") as smallerlakefile:
            for _ in range(nfiles_to_process):
                line = infile.readline()
                if line:
                    lakefiles.append(line.strip())
                else:
                    exhausted = True
                    break
            # Whatever is left goes into the shortened list for the next call
            for line in infile:
                smallerlakefile.write(line)
        if not args.dryrun:
            shutil.move(tmpname, lakelistname)
            if exhausted:  # Used up the existing list.
                INFO("Used up all previously found lake files. Next call will create a new list")
                Path(lakelistname).unlink(missing_ok=True)
        else:
            Path(tmpname).unlink(missing_ok=True)
    finally:
        # Done with selecting or creating our chunk, release the lock.
        # In a finally clause so that a failure above cannot leave a stale lock
        # that blocks every other spider until the 8 hour safety valve kicks in.
        if not args.dryrun:
            Path(lakelistlock).unlink(missing_ok=True)

    ### Collect root files that satisfy run and dbid requirements
    mvfiles_info = []
    for file in lakefiles:
        lfn = Path(file).name
        dsttype, run, seg, _ = parse_lfn(lfn, rule)
        if binary_contains_bisect(rule.runlist_int, run):
            fullpath, nevents, first, last, md5, size, ctime, dbid = parse_spiderstuff(file)
            if dbid <= 0:
                ERROR(f"dbid is {dbid}. Can happen for legacy files, but it shouldn't currently.")
                # NOTE(review): exit status 0 after an ERROR is kept from the original - confirm it is intended.
                sys.exit(0)
            info = filedb_info(dsttype, run, seg, fullpath, nevents, first, last, md5, size, ctime)
            mvfiles_info.append((file, info))

    INFO(f"{len(mvfiles_info)} total root files to be processed.")

    finaldir_tmpl = filesystem['finaldir']
    INFO(f"Final destination template: {finaldir_tmpl}")

    input_stubs = inputs_from_output[rule.dsttype]
    DEBUG(f"Input stub(s): {input_stubs}")
    dataset = rule.dataset
    INFO(f"Dataset identifier: {dataset}")
    leaf_template = f'{rule.dsttype}'
    if 'raw' in rule.input_config.db:
        leaf_template += '_{host}'
        # Longest first, so that a type which is a prefix of another one cannot
        # shadow it in the startswith() matching below. This used to be an
        # unordered set, which made the match depend on hash order.
        leaf_types = sorted(
            {leaf_template.format(host=host) for host in input_stubs.keys()},
            key=len,
            reverse=True,
        )
    else:
        leaf_types = [rule.dsttype]
    INFO(f"Destination type template: {leaf_template}")
    DEBUG(f"Destination types: {leaf_types}")

    ####################################### Start moving and registering DSTs
    tstart = datetime.now()
    tlast = tstart
    chunksize = 2000
    fmax = len(mvfiles_info)

    chunked_mvfiles = make_chunks(mvfiles_info, chunksize)
    for i, chunk in enumerate(chunked_mvfiles):
        now = datetime.now()
        since_last = (now - tlast).total_seconds()
        since_start = (now - tstart).total_seconds()
        # Guard the rates against a zero time difference (coarse clocks, first iteration)
        rate_last = chunksize / since_last if since_last > 0 else float('inf')
        rate_cum = i * chunksize / since_start if since_start > 0 else float('inf')
        print( f'DST #{i*chunksize}/{fmax}, time since previous output:\t {since_last:.2f} seconds ({rate_last:.2f} Hz). ' )
        print( f'                   time since the start:       \t {since_start:.2f} seconds (cum. {rate_cum:.2f} Hz). ' )
        tlast = now

        fullinfo_chunk = []
        seen_lfns = set()  # duplicates are only detected within one chunk
        for file, info in chunk:
            dsttype, run, seg, lfn, nevents, first, last, md5, size, ctime = info
            ## lfn duplication can happen for unclean productions. Detect here.
            ## We could try and id the "best" one but that's pricey for a rare occasion. Just delete the file and move on.
            if lfn in seen_lfns:
                WARN(f"We already have a file with lfn {lfn}. Deleting {file}.")
                # A dry run must not remove anything from the lake
                if not args.dryrun:
                    Path(file).unlink(missing_ok=True)
                continue
            seen_lfns.add(lfn)

            # Check if we recognize the file name
            leaf = next((leaf_type for leaf_type in leaf_types if lfn.startswith(leaf_type)), None)
            if leaf is None:
                ERROR(f"Unknown file type: {lfn}")
                ERROR(f"Full file name: {file}")
                sys.exit(-1)

            ### Fill in templates and save full information
            rungroup = rule.job_config.rungroup_tmpl.format(a=100*math.floor(run/100), b=100*math.ceil((run+1)/100))
            finaldir = finaldir_tmpl.format(leafdir=leaf, rungroup=rungroup)
            # Create destination dir if it doesn't exist. Can't be done elsewhere/earlier, we need the full relevant runnumber range
            if not args.dryrun:
                Path(finaldir).mkdir(parents=True, exist_ok=True)

            full_file_path = f'{finaldir}/{lfn}'
            fullinfo_chunk.append(full_db_info(
                origfile=file,
                info=info,
                lfn=lfn,
                full_file_path=full_file_path,
                dataset=rule.dataset,
                tag=rule.outtriplet,
                ))
            # end of chunk creation loop

        ###### Here be dragons
        ### Register first, then move.
        try:
            upsert_filecatalog(fullinfos=fullinfo_chunk,
                           dryrun=args.dryrun # only prints the query if True
                           )
        except Exception as e:
            ### database errors can happen when there are multiples of a file in the lake.
            ### Why _that_ happens should be investigated, but here, we can just move on to the next chunk.
            WARN(f"dstspider is ignoring the database exception and moving on: {e}")
            continue
        if not args.dryrun:
            for fullinfo in fullinfo_chunk:
                try:
                    os.rename(fullinfo.origfile, fullinfo.full_file_path)
                    # shutil.move( fullinfo.origfile, fullinfo.full_file_path )
                except Exception as e:
                    # Best effort: report the failed move and keep going
                    WARN(e)
                # end of chunk move loop
        # End of DST loop

    if args.profile:
        profiler.disable()
        DEBUG("Profiling finished. Printing stats...")
        stats = pstats.Stats(profiler)
        stats.strip_dirs().sort_stats('time').print_stats(10)

    INFO(f"{Path(sys.argv[0]).name} DONE.")
0315         
0316 # ============================================================================================
0317 
# Script entry point: run the spider and report success to the caller.
if __name__ == '__main__':
    main()
    # sys.exit rather than the bare exit() helper, which is injected by the
    # site module and is missing under "python -S" or in frozen builds.
    sys.exit(0)