File indexing completed on 2026-04-25 08:29:08
0001
0002
0003 from pathlib import Path
0004 from datetime import datetime
0005 import yaml
0006 import cProfile
0007 import pstats
0008 import sys
0009 import shutil
0010 import os
0011
0012
0013 import pprint
0014
0015 from argparsing import submission_args
0016 from sphenixmisc import setup_rot_handler, should_I_quit, make_chunks
0017 from simpleLogger import slogger, CustomFormatter, CHATTY, DEBUG, INFO, WARN, ERROR, CRITICAL
0018 from sphenixprodrules import RuleConfig
0019 from sphenixmatching import MatchConfig, parse_lfn, parse_spiderstuff
0020 from sphenixdbutils import test_mode as dbutils_test_mode
0021 from sphenixdbutils import long_filedb_info, filedb_info, full_db_info, upsert_filecatalog, update_proddb
0022 from sphenixmisc import binary_contains_bisect,shell_command,lock_file,unlock_file
0023
0024
0025
def main():
    """Entry point for the dstspider production script.

    Builds a rule/match configuration from the command line, collects (a
    chunk of) a persistent list of produced DST root files, registers each
    file in the file catalog, and renames the files to their final
    lfn-based locations.

    Exits with 0 on clean stop conditions (stop file, lock held elsewhere,
    no list) and 1 on configuration errors.
    """
    args = submission_args()

    # Testbed mode if either the db utilities or the command line say so.
    test_mode = (
        dbutils_test_mode
        or args.test_mode
    )

    sublogdir = setup_rot_handler(args)
    slogger.setLevel(args.loglevel)

    # Bail out early if a stop condition is signalled for this script.
    if should_I_quit(args=args, myname=sys.argv[0]):
        DEBUG("Stop.")
        exit(0)
    INFO(f"Logging to {sublogdir}, level {args.loglevel}")

    if args.profile:
        DEBUG( "Profiling is ENABLED.")
        profiler = cProfile.Profile()
        profiler.enable()

    INFO("Starting dstspider.")
    INFO(sys.argv)

    if test_mode:
        INFO("Running in testbed mode.")
        args.mangle_dirpath = 'production-testbed'
    else:
        INFO("Running in production mode.")

    # Parameter overrides handed to the rule configuration.
    param_overrides = {}
    param_overrides["runs"] = args.runs
    param_overrides["runlist"] = args.runlist
    param_overrides["nevents"] = 0

    if args.physicsmode is not None:
        param_overrides["physicsmode"] = args.physicsmode

    param_overrides["prodmode"] = "production"
    if args.mangle_dirpath:
        param_overrides["prodmode"] = args.mangle_dirpath

    CHATTY(f"Rule substitutions: {param_overrides}")
    INFO("Now loading and building rule configuration.")

    try:
        rule = RuleConfig.from_yaml_file( yaml_file=args.config, rule_name=args.rulename, param_overrides=param_overrides )
        INFO(f"Successfully loaded rule configuration: {args.rulename}")
    except (ValueError, FileNotFoundError) as e:
        ERROR(f"Error: {e}")
        exit(1)

    CHATTY("Rule configuration:")
    CHATTY(yaml.dump(rule.dict))
    filesystem = rule.job_config.filesystem

    match_config = MatchConfig.from_rule_config(rule)
    CHATTY("Match configuration:")
    CHATTY(yaml.dump(match_config.dict))

    # Location of the persistent candidate list: histdir up to the first
    # template brace, stripped of trailing slashes, plus the rule name.
    dstlistname = filesystem['histdir']
    dstlistname = dstlistname.split("{")[0]
    while dstlistname.endswith("/"):
        dstlistname = dstlistname[0:-1]

    dstlistname = f"{dstlistname}/{args.rulename}_dstlist"

    # Only one spider at a time may work on this list.
    if not lock_file(dstlistname, args.dryrun):
        exit(0)

    INFO(f"Looking for existing filelist {dstlistname}")
    if Path(dstlistname).exists():
        INFO( " ... found.")
    else:
        INFO(" ... not found. Creating a new one.")
        Path(dstlistname).parent.mkdir( parents=True, exist_ok=True )
        match_config.get_output_files(r"\*root:\*", dstlistname, args.dryrun)

    if not Path(dstlistname).is_file():
        INFO("List file not found.")
        exit(0)

    wccommand = f"wc -l {dstlistname}"
    ret = shell_command(wccommand)
    INFO(f"List contains {ret[0]} files.")

    # Consume at most nfiles_to_process entries from the head of the list;
    # the unprocessed tail is written back so the next call resumes there.
    nfiles_to_process = 500000
    exhausted = False
    dstfiles = []
    tmpname = f"{dstlistname}.tmp"
    # FIX: reuse tmpname rather than re-formatting the same string inline.
    with open(dstlistname, "r") as infile, open(tmpname, "w") as smallerdstlist:
        for _ in range(nfiles_to_process):
            line = infile.readline()
            if line:
                dstfiles.append(line.strip())
            else:
                exhausted = True
                break
        # Copy the remaining (unprocessed) lines into the replacement list.
        for line in infile:
            smallerdstlist.write(line)
    if not args.dryrun:
        shutil.move(tmpname, dstlistname)
        if exhausted:
            INFO("Used up all previously found dst files. Next call will create a new list")
            Path(dstlistname).unlink(missing_ok=True)
    else:
        # Dry run: leave the original list untouched, just drop the temp file.
        Path(tmpname).unlink(missing_ok=True)

    unlock_file(dstlistname, args.dryrun)

    # Parse every candidate and keep those whose run is in the rule's runlist.
    mvfiles_info = []
    for dstfile in dstfiles:
        lfn = Path(dstfile).name
        dsttype, run, seg, _ = parse_lfn(lfn, rule)
        if binary_contains_bisect(rule.runlist_int, run):
            fullpath, nevents, first, last, md5, size, ctime, dbid = parse_spiderstuff(dstfile)
            if dbid <= 0:
                # FIX: this was a plain string literal; {dbid} was never interpolated.
                ERROR(f"dbid is {dbid}. Can happen for legacy files, but it shouldn't currently.")
                exit(0)
            info = filedb_info(dsttype, run, seg, fullpath, nevents, first, last, md5, size, ctime)
            mvfiles_info.append( (dstfile, info) )

    INFO(f"{len(mvfiles_info)} total root files to be processed.")

    # Process in chunks, timing throughput along the way.
    tstart = datetime.now()
    tlast = tstart
    chunksize = 2000
    fmax = len(mvfiles_info)

    chunked_mvfiles = make_chunks(mvfiles_info, chunksize)
    for i, chunk in enumerate(chunked_mvfiles):
        now = datetime.now()
        print( f'DST #{i*chunksize}/{fmax}, time since previous output:\t {(now - tlast).total_seconds():.2f} seconds ({chunksize/(now - tlast).total_seconds():.2f} Hz). ' )
        print( f' time since the start: \t {(now - tstart).total_seconds():.2f} seconds (cum. {i*chunksize/(now - tstart).total_seconds():.2f} Hz). ' )
        tlast = now

        fullinfo_chunk = []
        seen_lfns = set()
        for dstfile, info in chunk:
            dsttype, run, seg, lfn, nevents, first, last, md5, size, time = info

            # Duplicate lfn within this chunk: keep the first occurrence,
            # delete the newcomer on disk (unless dry running).
            if lfn in seen_lfns:
                existing = str(Path(dstfile).parent) + '/' + lfn
                INFO(f"We already have a file with lfn {lfn}. Deleting {existing}.")
                if not args.dryrun:
                    Path(existing).unlink(missing_ok=True)
                continue
            seen_lfns.add(lfn)

            fileparent = Path(dstfile).parent
            full_file_path = f'{fileparent}/{lfn}'
            fullinfo_chunk.append(full_db_info(
                origfile=dstfile,
                info=info,
                lfn=lfn,
                full_file_path=full_file_path,
                dataset=rule.dataset,
                tag=rule.outtriplet,
            ))

        try:
            upsert_filecatalog(fullinfos=fullinfo_chunk,
                               dryrun=args.dryrun
                               )
        except Exception as e:
            # Deliberate best-effort: log and skip this chunk's renames.
            # FIX: removed unreachable exit(1) that followed this continue.
            WARN( f"dstspider is ignoring the database exception and moving on: {e}")
            continue

        # Catalog updated: move files to their final lfn-based locations.
        if not args.dryrun:
            for fullinfo in fullinfo_chunk:
                try:
                    os.rename( fullinfo.origfile, fullinfo.full_file_path )
                except Exception as e:
                    # Best-effort move; a failed rename is logged, not fatal.
                    WARN(e)

    if args.profile:
        profiler.disable()
        DEBUG("Profiling finished. Printing stats...")
        stats = pstats.Stats(profiler)
        stats.strip_dirs().sort_stats('time').print_stats(10)

    INFO(f"{Path(sys.argv[0]).name} DONE.")
0246
0247
0248
# Script entry point; exit(0) makes the success status explicit.
if __name__ == '__main__':
    main()
    exit(0)