File indexing completed on 2026-04-27 07:41:40
0001
0002
0003 from pathlib import Path
0004 from datetime import datetime
0005 import yaml
0006 import cProfile
0007 import subprocess
0008 import sys
0009 import shutil
0010 import math
0011 from typing import List
0012
0013
0014 import pprint
0015
0016 from argparsing import submission_args
0017 from sphenixmisc import setup_rot_handler, should_I_quit
0018 from simpleLogger import slogger, CustomFormatter, CHATTY, DEBUG, INFO, WARN, ERROR, CRITICAL
0019 from sphenixprodrules import RuleConfig,inputs_from_output
0020 from sphenixprodrules import parse_lfn,parse_spiderstuff
0021 from sphenixdbutils import test_mode as dbutils_test_mode
0022 from sphenixdbutils import filedb_info, upsert_filecatalog, update_proddb
0023 from sphenixmisc import binary_contains_bisect
0024
0025
def shell_command(command: str) -> List[str]:
    """Minimal wrapper to hide away subprocess tedium.

    Runs `command` through the shell and returns its stdout decoded and
    split on whitespace. On a non-zero exit code, logs a warning and
    returns an empty list (best effort -- callers treat 'no output' and
    'failed' the same way).
    """
    DEBUG(f"[shell_command] Command: {command}")
    ret: List[str] = []
    try:
        ret = subprocess.run(command, shell=True, check=True, capture_output=True).stdout.decode('utf-8').split()
    except subprocess.CalledProcessError as e:
        # The logger helpers take a single pre-formatted message; the original
        # passed the return code as a stray second positional argument.
        WARN(f"[shell_command] Command failed with exit code: {e.returncode}")

    DEBUG(f"[shell_command] Found {len(ret)} matches.")
    return ret
0039
0040
0041
def main():
    """Sweep finished DST and histogram files to their final location.

    Workflow:
      1. Parse submission arguments and load the production rule from YAML.
      2. Locate matching DST files in the lake via 'lfs find' (or 'find').
      3. Move each file to its run-group destination directory and upsert
         the file catalog.
      4. Repeat the sweep for HIST files below the histogram directories.

    Exits the process on configuration errors or unexpected file names.
    """

    args = submission_args()

    # Testbed mode if either the db utilities or the command line say so.
    test_mode = (
        dbutils_test_mode
        or args.test_mode
    )

    sublogdir = setup_rot_handler(args)
    slogger.setLevel(args.loglevel)

    # Bail out quietly if a sentinel (e.g. an already running instance) says so.
    if should_I_quit(args=args, myname=sys.argv[0]):
        DEBUG("Stop.")
        exit(0)

    INFO(f"Logging to {sublogdir}, level {args.loglevel}")

    if test_mode:
        INFO("Running in testbed mode.")
        args.mangle_dirpath = 'production-testbed'
    else:
        INFO("Running in production mode.")

    # Assemble the parameter overrides handed to the rule template.
    param_overrides = {}
    param_overrides["runs"] = args.runs
    param_overrides["runlist"] = args.runlist
    param_overrides["nevents"] = 0  # the spider never limits event counts

    if args.physicsmode is not None:
        param_overrides["physicsmode"] = args.physicsmode

    if args.mangle_dstname:
        DEBUG("Mangling DST name")
        param_overrides['DST'] = args.mangle_dstname

    param_overrides["prodmode"] = "production"
    if args.mangle_dirpath:
        param_overrides["prodmode"] = args.mangle_dirpath

    CHATTY(f"Rule substitutions: {param_overrides}")
    INFO("Now loading and building rule configuration.")

    try:
        rule = RuleConfig.from_yaml_file( yaml_file=args.config, rule_name=args.rulename, param_overrides=param_overrides )
        INFO(f"Successfully loaded rule configuration: {args.rulename}")
    except (ValueError, FileNotFoundError) as e:
        ERROR(f"Error: {e}")
        exit(1)

    CHATTY("Rule configuration:")
    CHATTY(yaml.dump(rule.dict))

    outstub = rule.outstub
    INFO(f"Output stub: {outstub}")

    input_stubs = inputs_from_output[rule.dsttype]
    DEBUG(f"Input stub(s): {input_stubs}")
    dataset = rule.dataset
    INFO(f"Dataset identifier: {dataset}")
    # Raw daq inputs fan out over hosts; the leaf name then carries the host.
    leaf_template = f'{rule.dsttype}'
    if 'raw' in rule.input_config.db:
        leaf_template += '_{host}'
    leaf_types = { f'{leaf_template}'.format(host=host) for host in input_stubs.keys() }
    INFO(f"Destination type template: {leaf_template}")
    DEBUG(f"Destination types: {leaf_types}")

    # Prefer 'lfs find' (fast on Lustre); fall back to plain 'find'.
    lfind = shutil.which('lfs')
    if lfind is None:
        WARN("'lfs find' not found.")
        lfind = shutil.which('find')
    else:
        lfind = f'{lfind} find'
    INFO(f'Using "{lfind}".')  # fixed: closing quote was missing in the message

    filesystem = rule.job_config.filesystem
    DEBUG(f"Filesystem: {filesystem}")
    # The backslash keeps '*' from being expanded by the shell; 'find' sees the glob.
    dstbase = f'{rule.rulestem}\*{rule.outstub}_{rule.outdataset}\*'
    INFO(f'DST files filtered as {dstbase}')
    lakelocation = filesystem['outdir']
    INFO(f"Original output directory: {lakelocation}")

    lakefiles = shell_command(f"{lfind} {lakelocation} -maxdepth 1 -type f -name {dstbase}\*.root\*")
    DEBUG(f"Found {len(lakefiles)} matching dsts without cuts in the lake.")

    # Collect (physical file, catalog info) pairs for files whose run is in the run list.
    mvfiles_info = []
    for file in lakefiles:
        pseudolfn = Path(file).name
        dsttype, run, seg, _ = parse_lfn(pseudolfn, rule)
        if binary_contains_bisect(rule.runlist_int, run):
            lfn, nevents, first, last, md5, size, ctime, dbid = parse_spiderstuff(file)
            if dbid <= 0:
                # The f-prefix was missing here, so the literal "{dbid}" was printed.
                ERROR(f"dbid is {dbid}. Can happen for legacy files, but it shouldn't currently.")
                exit(1)  # was exit(0); an error must not signal success
            # 'fullpath' was previously undefined here (NameError on first hit);
            # construct it from the file's directory plus the parsed lfn,
            # exactly as the HIST loop below does.
            fullpath = str(Path(file).parent) + '/' + lfn
            info = filedb_info(dsttype, run, seg, fullpath, nevents, first, last, md5, size, ctime)
            mvfiles_info.append( (file, info) )

    INFO(f"{len(mvfiles_info)} total root files to be processed.")

    finaldir_tmpl = filesystem['finaldir']
    INFO(f"Final destination template: {finaldir_tmpl}")

    input_stem = inputs_from_output[rule.rulestem]
    DEBUG(f"Input stem: {input_stem}")
    outstub = rule.outstub
    INFO(f"Output stub: {outstub}")

    # Recompute leaf types keyed on the rule stem (the lake pass used rule.dsttype).
    leaf_template = f'{rule.rulestem}'
    if 'raw' in rule.input_config.db:
        leaf_template += '_{host}'
    leaf_types = { f'{leaf_template}'.format(host=host) for host in input_stem.keys() }
    INFO(f"Destination type template: {leaf_template}")
    DEBUG(f"Destination types: {leaf_types}")

    # ---------- Move DSTs out of the lake and register them. ----------
    tstart = datetime.now()
    tlast = tstart
    when2blurb = 2000  # progress heartbeat every N files
    fmax = len(mvfiles_info)
    for f, file_and_info in enumerate(mvfiles_info):
        if f % when2blurb == 0:
            now = datetime.now()
            print( f'DST #{f}/{fmax}, time since previous output:\t {(now - tlast).total_seconds():.2f} seconds ({when2blurb/(now - tlast).total_seconds():.2f} Hz). ' )
            print( f' time since the start: \t {(now - tstart).total_seconds():.2f} seconds (cum. {f/(now - tstart).total_seconds():.2f} Hz). ' )
            tlast = now
        file, info = file_and_info
        # 'info' carries ten fields (see filedb_info construction above); the
        # original unpacked only eight names, which raised ValueError, and
        # treated field 4 (a full path) as the lfn.
        dsttype, run, seg, fullpath, nevents, first, last, md5, size, ctime = info
        lfn = Path(fullpath).name

        # Identify the leaf directory this lfn belongs to.
        leaf = None
        for leaf_type in leaf_types:
            if lfn.startswith(leaf_type):
                leaf = leaf_type
                break
        if leaf is None:
            ERROR(f"Unknown file name: {lfn}")
            exit(-1)

        # Run groups are blocks of 100 runs, e.g. run 53164 -> 53100_53200.
        rungroup = rule.job_config.rungroup_tmpl.format(a=100*math.floor(run/100), b=100*math.ceil((run+1)/100))
        finaldir = finaldir_tmpl.format( leafdir=leaf, rungroup=rungroup )

        # Stat before the move; the lake copy is about to disappear.
        filestat = Path(file).stat()

        full_file_path = f'{finaldir}/{lfn}'

        if args.dryrun:
            if f % when2blurb == 0:
                print( f"Dryrun: Pretending to do:\n mv {file} {full_file_path}" )
        else:
            Path(finaldir).mkdir( parents=True, exist_ok=True )

            try:
                shutil.move( file, full_file_path )
            except Exception as e:
                # Best effort: a failed move is logged but must not stop the sweep.
                WARN(e)

        upsert_filecatalog(lfn=lfn,
                           info=info,
                           full_file_path = full_file_path,
                           filestat=filestat,
                           dataset=rule.outdataset,
                           tag=rule.outtriplet,
                           dryrun=args.dryrun
                           )

    # ---------- Now take care of histogram files. ----------
    find = shutil.which('find')
    histdir = filesystem['histdir']
    INFO(f"Histogram directory template: {histdir}")

    # Everything up to the '{leafdir}' placeholder is the common parent directory.
    leafparent = histdir.split('/{leafdir}')[0]
    INFO(f"Leaf directories: \n{leafparent}")

    leafdirs = shell_command(f"{find} {leafparent} -type d -mindepth 1 -a -maxdepth 1")
    CHATTY(f"Leaf directories: \n{leafdirs}")

    allhistdirs = []
    for leafdir in leafdirs :
        allhistdirs += shell_command(f"{find} {leafdir} -name hist -type d")
    CHATTY(f"hist directories: \n{allhistdirs}")

    # Only spider-decorated files (extra fields after '.root') are of interest;
    # a plain '.root' ending means the file was already finalized.
    foundhists = []
    for hdir in allhistdirs:
        tmpfound = shell_command(f"{find} {hdir} -type f -name HIST\*")
        foundhists += [ file for file in tmpfound if not file.endswith(".root") ]

    tstart = datetime.now()
    tlast = tstart
    when2blurb = 2000
    fmax = len(foundhists)
    for f, file in enumerate(foundhists):
        if f % when2blurb == 0:
            now = datetime.now()
            print( f'HIST #{f}/{fmax}, time since previous output:\t {(now - tlast).total_seconds():.2f} seconds ({when2blurb/(now - tlast).total_seconds():.2f} Hz). ' )
            print( f' time since the start :\t {(now - tstart).total_seconds():.2f} seconds (cum. {f/(now - tstart).total_seconds():.2f} Hz). ' )
            tlast = now
        try:
            lfn, nevents, first, last, md5, size, ctime, dbid = parse_spiderstuff(file)
        except Exception as e:
            WARN(f"Error: {e}")
            continue

        # Final histogram location is alongside the original, minus spider decoration.
        fullpath = str(Path(file).parent) + '/' + lfn
        dsttype, run, seg, _ = parse_lfn(lfn, rule)

        if binary_contains_bisect(rule.runlist_int, run):
            if dbid <= 0:
                # f-prefix was missing here as well.
                ERROR(f"dbid is {dbid}. Can happen for legacy files, but it shouldn't currently.")
                exit(1)  # was exit(0); an error must not signal success
            # Pass size and ctime too, consistent with the DST loop above.
            info = filedb_info(dsttype, run, seg, fullpath, nevents, first, last, md5, size, ctime)
        else:
            continue

        # Stat before the move; the decorated copy is about to disappear.
        filestat = Path(file).stat()
        full_file_path = fullpath

        if args.dryrun:
            if f % when2blurb == 0:
                print( f"Dryrun: Pretending to do:\n mv {file} {full_file_path}" )
        else:
            try:
                shutil.move( file, full_file_path )
            except Exception as e:
                # Best effort: a failed move is logged but must not stop the sweep.
                WARN(e)

        upsert_filecatalog(lfn=lfn,
                           info=info,
                           full_file_path = full_file_path,
                           filestat=filestat,
                           dataset=rule.outdataset,
                           tag=rule.outtriplet,
                           dryrun=args.dryrun
                           )
0354
0355
0356
0357
0358
0359
0360
0361
0362
0363
if __name__ == '__main__':
    # Hard-disabled entry point: this combined spider has been superseded.
    ERROR("This script is deprecated and not functional. Use dstspider and histspider instead.")
    exit(1)
    # NOTE(review): everything below is intentionally unreachable; it is kept
    # only as a reference for how the script used to be run and profiled.
    main()
    exit(0)

    # Profiling scaffolding: run main() under cProfile and print the ten
    # most time-consuming functions.
    cProfile.run('main()', '/tmp/sphenixprod.prof')
    import pstats
    p = pstats.Stats('/tmp/sphenixprod.prof')
    p.strip_dirs().sort_stats('time').print_stats(10)
0374
0375
0376
0377
0378
0379
0380
0381
0382