Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-27 07:41:40

0001 #!/usr/bin/env python
0002 
0003 from pathlib import Path
0004 from datetime import datetime
0005 import yaml
0006 import cProfile
0007 import subprocess
0008 import sys
0009 import shutil
0010 import math
0011 from typing import List
0012 
0013 # from dataclasses import fields
0014 import pprint # noqa F401
0015 
0016 from argparsing import submission_args
0017 from sphenixmisc import setup_rot_handler, should_I_quit
0018 from simpleLogger import slogger, CustomFormatter, CHATTY, DEBUG, INFO, WARN, ERROR, CRITICAL  # noqa: F401
0019 from sphenixprodrules import RuleConfig,inputs_from_output
0020 from sphenixprodrules import parse_lfn,parse_spiderstuff
0021 from sphenixdbutils import test_mode as dbutils_test_mode
0022 from sphenixdbutils import filedb_info, upsert_filecatalog, update_proddb  # noqa: F401
0023 from sphenixmisc import binary_contains_bisect
0024 
0025 # ============================================================================================
0026 def shell_command(command: str) -> List[str]:
0027     """Minimal wrapper to hide away subbprocess tedium"""
0028     DEBUG(f"[shell_command] Command: {command}")
0029     ret=[]
0030     try:
0031         ret = subprocess.run(command, shell=True, check=True, capture_output=True).stdout.decode('utf-8').split()
0032     except subprocess.CalledProcessError as e:
0033         WARN("[shell_command] Command failed with exit code:", e.returncode)
0034     finally:
0035         pass
0036 
0037     DEBUG(f"[shell_command] Found {len(ret)} matches.")
0038     return ret
0039 
0040 # ============================================================================================
0041 
0042 def main():
0043     ### digest arguments
0044     args = submission_args()
0045 
0046     #################### Test mode?
0047     test_mode = (
0048             dbutils_test_mode
0049             or args.test_mode
0050             # or ( hasattr(rule, 'test_mode') and rule.test_mode ) ## allow in the yaml file?
0051         )
0052 
0053     # Set up submission logging before going any further
0054     sublogdir=setup_rot_handler(args)
0055     slogger.setLevel(args.loglevel)
0056     
0057     # Exit without fuss if we are already running 
0058     if should_I_quit(args=args, myname=sys.argv[0]):
0059         DEBUG("Stop.")
0060         exit(0)
0061     
0062     INFO(f"Logging to {sublogdir}, level {args.loglevel}")
0063 
0064     if test_mode:
0065         INFO("Running in testbed mode.")
0066         args.mangle_dirpath = 'production-testbed'
0067     else:
0068         INFO("Running in production mode.")
0069 
0070     #################### Rule has steering parameters and two subclasses for input and job specifics
0071     # Rule is instantiated via the yaml reader.
0072 
0073     ### Parse command line arguments into a substitution dictionary
0074     # This dictionary is passed to the ctor to override/customize yaml file parameters
0075     # Note: The following could all be hidden away in the RuleConfig ctor
0076     # but this way, CLI arguments are used by the function that received them and
0077     # constraint constructions are visibly handled away from the RuleConfig class
0078     param_overrides = {}
0079     param_overrides["runs"]=args.runs
0080     param_overrides["runlist"]=args.runlist
0081     param_overrides["nevents"] = 0 # Not relevant, but needed for the RuleConfig ctor
0082         
0083     # Rest of the input substitutions
0084     if args.physicsmode is not None:
0085         param_overrides["physicsmode"] = args.physicsmode # e.g. physics
0086 
0087     if args.mangle_dstname:
0088         DEBUG("Mangling DST name")
0089         param_overrides['DST']=args.mangle_dstname
0090 
0091     # filesystem is the base for all output, allow for mangling here
0092     # "production" (in the default filesystem) is replaced
0093     param_overrides["prodmode"] = "production"
0094     if args.mangle_dirpath:
0095         param_overrides["prodmode"] = args.mangle_dirpath
0096 
0097     CHATTY(f"Rule substitutions: {param_overrides}")
0098     INFO("Now loading and building rule configuration.")
0099 
0100     #################### Load specific rule from the given yaml file.
0101     try:
0102         rule =  RuleConfig.from_yaml_file( yaml_file=args.config, rule_name=args.rulename, param_overrides=param_overrides )
0103         INFO(f"Successfully loaded rule configuration: {args.rulename}")
0104     except (ValueError, FileNotFoundError) as e:
0105         ERROR(f"Error: {e}")
0106         exit(1)
0107 
0108     CHATTY("Rule configuration:")
0109     CHATTY(yaml.dump(rule.dict))
0110         
0111     outstub = rule.outstub
0112     INFO(f"Output stub: {outstub}")
0113 
0114     input_stubs = inputs_from_output[rule.dsttype]
0115     DEBUG(f"Input stub(s): {input_stubs}")
0116     dataset = rule.dataset
0117     INFO(f"Dataset identifier: {dataset}")
0118     leaf_template = f'{rule.dsttype}'
0119     if 'raw' in rule.input_config.db:
0120         leaf_template += '_{host}'
0121     leaf_types = { f'{leaf_template}'.format(host=host) for host in input_stubs.keys() }
0122     INFO(f"Destination type template: {leaf_template}")
0123     DEBUG(f"Destination types: {leaf_types}")
0124 
0125     ### Which find command to use for lustre?
0126     # Lustre's robin hood, rbh-find, doesn't offer advantages for our usecase, and it is more cumbersome to use.
0127     # But "lfs find" is preferrable to the regular kind.
0128     lfind = shutil.which('lfs')
0129     if lfind is None:
0130         WARN("'lfs find' not found.")
0131         lfind = shutil.which('find')
0132     else:
0133         lfind = f'{lfind} find'
0134     INFO(f'Using "{lfind}.')
0135 
0136     ##################### DSTs, from lustre to lustre
0137     # Original output directory, the final destination, and the file name trunk
0138     filesystem = rule.job_config.filesystem
0139     DEBUG(f"Filesystem: {filesystem}")
0140     dstbase = f'{rule.rulestem}\*{rule.outstub}_{rule.outdataset}\*'
0141     INFO(f'DST files filtered as {dstbase}')
0142     lakelocation=filesystem['outdir']
0143     INFO(f"Original output directory: {lakelocation}")
0144 
0145     ### root files without cuts
0146     lakefiles = shell_command(f"{lfind} {lakelocation} -maxdepth 1 -type f -name {dstbase}\*.root\*")
0147     DEBUG(f"Found {len(lakefiles)} matching dsts without cuts in the lake.")
0148 
0149     # ### indicator files for 'finished'
0150     # finishedfiles = shell_command(f"{lfind} {lakelocation} -maxdepth 1 -type f -name {dstbase}\*.finished\*")
0151     # DEBUG(f"Found {len(finishedfiles)} matching .finished files in the lake.")
0152     
0153     # ### Mark off dbids (==finished jobs) that can be transferred
0154     # finished={}
0155     # for finfile in finishedfiles:
0156     #     pseudolfn=Path(finfile).name
0157     #     _,run,seg,end=parse_lfn(pseudolfn,rule)
0158     #     if binary_contains_bisect(rule.runlist_int,run):
0159     #         fullpath,_,_,_,_,dbid = parse_spiderstuff(finfile)
0160     #         if dbid <= 0:
0161     #             ERROR("dbid is {dbid}. Can happen for legacy files, but it shouldn't currently.")
0162     #             exit(0)
0163     #         if dbid in finished:
0164     #             raise KeyError(f"dbid '{dbid}' already exists in the dictionary.")
0165     #         finished[dbid]=finfile
0166 
0167     # if len(finished) ==0 :
0168     #     INFO(f"No runs have finished yet. TTYL!")
0169     #     exit(0)
0170     # INFO(f"{len(finished)} runs have finished. Processing their root files.")
0171          
0172     ### Collect root files that satisfy run and dbid requirements
0173     mvfiles_info=[]
0174     for file in lakefiles:
0175         pseudolfn=Path(file).name
0176         dsttype,run,seg,_=parse_lfn(pseudolfn,rule)
0177         if binary_contains_bisect(rule.runlist_int,run):
0178             lfn,nevents,first,last,md5,size,ctime,dbid = parse_spiderstuff(file)
0179             if dbid <= 0:
0180                 ERROR("dbid is {dbid}. Can happen for legacy files, but it shouldn't currently.")
0181                 exit(0)
0182             info=filedb_info(dsttype,run,seg,fullpath,nevents,first,last,md5,size,ctime)
0183 
0184             # if dbid not in finished:
0185             #     CHATTY(f"{dbid} isn't done yet")
0186             #     continue;
0187             mvfiles_info.append( (file,info) )
0188             
0189     INFO(f"{len(mvfiles_info)} total root files to be processed.")
0190     
0191     finaldir_tmpl=filesystem['finaldir']
0192     INFO(f"Final destination template: {finaldir_tmpl}")
0193 
0194     input_stem = inputs_from_output[rule.rulestem]
0195     DEBUG(f"Input stem: {input_stem}")
0196     outstub = rule.outstub
0197     INFO(f"Output stub: {outstub}")
0198 
0199     # Regrettably, 'dsttype' in the database refers to e.g. DST_STREAMING_EVENT_ebdc01_1_run3auau
0200     # Here, we want the base of that without the run3auau. Also known as "leaf" or "leafdir" sometimes.
0201     leaf_template = f'{rule.rulestem}'
0202     if 'raw' in rule.input_config.db:
0203         leaf_template += '_{host}'
0204     leaf_types = { f'{leaf_template}'.format(host=host) for host in input_stem.keys() }
0205     INFO(f"Destination type template: {leaf_template}")
0206     DEBUG(f"Destination types: {leaf_types}")
0207     
0208     ####################################### Start moving and registering DSTs
0209     tstart = datetime.now()
0210     tlast = tstart
0211     when2blurb=2000
0212     fmax=len(mvfiles_info)
0213     for f, file_and_info in enumerate(mvfiles_info):
0214         if f%when2blurb == 0:
0215             now = datetime.now()            
0216             print( f'DST #{f}/{fmax}, time since previous output:\t {(now - tlast).total_seconds():.2f} seconds ({when2blurb/(now - tlast).total_seconds():.2f} Hz). ' )
0217             print( f'                   time since the start:       \t {(now - tstart).total_seconds():.2f} seconds (cum. {f/(now - tstart).total_seconds():.2f} Hz). ' )
0218             tlast = now
0219         file,info=file_and_info
0220         dsttype,run,seg,lfn,nevents,first,last,md5=info
0221 
0222         # Check if we recognize the file name
0223         leaf=None
0224         for leaf_type in leaf_types:
0225             if lfn.startswith(leaf_type):
0226                 leaf=leaf_type
0227                 break
0228         if leaf is None:
0229             # DEBUG(f"Unknown file name: {lfn}")
0230             # continue
0231             ERROR(f"Unknown file name: {lfn}")
0232             exit(-1)
0233 
0234         ### Fill in templates
0235         rungroup= rule.job_config.rungroup_tmpl.format(a=100*math.floor(run/100), b=100*math.ceil((run+1)/100))
0236         finaldir = finaldir_tmpl.format( leafdir=leaf, rungroup=rungroup )
0237 
0238         ### Extract what else we need for file databases
0239         ### For additional db info. Note: stat is costly. Could be omitted.
0240         filestat=Path(file).stat()
0241         # filestat=None
0242 
0243         ###### Here be dragons
0244         full_file_path = f'{finaldir}/{lfn}'
0245         ### Move
0246         if args.dryrun:
0247             if f%when2blurb == 0:
0248                 print( f"Dryrun: Pretending to do:\n mv {file} {full_file_path}" )
0249         else:   
0250             # Create destination dir if it doesn't exit. Can't be done elsewhere/earlier, we need the full relevant runnumber range
0251             Path(finaldir).mkdir( parents=True, exist_ok=True )
0252             # Move the file
0253             try:
0254                 shutil.move( file, full_file_path )
0255             except Exception as e:
0256                 WARN(e)
0257                 
0258         ### ... and upsert catalog tables
0259         upsert_filecatalog(lfn=lfn,
0260                            info=info,
0261                            full_file_path = full_file_path,
0262                            filestat=filestat,
0263                            dataset=rule.outdataset,
0264                            tag=rule.outtriplet,
0265                            dryrun=args.dryrun
0266                            )
0267         pass # End of DST loop 
0268 
0269     ################################  Same thing for histogram files.
0270     # Very similar, use one function for both types.
0271     # Main difference is that it's easier to identify daqhost/leaf from the path
0272     # TODO: Dirty hardcoding assuming knowledge of histdir naming scheme
0273     find = shutil.which('find') # on gpfs, no need for lfs find, use the more powerful generic find
0274     histdir=filesystem['histdir']
0275     INFO(f"Histogram directory template: {histdir}")
0276     
0277     # # All leafs:
0278     leafparent=histdir.split('/{leafdir}')[0]
0279     INFO(f"Leaf directories: \n{leafparent}")
0280 
0281     leafdirs = shell_command(f"{find} {leafparent} -type d -mindepth 1 -a -maxdepth 1")
0282     CHATTY(f"Leaf directories: \n{leafdirs}")
0283     
0284     allhistdirs = []
0285     for leafdir in leafdirs :
0286         allhistdirs += shell_command(f"{find} {leafdir} -name hist -type d")
0287     CHATTY(f"hist directories: \n{allhistdirs}")
0288 
0289     ### Finally, run over all HIST files in those directories
0290     # They too have dbinfo and need to be registered and renamed
0291     foundhists=[]
0292     for hdir in allhistdirs:        
0293         tmpfound = shell_command(f"{find} {hdir} -type f -name HIST\*")
0294         # Remove files that already end in ".root" files
0295         foundhists += [ file for file in tmpfound if not file.endswith(".root") ]
0296 
0297     tstart = datetime.now()
0298     tlast = tstart
0299     when2blurb=2000
0300     fmax=len(foundhists)
0301     for f, file in enumerate(foundhists):
0302         if f%when2blurb == 0:
0303             now = datetime.now()
0304             print( f'HIST #{f}/{fmax}, time since previous output:\t {(now - tlast).total_seconds():.2f} seconds ({when2blurb/(now - tlast).total_seconds():.2f} Hz). ' )
0305             print( f'                  time since the start      :\t {(now - tstart).total_seconds():.2f} seconds (cum. {f/(now - tstart).total_seconds():.2f} Hz). ' )
0306             tlast = now            
0307         try:
0308             lfn,nevents,first,last,md5,size,ctime,dbid = parse_spiderstuff(file)
0309         except Exception as e:
0310             WARN(f"Error: {e}")
0311             continue
0312 
0313         fullpath=str(Path(file).parent)+'/'+lfn
0314         dsttype,run,seg,_=parse_lfn(lfn,rule)
0315         
0316         if binary_contains_bisect(rule.runlist_int,run):
0317             if dbid <= 0:
0318                 ERROR("dbid is {dbid}. Can happen for legacy files, but it shouldn't currently.")
0319                 exit(0)
0320             info=filedb_info(dsttype,run,seg,fullpath,nevents,first,last,md5)
0321         else:
0322             continue
0323 
0324         # if dbid not in finished:
0325         #     CHATTY(f"{dbid} isn't done yet")
0326         #     continue
0327 
0328         ### Extract what else we need for file databases
0329         ### For additional db info. Note: stat is costly. Could be omitted.
0330         filestat=Path(file).stat()
0331         full_file_path = fullpath
0332 
0333         ### Move
0334         if args.dryrun:
0335             if f%when2blurb == 0:
0336                 print( f"Dryrun: Pretending to do:\n mv {file} {full_file_path}" )
0337         else:   
0338             # Move (rename) the file
0339             try:
0340                 shutil.move( file, full_file_path )
0341             except Exception as e:
0342                 WARN(e)
0343 
0344         ### ... and upsert catalog tables
0345         upsert_filecatalog(lfn=lfn,
0346                            info=info,
0347                            full_file_path = full_file_path,
0348                            filestat=filestat,
0349                            dataset=rule.outdataset,
0350                            tag=rule.outtriplet,                           
0351                            dryrun=args.dryrun
0352                            )
0353         pass # End of HIST loop 
0354 
0355     # ### finally, update prod db and remove the .finished signal files
0356     # for dbid,file in finished.items():
0357     #     CHATTY(f"Handling dbid={dbid}.")        
0358     #     update_proddb( dbid=dbid, filestat=Path(file).stat(), dryrun=args.dryrun )
0359     #     if not args.dryrun:
0360     #         Path(file).unlink()
0361                 
0362 # ============================================================================================
0363 
0364 if __name__ == '__main__':
0365     ERROR("This script is deprecated and not functional. Use dstspider and histspider instead.")
0366     exit(1)
0367     main()
0368     exit(0)
0369 
0370     cProfile.run('main()', '/tmp/sphenixprod.prof')
0371     import pstats
0372     p = pstats.Stats('/tmp/sphenixprod.prof')
0373     p.strip_dirs().sort_stats('time').print_stats(10)
0374 
0375     # Sort the output by the following options:
0376     # calls: Sort by the number of calls made.
0377     # cumulative: Sort by the cumulative time spent in the function and its callees.
0378     # filename: Sort by file name.
0379     # nfl: Sort by name/file/line.
0380     # pcalls: Sort by the number of primitive calls.
0381     # stdname: Sort by standard name (default).
0382     # time: Sort by the total time spent in the function itself.