File indexing completed on 2026-04-25 08:29:07
0001
0002
0003 from pathlib import Path
0004 from datetime import datetime
0005 import yaml
0006 import cProfile
0007 import pstats
0008 import subprocess
0009 import sys
0010 import shutil
0011 import os
0012 import math
0013 from typing import List
0014
0015
0016 import pprint
0017
0018 from argparsing import submission_args
0019 from sphenixmisc import setup_rot_handler, should_I_quit, make_chunks
0020 from simpleLogger import slogger, CustomFormatter, CHATTY, DEBUG, INFO, WARN, ERROR, CRITICAL
0021 from sphenixprodrules import RuleConfig,inputs_from_output
0022 from sphenixprodrules import parse_lfn,parse_spiderstuff
0023 from sphenixdbutils import test_mode as dbutils_test_mode
0024 from sphenixdbutils import long_filedb_info, filedb_info, full_db_info, upsert_filecatalog, update_proddb
0025 from sphenixmisc import binary_contains_bisect
0026
0027
def shell_command(command: str) -> List[str]:
    """Minimal wrapper to hide away subprocess tedium.

    Runs *command* through the shell (``shell=True`` — the caller is
    responsible for quoting; input is trusted, internally generated).

    :param command: full shell command line to execute
    :return: whitespace-separated tokens of the command's stdout,
             or an empty list if the command exited non-zero
    """
    DEBUG(f"[shell_command] Command: {command}")
    ret: List[str] = []
    try:
        ret = subprocess.run(command, shell=True, check=True, capture_output=True).stdout.decode('utf-8').split()
    except subprocess.CalledProcessError as e:
        # Fix: pass a single f-string, consistent with every other log call
        # in this file (the original handed WARN two positional arguments).
        WARN(f"[shell_command] Command failed with exit code: {e.returncode}")
    # (dropped the original no-op `finally: pass`)
    DEBUG(f"[shell_command] Return value length is {len(ret)}.")
    return ret
0041
0042
0043
def main():
    """Sweep produced DST files out of the "lake" into their final location.

    Workflow (driven entirely by the command-line args from submission_args()):
      1. Load the production rule from the YAML config, applying overrides.
      2. Build (or reuse) a list of candidate ``.root`` files in the lake,
         protected by a lock file so only one spider works a given rule.
      3. Parse each file name and keep files whose run is in the rule's runlist.
      4. In chunks: upsert the file catalog, then rename files into place.

    Exits the process directly on fatal conditions (bad config, fresh lock
    held by another spider, unknown file type).
    """
    args = submission_args()

    # Testbed mode can be forced by the db utilities or the command line.
    test_mode = (
        dbutils_test_mode
        or args.test_mode
    )

    sublogdir = setup_rot_handler(args)
    slogger.setLevel(args.loglevel)

    # Give an external veto (e.g. another instance already running) a chance to stop us.
    if should_I_quit(args=args, myname=sys.argv[0]):
        DEBUG("Stop.")
        exit(0)
    INFO(f"Logging to {sublogdir}, level {args.loglevel}")

    if args.profile:
        DEBUG("Profiling is ENABLED.")
        profiler = cProfile.Profile()
        profiler.enable()

    if test_mode:
        INFO("Running in testbed mode.")
        args.mangle_dirpath = 'production-testbed'
    else:
        INFO("Running in production mode.")

    # Parameter overrides handed to the rule template.
    param_overrides = {}
    param_overrides["runs"] = args.runs
    param_overrides["runlist"] = args.runlist
    param_overrides["nevents"] = 0

    if args.physicsmode is not None:
        param_overrides["physicsmode"] = args.physicsmode

    param_overrides["prodmode"] = "production"
    if args.mangle_dirpath:
        param_overrides["prodmode"] = args.mangle_dirpath

    CHATTY(f"Rule substitutions: {param_overrides}")
    INFO("Now loading and building rule configuration.")

    try:
        rule = RuleConfig.from_yaml_file( yaml_file=args.config, rule_name=args.rulename, param_overrides=param_overrides )
        INFO(f"Successfully loaded rule configuration: {args.rulename}")
    except (ValueError, FileNotFoundError) as e:
        ERROR(f"Error: {e}")
        exit(1)

    CHATTY("Rule configuration:")
    CHATTY(yaml.dump(rule.dict))

    filesystem = rule.job_config.filesystem
    DEBUG(f"Filesystem: {filesystem}")

    # Prefer Lustre's 'lfs find' when available; fall back to plain 'find'.
    lfind = shutil.which('lfs')
    if lfind is None:
        WARN("'lfs find' not found")
        lfind = shutil.which('find')
    else:
        lfind = f'{lfind} find'
    # Fix: the closing double quote was missing in the original message.
    INFO(f'Using "{lfind}".')

    # Raw f-string: the backslash must reach the shell verbatim so that
    # find, not the shell, expands the '*' glob. (rf'' also silences the
    # invalid-escape SyntaxWarning that '\*' triggers in a plain string.)
    dstbase = rf'{rule.dsttype}\*{rule.dataset}_{rule.outtriplet}\*'

    INFO(f'DST files filtered as {dstbase}')
    lakelocation = filesystem['outdir']
    INFO(f"Original output directory: {lakelocation}")

    # Derive the lake list file name from the log directory, cut at the first
    # template placeholder and stripped of trailing slashes.
    lakelistname = filesystem['logdir']
    lakelistname = lakelistname.split("{")[0]
    while lakelistname.endswith("/"):
        lakelistname = lakelistname[0:-1]
    lakelistname = f"{lakelistname}/{rule.dsttype}_lakelist"
    lakelistlock = lakelistname + ".lock"

    if Path(lakelistlock).exists():
        WARN(f"Lock file {lakelistlock} already exists, indicating another spider is running over the same rule.")
        # Override stale locks (older than 8 hours); otherwise yield to the other spider.
        mod_timestamp = Path(lakelistlock).stat().st_mtime
        mod_datetime = datetime.fromtimestamp(mod_timestamp)
        time_difference = datetime.now() - mod_datetime
        threshold = 8 * 60 * 60  # seconds
        if time_difference.total_seconds() > threshold:
            WARN(f"lock file is already {time_difference.total_seconds()} seconds old. Overriding.")
        else:
            exit(0)
    if not args.dryrun:
        Path(lakelistlock).parent.mkdir(parents=True, exist_ok=True)
        Path(lakelistlock).touch()
    INFO(f"Looking for existing filelist {lakelistname}")
    if not Path(lakelistname).exists():
        INFO(" ... not found. Creating a new one.")
        findcommand = rf"{lfind} {lakelocation} -type f -name {dstbase}\*.root\* > {lakelistname}; wc -l {lakelistname}"
        DEBUG(f"Using:\n{findcommand}")
        ret = shell_command(findcommand)
        INFO(f"Found {ret[0]} matching dsts without cuts in the lake, piped into {ret[1]}")
    else:
        wccommand = f"wc -l {lakelistname}"
        ret = shell_command(wccommand)
        INFO(f" ... found. List contains {ret[0]} files.")

    # Consume at most nfiles_to_process entries off the front of the list and
    # rewrite the remainder, so repeated invocations work through the backlog.
    nfiles_to_process = 500000
    exhausted = False
    lakefiles = []
    tmpname = f"{lakelistname}.tmp"
    # Fix: use the tmpname variable (original re-built the same string inline).
    with open(lakelistname, "r") as infile, open(tmpname, "w") as smallerlakefile:
        for _ in range(nfiles_to_process):
            line = infile.readline()
            if line:
                lakefiles.append(line.strip())
            else:
                exhausted = True
                break
        # Preserve whatever we did not consume for the next call.
        for line in infile:
            smallerlakefile.write(line)
    if not args.dryrun:
        shutil.move(tmpname, lakelistname)
        if exhausted:
            INFO("Used up all previously found lake files. Next call will create a new list")
            Path(lakelistname).unlink(missing_ok=True)
    else:
        Path(tmpname).unlink(missing_ok=True)

    if not args.dryrun:
        Path(lakelistlock).unlink()

    # Keep only files whose run number is selected by the rule's runlist.
    mvfiles_info = []
    for file in lakefiles:
        lfn = Path(file).name
        dsttype, run, seg, _ = parse_lfn(lfn, rule)
        if binary_contains_bisect(rule.runlist_int, run):
            fullpath, nevents, first, last, md5, size, ctime, dbid = parse_spiderstuff(file)
            if dbid <= 0:
                # Fix: original lacked the f-prefix and logged the literal text "{dbid}".
                ERROR(f"dbid is {dbid}. Can happen for legacy files, but it shouldn't currently.")
                exit(0)  # NOTE(review): exits with status 0 on an error path — confirm intended
            info = filedb_info(dsttype, run, seg, fullpath, nevents, first, last, md5, size, ctime)
            mvfiles_info.append( (file, info) )

    INFO(f"{len(mvfiles_info)} total root files to be processed.")

    finaldir_tmpl = filesystem['finaldir']
    INFO(f"Final destination template: {finaldir_tmpl}")

    input_stubs = inputs_from_output[rule.dsttype]
    DEBUG(f"Input stub(s): {input_stubs}")
    dataset = rule.dataset
    INFO(f"Dataset identifier: {dataset}")
    # Raw-daq input fans out to one destination leaf per host; otherwise the
    # destination leaf is just the dsttype.
    leaf_template = f'{rule.dsttype}'
    if 'raw' in rule.input_config.db:
        leaf_template += '_{host}'
        leaf_types = { leaf_template.format(host=host) for host in input_stubs.keys() }
    else:
        leaf_types = [rule.dsttype]
    INFO(f"Destination type template: {leaf_template}")
    DEBUG(f"Destination types: {leaf_types}")

    tstart = datetime.now()
    tlast = tstart
    chunksize = 2000
    fmax = len(mvfiles_info)

    chunked_mvfiles = make_chunks(mvfiles_info, chunksize)
    for i, chunk in enumerate(chunked_mvfiles):
        now = datetime.now()
        # Fix: on the first iteration tlast == tstart, so the elapsed time can
        # be exactly 0.0 and the original rate computation raised ZeroDivisionError.
        dt_last = (now - tlast).total_seconds()
        dt_start = (now - tstart).total_seconds()
        rate_last = chunksize / dt_last if dt_last > 0 else float('inf')
        rate_cum = i * chunksize / dt_start if dt_start > 0 else float('inf')
        print( f'DST #{i*chunksize}/{fmax}, time since previous output:\t {dt_last:.2f} seconds ({rate_last:.2f} Hz). ' )
        print( f'               time since the start: \t {dt_start:.2f} seconds (cum. {rate_cum:.2f} Hz). ' )
        tlast = now

        fullinfo_chunk = []
        seen_lfns = set()  # guard against duplicate lfns within this chunk
        for file_and_info in chunk:
            file, info = file_and_info
            dsttype, run, seg, lfn, nevents, first, last, md5, size, ctime = info

            if lfn in seen_lfns:
                WARN(f"We already have a file with lfn {lfn}. Deleting {file}.")
                Path(file).unlink(missing_ok=True)
                continue
            seen_lfns.add(lfn)

            # Identify which destination leaf this lfn belongs to.
            leaf = None
            for leaf_type in leaf_types:
                if lfn.startswith(leaf_type):
                    leaf = leaf_type
                    break
            if leaf is None:
                ERROR(f"Unknown file type: {lfn}")
                ERROR(f"Full file name: {file}")
                exit(-1)

            # Runs are grouped into blocks of 100 for the directory layout.
            rungroup = rule.job_config.rungroup_tmpl.format(a=100*math.floor(run/100), b=100*math.ceil((run+1)/100))
            finaldir = finaldir_tmpl.format( leafdir=leaf, rungroup=rungroup )

            if not args.dryrun:
                Path(finaldir).mkdir( parents=True, exist_ok=True )

            full_file_path = f'{finaldir}/{lfn}'
            fullinfo_chunk.append(full_db_info(
                origfile=file,
                info=info,
                lfn=lfn,
                full_file_path=full_file_path,
                dataset=rule.dataset,
                tag=rule.outtriplet,
            ))

        # Register the whole chunk first; on a db hiccup we skip the move for
        # this chunk so the files stay in the lake and are picked up again later.
        try:
            upsert_filecatalog(fullinfos=fullinfo_chunk,
                               dryrun=args.dryrun
                               )
        except Exception as e:
            # Fix: include the exception text (previously swallowed), and drop
            # the unreachable exit(1) that followed this continue.
            WARN(f"dstspider is ignoring the database exception and moving on. ({e})")
            continue
        if not args.dryrun:
            for fullinfo in fullinfo_chunk:
                try:
                    os.rename( fullinfo.origfile, fullinfo.full_file_path )
                except Exception as e:
                    WARN(e)

    if args.profile:
        profiler.disable()
        DEBUG("Profiling finished. Printing stats...")
        stats = pstats.Stats(profiler)
        stats.strip_dirs().sort_stats('time').print_stats(10)

    INFO(f"{Path(sys.argv[0]).name} DONE.")
0315
0316
0317
# Script entry point: run the spider, then exit with an explicit success status.
if __name__ == '__main__':
    main()
    exit(0)