Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-25 08:29:08

0001 #!/bin/env python
0002 
0003 from pathlib import Path
0004 from datetime import datetime
0005 import yaml
0006 import cProfile
0007 import pstats
0008 import sys
0009 import shutil
0010 import os
0011 
0012 # from dataclasses import fields
0013 import pprint # noqa F401
0014 
0015 from argparsing import submission_args
0016 from sphenixmisc import setup_rot_handler, should_I_quit, make_chunks
0017 from simpleLogger import slogger, CustomFormatter, CHATTY, DEBUG, INFO, WARN, ERROR, CRITICAL  # noqa: F401
0018 from sphenixprodrules import RuleConfig
0019 from sphenixmatching import MatchConfig, parse_lfn, parse_spiderstuff
0020 from sphenixdbutils import test_mode as dbutils_test_mode
0021 from sphenixdbutils import long_filedb_info, filedb_info, full_db_info, upsert_filecatalog, update_proddb  # noqa: F401
0022 from sphenixmisc import binary_contains_bisect,shell_command,lock_file,unlock_file
0023 
0024 
0025 # ============================================================================================
def main():
    """Run one dstspider pass: select a batch of DST files, register and rename them.

    The steps, in order:

    1. Parse the command line, set up logging and return early (exit 0) when
       ``should_I_quit`` says so.
    2. Build the ``RuleConfig`` and ``MatchConfig`` for ``args.rulename`` from
       ``args.config``, with CLI-derived parameter overrides.
    3. While holding the lock on the per-rule list file: reuse the list file or
       create it via ``match_config.get_output_files``, take up to
       ``nfiles_to_process`` lines from its head and write the remainder back
       (unless ``args.dryrun``).
    4. Keep the entries whose run number is contained in ``rule.runlist_int``.
    5. In chunks of ``chunksize``: upsert the chunk into the file catalog
       first, then ``os.rename`` each file to its final name (skipped when
       ``args.dryrun``).

    The function terminates the process through ``sys.exit`` on several paths:
    status 0 for "nothing to do" situations (and, as in the original code, for a
    non-positive dbid), status 1 when the rule configuration cannot be loaded.
    """
    ### digest arguments
    args = submission_args()

    #################### Test mode?
    # TODO: possibly also allow a `test_mode` switch in the rule's yaml file.
    test_mode = dbutils_test_mode or args.test_mode

    # Set up submission logging before going any further
    sublogdir = setup_rot_handler(args)
    slogger.setLevel(args.loglevel)

    # Exit without fuss if we are already running
    if should_I_quit(args=args, myname=sys.argv[0]):
        DEBUG("Stop.")
        sys.exit(0)
    INFO(f"Logging to {sublogdir}, level {args.loglevel}")

    if args.profile:
        DEBUG( "Profiling is ENABLED.")
        profiler = cProfile.Profile()
        profiler.enable()

    INFO("Starting dstspider.")
    INFO(sys.argv)

    if test_mode:
        INFO("Running in testbed mode.")
        args.mangle_dirpath = 'production-testbed'
    else:
        INFO("Running in production mode.")

    #################### Rule has steering parameters and two subclasses for input and job specifics
    # Rule is instantiated via the yaml reader.

    ### Parse command line arguments into a substitution dictionary
    # This dictionary is passed to the ctor to override/customize yaml file parameters.
    # Note: this could all be hidden away in the RuleConfig ctor, but this way
    # CLI arguments are used by the function that received them and constraint
    # constructions are visibly handled away from the RuleConfig class.
    param_overrides = {
        "runs": args.runs,
        "runlist": args.runlist,
        "nevents": 0,  # Not relevant, but needed for the RuleConfig ctor
    }

    # Rest of the input substitutions
    if args.physicsmode is not None:
        param_overrides["physicsmode"] = args.physicsmode  # e.g. physics

    # filesystem is the base for all output, allow for mangling here:
    # "production" (in the default filesystem) is replaced
    param_overrides["prodmode"] = args.mangle_dirpath if args.mangle_dirpath else "production"

    CHATTY(f"Rule substitutions: {param_overrides}")
    INFO("Now loading and building rule configuration.")

    #################### Load specific rule from the given yaml file.
    try:
        rule = RuleConfig.from_yaml_file( yaml_file=args.config, rule_name=args.rulename, param_overrides=param_overrides )
        INFO(f"Successfully loaded rule configuration: {args.rulename}")
    except (ValueError, FileNotFoundError) as e:
        ERROR(f"Error: {e}")
        sys.exit(1)

    CHATTY("Rule configuration:")
    CHATTY(yaml.dump(rule.dict))
    filesystem = rule.job_config.filesystem

    # Create a match configuration from the rule
    match_config = MatchConfig.from_rule_config(rule)
    CHATTY("Match configuration:")
    CHATTY(yaml.dump(match_config.dict))

    ### Use or create a list file containing all the existing files to work on.
    ### This reduces memory footprint and repeated slow `find` commands for large amounts of files
    # Keep only the part of histdir before the first "{" placeholder, without trailing slashes.
    listdir = filesystem['histdir'].split("{")[0].rstrip("/")
    dstlistname = f"{listdir}/{args.rulename}_dstlist"
    tmpname = f"{dstlistname}.tmp"

    ### Grab the first N files and work on those.
    nfiles_to_process = 500000
    exhausted = False
    dstfiles = []

    # First, lock. This way multiple spiders can work on a file without stepping on each others' toes
    if not lock_file(dstlistname, args.dryrun):
        sys.exit(0)

    # try/finally guarantees the lock is released on every way out of this
    # section, including the sys.exit() below and any unexpected exception.
    try:
        INFO(f"Looking for existing filelist {dstlistname}")
        if Path(dstlistname).exists():
            INFO( " ... found.")
        else:
            INFO(" ... not found. Creating a new one.")
            Path(dstlistname).parent.mkdir( parents=True, exist_ok=True )
            match_config.get_output_files(r"\*root:\*", dstlistname, args.dryrun)

        if not Path(dstlistname).is_file():
            INFO("List file not found.")
            sys.exit(0)

        wccommand = f"wc -l {dstlistname}"
        ret = shell_command(wccommand)
        INFO(f"List contains {ret[0]} files.")

        with open(dstlistname, "r") as infile, open(tmpname, "w") as smallerdstlist:
            for _ in range(nfiles_to_process):
                line = infile.readline()
                if not line:
                    # Fewer than nfiles_to_process entries were left in the list.
                    exhausted = True
                    break
                dstfiles.append(line.strip())
            # Whatever was not consumed goes to the new, smaller list.
            smallerdstlist.writelines(infile)

        if not args.dryrun:
            shutil.move(tmpname, dstlistname)
            if exhausted:  # Used up the existing list.
                INFO("Used up all previously found dst files. Next call will create a new list")
                Path(dstlistname).unlink(missing_ok=True)
        else:
            # Dry run: leave the original list untouched and discard the remainder file.
            Path(tmpname).unlink(missing_ok=True)
    finally:
        # Done with selecting or creating our chunk, release the lock
        unlock_file(dstlistname, args.dryrun)

    ### Collect root files that satisfy run and dbid requirements
    mvfiles_info = []
    for file in dstfiles:
        lfn = Path(file).name
        dsttype, run, seg, _ = parse_lfn(lfn, rule)
        if not binary_contains_bisect(rule.runlist_int, run):  # Safety net to move only specified runs
            continue
        fullpath, nevents, first, last, md5, size, ctime, dbid = parse_spiderstuff(file)
        if dbid <= 0:
            ERROR(f"dbid is {dbid}. Can happen for legacy files, but it shouldn't currently.")
            # NOTE(review): exits with status 0 although an error was logged;
            # kept as in the original — confirm that this is intended.
            sys.exit(0)
        info = filedb_info(dsttype, run, seg, fullpath, nevents, first, last, md5, size, ctime)
        mvfiles_info.append( (file, info) )

    INFO(f"{len(mvfiles_info)} total root files to be processed.")

    ####################################### Start moving and registering DSTs
    tstart = datetime.now()
    tlast = tstart
    chunksize = 2000
    fmax = len(mvfiles_info)

    chunked_mvfiles = make_chunks(mvfiles_info, chunksize)
    for i, chunk in enumerate(chunked_mvfiles):
        now = datetime.now()
        dt_last = (now - tlast).total_seconds()
        dt_start = (now - tstart).total_seconds()
        # The elapsed time can be exactly 0.0 (notably for the first chunk);
        # report 0 Hz in that case instead of raising ZeroDivisionError.
        rate_last = chunksize / dt_last if dt_last > 0 else 0.0
        rate_cum = i * chunksize / dt_start if dt_start > 0 else 0.0
        print( f'DST #{i*chunksize}/{fmax}, time since previous output:\t {dt_last:.2f} seconds ({rate_last:.2f} Hz). ' )
        print( f'                   time since the start:       \t {dt_start:.2f} seconds (cum. {rate_cum:.2f} Hz). ' )
        tlast = now

        fullinfo_chunk = []
        seen_lfns = set()  # duplicate detection is per chunk only
        for file, info in chunk:
            # Only the fourth field of the info record is needed here; it is the
            # value that was passed as `fullpath` when the record was built above.
            _, _, _, lfn, *_ = info
            full_file_path = f'{Path(file).parent}/{lfn}'

            ## lfn duplication can happen for reproductions where only the db was updated without deleting existing output.
            ## The "best" one isn't always clear, so assume the latest one is better than what's old.
            if lfn in seen_lfns:
                # NOTE(review): this unlinks the destination path, not `file`
                # itself, and `file` is then skipped — confirm that is intended.
                INFO(f"We already have a file with lfn {lfn}. Deleting {full_file_path}.")
                if not args.dryrun:
                    Path(full_file_path).unlink(missing_ok=True)
                continue
            seen_lfns.add(lfn)

            fullinfo_chunk.append(full_db_info(
                origfile=file,
                info=info,
                lfn=lfn,
                full_file_path=full_file_path,
                dataset=rule.dataset,
                tag=rule.outtriplet,
                ))

        ###### Here be dragons
        ### Register first, then move.
        try:
            upsert_filecatalog(fullinfos=fullinfo_chunk,
                               dryrun=args.dryrun  # only prints the query if True
                               )
        except Exception as e:
            ### database errors can happen when there are multiples of a file in the prod db.
            ### Why _that_ happens should be investigated, but here, we can just move on to the next chunk.
            WARN( f"dstspider is ignoring the database exception and moving on: {e}")
            continue

        if args.dryrun:
            continue

        for fullinfo in fullinfo_chunk:
            # Best effort: a failed rename is logged and the remaining files are still processed.
            try:
                os.rename( fullinfo.origfile, fullinfo.full_file_path )
            except Exception as e:
                WARN(e)

    if args.profile:
        profiler.disable()
        DEBUG("Profiling finished. Printing stats...")
        stats = pstats.Stats(profiler)
        stats.strip_dirs().sort_stats('time').print_stats(10)

    INFO(f"{Path(sys.argv[0]).name} DONE.")
0246 
0247 # ============================================================================================
0248 
if __name__ == '__main__':
    main()
    # Use sys.exit rather than the `exit` helper injected by the `site` module,
    # which is meant for interactive sessions and is absent under `python -S`.
    sys.exit(0)