File indexing completed on 2026-04-25 08:29:08
0001
0002
0003 from pathlib import Path
0004 from datetime import datetime
0005 import yaml
0006 import cProfile
0007 import pstats
0008 import subprocess
0009 import sys
0010 import shutil
0011 import os
0012 from typing import List
0013
0014
0015 import pprint
0016
0017 from argparsing import submission_args
0018 from sphenixmisc import setup_rot_handler, should_I_quit
0019 from simpleLogger import slogger, CustomFormatter, CHATTY, DEBUG, INFO, WARN, ERROR, CRITICAL
0020 from sphenixprodrules import RuleConfig
0021 from sphenixmatching import parse_lfn, parse_spiderstuff
0022 from sphenixdbutils import test_mode as dbutils_test_mode
0023 from sphenixdbutils import long_filedb_info, filedb_info, full_db_info, upsert_filecatalog, update_proddb
0024 from sphenixmisc import binary_contains_bisect
0025
0026
def shell_command(command: str) -> List[str]:
    """Run ``command`` through the shell and return its stdout as tokens.

    The captured standard output is decoded as UTF-8 and split on
    whitespace, so every whitespace-separated token becomes one entry.

    Args:
        command: Complete shell command line. It is executed with
            ``shell=True`` so that callers can rely on shell quoting and
            escaping (e.g. ``\\*``); it must therefore never be built from
            untrusted input.

    Returns:
        The list of tokens printed by the command, or an empty list when
        the command exits with a non-zero status (the failure is logged
        as a warning and not raised).
    """
    CHATTY(f"[shell_command] Command: {command}")
    ret = []
    try:
        completed = subprocess.run(command, shell=True, check=True, capture_output=True)
    except subprocess.CalledProcessError as e:
        # Build the full message up front: passing the return code as an
        # extra positional argument leaves it without a format placeholder.
        WARN(f"[shell_command] Command failed with exit code: {e.returncode}")
    else:
        ret = completed.stdout.decode('utf-8').split()

    CHATTY(f"[shell_command] Found {len(ret)} matches.")
    return ret
0040
0041
0042
def _rate(count: float, seconds: float) -> float:
    """Return ``count / seconds`` in Hz, or 0.0 when no time has elapsed."""
    return count / seconds if seconds > 0 else 0.0


def main():
    """Locate histogram files on disk, register them and move them in place.

    Steps, in order:
      1. Parse the command line, set up logging and optional profiling.
      2. Build the rule configuration from the YAML file with overrides
         taken from the command line.
      3. Use ``find`` to collect ``hist`` directories below the histogram
         directory template and the HIST/CALIB files inside them.
      4. Parse each found file name, keep those whose run is contained in
         ``rule.runlist_int`` and assemble their database information.
      5. Pass each kept file to ``upsert_filecatalog`` and, unless this is
         a dry run, rename it to its final path.

    The process exits with status 1 when the rule cannot be loaded, when a
    file to rename is missing, or when the rename fails.
    """
    # NOTE(review): the nesting below was reconstructed from a listing whose
    # indentation was lost; confirm it against the checked-in file.
    args = submission_args()

    # Testbed mode can be requested by the db utilities or on the command line.
    test_mode = dbutils_test_mode or args.test_mode

    sublogdir = setup_rot_handler(args)
    slogger.setLevel(args.loglevel)

    if should_I_quit(args=args, myname=sys.argv[0]):
        DEBUG("Stop.")
        sys.exit(0)

    INFO(f"Logging to {sublogdir}, level {args.loglevel}")

    if args.profile:
        DEBUG( "Profiling is ENABLED.")
        profiler = cProfile.Profile()
        profiler.enable()

    if test_mode:
        INFO("Running in testbed mode.")
        args.mangle_dirpath = 'production-testbed'
    else:
        INFO("Running in production mode.")

    # Values that override what the rule's YAML file specifies.
    param_overrides = {}
    param_overrides["runs"] = args.runs
    param_overrides["runlist"] = args.runlist
    param_overrides["nevents"] = 0

    if args.physicsmode is not None:
        param_overrides["physicsmode"] = args.physicsmode

    param_overrides["prodmode"] = "production"
    if args.mangle_dirpath:
        param_overrides["prodmode"] = args.mangle_dirpath

    CHATTY(f"Rule substitutions: {param_overrides}")
    INFO("Now loading and building rule configuration.")

    try:
        rule = RuleConfig.from_yaml_file( yaml_file=args.config, rule_name=args.rulename, param_overrides=param_overrides )
        INFO(f"Successfully loaded rule configuration: {args.rulename}")
    except (ValueError, FileNotFoundError) as e:
        ERROR(f"Error: {e}")
        sys.exit(1)

    CHATTY("Rule configuration:")
    CHATTY(yaml.dump(rule.dict))

    filesystem = rule.job_config.filesystem
    DEBUG(f"Filesystem: {filesystem}")

    find = shutil.which('find')
    histdir = filesystem['histdir']
    INFO(f"Histogram directory template: {histdir}")

    # Everything before the '{leafdir}' placeholder is the common parent of
    # the per-dataset leaf directories.
    leafparent = histdir.split('/{leafdir}')[0]
    leafdirs = shell_command(rf"{find} {leafparent} -type d -name {rule.dsttype}\* -mindepth 1 -a -maxdepth 1")
    DEBUG(f"Leaf directories: \n{pprint.pformat(leafdirs)}")
    allhistdirs = []
    for leafdir in leafdirs:
        allhistdirs += shell_command(f"{find} {leafdir} -name hist -type d")
    CHATTY(f"hist directories: \n{allhistdirs}")

    foundhists = []
    for hdir in allhistdirs:
        tmpfound = shell_command(rf"{find} {hdir} -type f -name HIST\*root:\* -o -name CALIB\*")
        # Names that end in plain ".root" are not picked up here.
        foundhists += [found for found in tmpfound if not found.endswith(".root")]

    INFO(f"Found a total of {len(foundhists)} histograms to register. Checking against run constraint")
    act_on_hists = []
    for loopfile in foundhists:
        try:
            lfn,nevents,first,last,md5,size,ctime,dbid = parse_spiderstuff(loopfile)
        except Exception as e:
            WARN(f"Error: {e}")
            continue
        try:
            dsttype,run,seg,_ = parse_lfn(lfn,rule)
        except Exception as e:
            # Guard against exceptions raised without arguments before
            # looking at the first one.
            if e.args and e.args[0] == "killkillkill":
                WARN(f"{lfn} does not contain run and segment information. Delete.")
                if not loopfile.endswith(".root"):
                    # Only the name is printed; nothing is removed here.
                    print(loopfile)
                    continue
                else:
                    WARN(f"{loopfile} looks like one we shouldn't have caught here anyway. Keep.")
            WARN(f"Error parsing lfn {lfn}: {e}. Skipped.")
            continue

        fullpath = str(Path(loopfile).parent)+'/'+lfn
        # Files whose run is not in the rule's run list are ignored.
        if not binary_contains_bisect(rule.runlist_int,run):
            continue
        if dbid <= 0:
            ERROR(f"dbid is {dbid}. Can happen for legacy files, but it shouldn't currently.")
            # NOTE(review): exits with status 0 although an error was
            # logged; kept as in the original - confirm this is intended.
            sys.exit(0)
        info = filedb_info(dsttype,run,seg,fullpath,nevents,first,last,md5,size,ctime)

        full_file_path = fullpath

        fullinfo = full_db_info(
            origfile=loopfile,
            info=info,
            lfn=lfn,
            full_file_path=full_file_path,
            dataset=rule.dataset,
            tag=rule.outtriplet,
        )
        act_on_hists.append((full_file_path,fullinfo))

    fmax = len(act_on_hists)
    INFO(f"Found {fmax} in the specified run range")

    tstart = datetime.now()
    tlast = tstart
    when2blurb = 2000  # print a progress report every this many files
    for f, (full_file_path,fullinfo) in enumerate(act_on_hists):
        if f%when2blurb == 0:
            now = datetime.now()
            since_last = (now - tlast).total_seconds()
            since_start = (now - tstart).total_seconds()
            # The very first report can have zero elapsed time, so the
            # rates are computed with a guard against division by zero.
            print( f'HIST #{f}/{fmax}, time since previous output:\t {since_last:.2f} seconds ({_rate(when2blurb, since_last):.2f} Hz). ' )
            print( f' time since the start :\t {since_start:.2f} seconds (cum. {_rate(f, since_start):.2f} Hz). ' )
            tlast = now

        origfile = fullinfo.origfile

        upsert_filecatalog(fullinfos=fullinfo,
                           dryrun=args.dryrun
                           )
        # The file is moved only in a real run; a dry run must leave the
        # filesystem untouched.
        if not args.dryrun:
            if not Path(origfile).is_file():
                ERROR(f"Can't see {origfile}")
                sys.exit(1)
            try:
                os.rename( origfile, full_file_path )
            except Exception as e:
                print(f" {origfile}\n{full_file_path}" )
                ERROR(e)
                sys.exit(1)

    if args.profile:
        profiler.disable()
        DEBUG("Profiling finished. Printing stats...")
        stats = pstats.Stats(profiler)
        stats.strip_dirs().sort_stats('time').print_stats(10)

    INFO(f"{Path(sys.argv[0]).name} DONE.")
0225
0226
0227
if __name__ == '__main__':
    # Run the spider when executed as a script and report success to the
    # caller. sys.exit is used instead of the builtin exit(), which is a
    # helper installed by the site module for interactive sessions.
    main()
    sys.exit(0)