Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-25 08:29:08

0001 #!/usr/bin/env python
0002 
0003 from pathlib import Path
0004 from datetime import datetime
0005 import yaml
0006 import cProfile
0007 import pstats
0008 import subprocess
0009 import sys
0010 import shutil
0011 import os
0012 from typing import List
0013 
0014 # from dataclasses import fields
0015 import pprint # noqa F401
0016 
0017 from argparsing import submission_args
0018 from sphenixmisc import setup_rot_handler, should_I_quit
0019 from simpleLogger import slogger, CustomFormatter, CHATTY, DEBUG, INFO, WARN, ERROR, CRITICAL  # noqa: F401
0020 from sphenixprodrules import RuleConfig
0021 from sphenixmatching import parse_lfn, parse_spiderstuff
0022 from sphenixdbutils import test_mode as dbutils_test_mode
0023 from sphenixdbutils import long_filedb_info, filedb_info, full_db_info, upsert_filecatalog, update_proddb  # noqa: F401
0024 from sphenixmisc import binary_contains_bisect
0025 
0026 # ============================================================================================
def shell_command(command: str) -> List[str]:
    """Minimal wrapper to hide away subprocess tedium.

    Runs `command` through the shell and returns its stdout decoded as
    UTF-8 and split on whitespace. On a non-zero exit status a warning
    is logged and an empty list is returned.

    Args:
        command: Shell command line to execute (passed with shell=True,
            so it may contain globs/redirection; callers must not feed
            it untrusted input).

    Returns:
        List of whitespace-separated tokens from stdout; empty on failure.
    """
    CHATTY(f"[shell_command] Command: {command}")
    ret: List[str] = []
    try:
        ret = subprocess.run(command, shell=True, check=True, capture_output=True).stdout.decode('utf-8').split()
    except subprocess.CalledProcessError as e:
        # Use a single f-string message: the simpleLogger functions are
        # called message-style everywhere else in this file; the original
        # two-argument call would not interpolate the return code.
        WARN(f"[shell_command] Command failed with exit code: {e.returncode}")

    CHATTY(f"[shell_command] Found {len(ret)} matches.")
    return ret
0040 
0041 # ============================================================================================
0042 
def main():
    """Spider over produced histogram (HIST/CALIB) files, register them in
    the file catalog, and rename them to their final logical file names.

    Workflow:
      1. Parse CLI args, set up rotating-log handler; quit quietly if another
         instance of this script is already running.
      2. Build a RuleConfig from the given yaml file plus CLI overrides.
      3. Walk the rule's histogram directory tree with `find` to collect
         candidate files whose names still carry spider metadata (i.e. do
         not yet end in ".root").
      4. Parse spider metadata and the lfn from each filename; keep only
         files whose run is in the rule's run list.
      5. Upsert each surviving file into the file catalog, then move it to
         its final path.
    """
    ### digest arguments
    args = submission_args()

    #################### Test mode?
    test_mode = (
            dbutils_test_mode
            or args.test_mode
            # or ( hasattr(rule, 'test_mode') and rule.test_mode ) ## allow in the yaml file?
        )

    # Set up submission logging before going any further
    sublogdir=setup_rot_handler(args)
    slogger.setLevel(args.loglevel)

    # Exit without fuss if we are already running
    if should_I_quit(args=args, myname=sys.argv[0]):
        DEBUG("Stop.")
        exit(0)

    INFO(f"Logging to {sublogdir}, level {args.loglevel}")

    if args.profile:
        DEBUG( "Profiling is ENABLED.")
        profiler = cProfile.Profile()
        profiler.enable()

    if test_mode:
        INFO("Running in testbed mode.")
        args.mangle_dirpath = 'production-testbed'
    else:
        INFO("Running in production mode.")

    #################### Rule has steering parameters and two subclasses for input and job specifics
    # Rule is instantiated via the yaml reader.

    ### Parse command line arguments into a substitution dictionary
    # This dictionary is passed to the ctor to override/customize yaml file parameters
    # Note: The following could all be hidden away in the RuleConfig ctor
    # but this way, CLI arguments are used by the function that received them and
    # constraint constructions are visibly handled away from the RuleConfig class
    param_overrides = {}
    param_overrides["runs"]=args.runs
    param_overrides["runlist"]=args.runlist
    param_overrides["nevents"] = 0 # Not relevant, but needed for the RuleConfig ctor

    # Rest of the input substitutions
    if args.physicsmode is not None:
        param_overrides["physicsmode"] = args.physicsmode # e.g. physics

    # filesystem is the base for all output, allow for mangling here
    # "production" (in the default filesystem) is replaced
    param_overrides["prodmode"] = "production"
    if args.mangle_dirpath:
        param_overrides["prodmode"] = args.mangle_dirpath

    CHATTY(f"Rule substitutions: {param_overrides}")
    INFO("Now loading and building rule configuration.")

    #################### Load specific rule from the given yaml file.
    try:
        rule =  RuleConfig.from_yaml_file( yaml_file=args.config, rule_name=args.rulename, param_overrides=param_overrides )
        INFO(f"Successfully loaded rule configuration: {args.rulename}")
    except (ValueError, FileNotFoundError) as e:
        ERROR(f"Error: {e}")
        exit(1)

    CHATTY("Rule configuration:")
    CHATTY(yaml.dump(rule.dict))

    filesystem = rule.job_config.filesystem
    DEBUG(f"Filesystem: {filesystem}")

    ################################  Move histogram files.
    # Very similar to dstspider, use one function for both types.
    # Main difference is that it's easier to identify daqhost/leaf from the path
    # TODO: Dirty hardcoding assuming knowledge of histdir naming scheme
    find = shutil.which('find') # on gpfs, no need for lfs find, use the more powerful generic find
    histdir=filesystem['histdir']
    INFO(f"Histogram directory template: {histdir}")

    # All leafs: strip the template at '/{leafdir}' to get the common parent,
    # then pick up its immediate subdirectories matching the rule's dsttype.
    leafparent=histdir.split('/{leafdir}')[0]
    leafdirs = shell_command(rf"{find} {leafparent} -type d -name {rule.dsttype}\* -mindepth 1 -a -maxdepth 1")
    DEBUG(f"Leaf directories: \n{pprint.pformat(leafdirs)}")
    allhistdirs = []
    for leafdir in leafdirs :
        allhistdirs += shell_command(f"{find} {leafdir} -name hist -type d")
    CHATTY(f"hist directories: \n{allhistdirs}")

    ### Finally, run over all HIST files in those directories
    # They too have dbinfo and need to be registered and renamed
    foundhists=[]
    for hdir in allhistdirs:
        # NOTE(review): without parentheses, "-type f" binds only to the first
        # -name term; CALIB matches are not filtered by -type f — confirm intended.
        tmpfound = shell_command(rf"{find} {hdir} -type f -name HIST\*root:\* -o -name CALIB\*")

        # Remove files that already end in ".root" - they're already registered
        foundhists += [ file for file in tmpfound if not file.endswith(".root") ]

    # Final cuts
    INFO(f"Found a total of {len(foundhists)} histograms to register. Checking against run constraint")
    act_on_hists=[]
    for loopfile in foundhists:
        try:
            # Spider metadata is encoded in the filename after the ':' separator
            lfn,nevents,first,last,md5,size,ctime,dbid = parse_spiderstuff(loopfile)
        except Exception as e:
            WARN(f"Error: {e}")
            continue
        try:
            dsttype,run,seg,_=parse_lfn(lfn,rule)
        except Exception as e:
            # parse_lfn signals an unsalvageable name with the "killkillkill" sentinel
            if e.args[0]=="killkillkill":
                WARN(f"{lfn} does not contain run and segment information. Delete.")
                ## DOUBLE-check to not delete already registered files.
                if not loopfile.endswith(".root"):
                    #Path(loopfile).unlink()
                    print(loopfile)
                    continue
                else:
                    WARN(f"{loopfile} looks like one we shouldn't have caught here anyway. Keep.")
            WARN(f"Error parsing lfn {lfn}: {e}. Skipped.")
            continue

        # Final destination: same directory, but named by the bare lfn
        fullpath=str(Path(loopfile).parent)+'/'+lfn
        if binary_contains_bisect(rule.runlist_int,run):
            if dbid <= 0:
                # Fixed: f-string was missing, so the literal "{dbid}" was printed.
                ERROR(f"dbid is {dbid}. Can happen for legacy files, but it shouldn't currently.")
                # NOTE(review): exits 0 despite ERROR — confirm intended exit code.
                exit(0)
            info=filedb_info(dsttype,run,seg,fullpath,nevents,first,last,md5,size,ctime)
        else:
            continue

        ### Extract what else we need for file databases
        full_file_path = fullpath

        fullinfo=full_db_info(
                origfile=loopfile,
                info=info,
                lfn=lfn,
                full_file_path=full_file_path,
                dataset=rule.dataset,
                tag=rule.outtriplet,
                )
        act_on_hists.append((full_file_path,fullinfo))

    fmax=len(act_on_hists)
    INFO(f"Found {fmax} in the specified run range")

    ###### Here be dragons
    tstart = datetime.now()
    tlast = tstart
    when2blurb=2000  # progress heartbeat: print rates every this many files
    for f, (full_file_path,fullinfo) in enumerate(act_on_hists):
        if f%when2blurb == 0:
            now = datetime.now()
            print( f'HIST #{f}/{fmax}, time since previous output:\t {(now - tlast).total_seconds():.2f} seconds ({when2blurb/(now - tlast).total_seconds():.2f} Hz). ' )
            print( f'                  time since the start      :\t {(now - tstart).total_seconds():.2f} seconds (cum. {f/(now - tstart).total_seconds():.2f} Hz). ' )
            tlast = now

        origfile=fullinfo.origfile
        ### Register first, then move.
        upsert_filecatalog(fullinfos=fullinfo,
                           dryrun=args.dryrun # only prints the query if True
                           )
        # NOTE(review): only the DB upsert honors --dryrun; the rename below
        # still happens in dryrun mode — confirm this is intended.
        if args.dryrun:
            if not Path(origfile).is_file():
                ERROR(f"Can't see {origfile}")
                exit(1)
        try:
            os.rename( origfile, full_file_path )
        except Exception as e:
            print(f" {origfile}\n{full_file_path}" )
            ERROR(e)
            exit(1)

    if args.profile:
        profiler.disable()
        DEBUG("Profiling finished. Printing stats...")
        stats = pstats.Stats(profiler)
        stats.strip_dirs().sort_stats('time').print_stats(10)

    INFO(f"{Path(sys.argv[0]).name} DONE.")
0225 
0226 # ============================================================================================
0227 
# Script entry point: run the spider and exit with status 0 on normal completion.
# (Error paths inside main() call exit() themselves with their own codes.)
if __name__ == '__main__':
    main()
    exit(0)