import pyodbc
from pathlib import Path
import yaml
import cProfile
import subprocess
import sys
import shutil

from argparsing import submission_args
from sphenixmisc import setup_rot_handler, should_I_quit
from simpleLogger import slogger, CustomFormatter, CHATTY, DEBUG, INFO, WARN, ERROR, CRITICAL
from sphenixprodrules import RuleConfig, list_to_condition
from sphenixprodrules import parse_lfn
from sphenixdbutils import test_mode as dbutils_test_mode
from sphenixdbutils import cnxn_string_map
from sphenixmisc import remove_empty_directories, binary_contains_bisect, make_chunks

def delQuery( cnxn_string, query ):
    """Execute a deletion query and return the number of affected rows."""
    if 'delete' not in query.lower():
        WARN(f'delQuery called without "delete". Query: {query}')

    DEBUG(f'[cnxn_string] {cnxn_string}')
    DEBUG(f'[query ]\n{query}')
    conn = pyodbc.connect( cnxn_string )
    curs = conn.cursor()
    curs.execute( query )
    rowcount = curs.rowcount
    curs.commit()
    conn.close()
    return rowcount
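
# Hedged usage sketch (hypothetical lfn; map key and table name as used below in test mode):
#   delQuery( cnxn_string_map['testw'], "delete from test_files where lfn in ('SOME_DST.root')" )
# Note that the callers below interpolate lfn lists directly into the statement text;
# pyodbc's "?" parameter binding would be an alternative for untrusted input.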


def main():
    """Instantiate a given rule and remove all logfiles, histograms, DSTs, database entries, etc.

    TODO: The first hundred or so lines are shared with protospider and create_submission.
    Refactor when there's downtime.
    """

    args = submission_args()

    # Unless this is a dry run, require explicit confirmation before deleting anything.
    if not args.dryrun:
        answer = input("This is not a drill. Do you want to continue? (yes/no): ")
        if answer.lower() != "yes":
            print("Exiting. Smart.")
            exit(0)
        else:
            print("Here we go deleting then.")

    test_mode = (
        dbutils_test_mode
        or args.test_mode
    )

    sublogdir = setup_rot_handler(args)
    slogger.setLevel(args.loglevel)

    if should_I_quit(args=args, myname=sys.argv[0]):
        DEBUG("Stop.")
        exit(0)

    INFO(f"Logging to {sublogdir}, level {args.loglevel}")

    if test_mode:
        INFO("Running in testbed mode.")
        args.mangle_dirpath = 'production-testbed'
    else:
        INFO("Running in production mode.")

    # Parameter overrides for the rule: select the runs to clean and disable event limits.
    param_overrides = {}
    param_overrides["runs"] = args.runs
    param_overrides["runlist"] = args.runlist
    param_overrides["nevents"] = 0

    if args.physicsmode is not None:
        param_overrides["physicsmode"] = args.physicsmode

    if args.mangle_dstname:
        DEBUG("Mangling DST name")
        param_overrides['DST'] = args.mangle_dstname

    param_overrides["prodmode"] = "production"
    if args.mangle_dirpath:
        param_overrides["prodmode"] = args.mangle_dirpath

    CHATTY(f"Rule substitutions: {param_overrides}")
    INFO("Now loading and building rule configuration.")

    try:
        rule = RuleConfig.from_yaml_file( yaml_file=args.config, rule_name=args.rulename, param_overrides=param_overrides )
        INFO(f"Successfully loaded rule configuration: {args.rulename}")
    except (ValueError, FileNotFoundError) as e:
        ERROR(f"Error: {e}")
        exit(1)

    CHATTY("Rule configuration:")
    CHATTY(yaml.dump(rule.dict))

    # Prefer 'lfs find' (Lustre) if available; fall back to plain 'find'.
    lfind = shutil.which('lfs')
    if lfind is None:
        WARN("'lfs find' not found.")
        lfind = shutil.which('find')
    else:
        lfind = f'{lfind} find'
    INFO(f'Using "{lfind}".')

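    # Kill any condor jobs still queued or running under this rule's batch name.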
    condor_batchname = rule.job_config.batch_name

    condor_running_command = f"condor_q -const 'JobBatchName==\"{condor_batchname}\"' -format '%d\\n' ClusterId | wc -l"
    condor_running = 0
    try:
        condor_running = subprocess.run(condor_running_command, shell=True, check=True, capture_output=True).stdout.decode('utf-8').split()[0]
        DEBUG("Command successful!")
    except subprocess.CalledProcessError as e:
        print("Command failed with exit code:", e.returncode)

    WARN(f"About to kill {condor_running} condor jobs for JobBatchName==\"{condor_batchname}\"")
    condor_rm_command = f"condor_rm -long -const 'JobBatchName==\"{condor_batchname}\"' | grep -c ^job_"
    WARN(f"{condor_rm_command}")
    if not args.dryrun:
        condor_rm = '0'
        try:
            condor_rm = subprocess.run(condor_rm_command, shell=True, check=True, capture_output=True).stdout.decode('utf-8').strip()
            DEBUG("Command successful!")
        except subprocess.CalledProcessError as e:
            print("Command failed with exit code:", e.returncode)
        WARN(f"Killed {condor_rm} jobs using {condor_rm_command}")

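    # Remove leftover submission file pairs (.in/.sub) for this rule, and the directory itself if it is then empty.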
    submission_dir = Path('./tosubmit').resolve()
    subbase = f'{rule.rulestem}_{rule.outstub}_{rule.outdataset}'
    INFO(f'Submission files based on {subbase}')
    existing_sub_files = list(submission_dir.glob(f'{subbase}*.in'))
    existing_sub_files += list(submission_dir.glob(f'{subbase}*.sub'))
    if existing_sub_files:
        WARN(f"Removing {len(existing_sub_files)//2} existing submission file pairs for base: {subbase}")
        for f_to_delete in existing_sub_files:
            CHATTY(f"Deleting: {f_to_delete}")
            if not args.dryrun:
                Path(f_to_delete).unlink(missing_ok=True)
    if submission_dir.is_dir() and not any(submission_dir.iterdir()):
        WARN(f"Submission directory is empty. Removing {submission_dir}")
        if not args.dryrun:
            submission_dir.rmdir()

    filesystem = rule.job_config.filesystem
    DEBUG(f"Filesystem: {filesystem}")
    dstbase = rf'{rule.rulestem}\*{rule.outstub}_{rule.outdataset}\*'
    INFO(f'DST files filtered as {dstbase}')

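    # Sweep the lake (the original output directory): find matching .root DSTs, filter by run number, delete.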
    lakelocation = filesystem['outdir']
    INFO(f"Original output directory: {lakelocation}")
    findcommand = rf"{lfind} {lakelocation} -type f -name {dstbase}\*.root\*"
    findcommand = findcommand.replace(r'\*\*', r'\*')
    INFO(f"Find command: {findcommand}")
    lakefiles = []
    if Path(lakelocation).is_dir():
        lakefiles = subprocess.run(findcommand, shell=True, check=True, capture_output=True).stdout.decode('utf-8').splitlines()
    print(f"Found {len(lakefiles)} matching DSTs (before the run number cut) in the lake.")
    del_lakefiles = []
    for f_to_delete in lakefiles:
        lfn = Path(f_to_delete).name
        _, run, _, _ = parse_lfn(lfn, rule)
        if binary_contains_bisect(rule.runlist_int, run):
            del_lakefiles.append(f_to_delete)
    WARN(f"Removing {len(del_lakefiles)} .root files in the lake at {lakelocation}")
    for f_to_delete in del_lakefiles:
        CHATTY(f"Deleting: {f_to_delete}")
        if not args.dryrun:
            Path(f_to_delete).unlink(missing_ok=True)

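    # Same sweep for the corresponding .finished marker files.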
    findcommand = rf"{lfind} {lakelocation} -type f -name {dstbase}\*.finished\*"
    findcommand = findcommand.replace(r'\*\*', r'\*')
    INFO(f"Find command: {findcommand}")

    finishedlakefiles = []
    if Path(lakelocation).is_dir():
        finishedlakefiles = subprocess.run(findcommand, shell=True, check=True, capture_output=True).stdout.decode('utf-8').splitlines()
    print(f"Found {len(finishedlakefiles)} matching .finished files in the lake.")

    del_lakefiles = []
    for f_to_delete in finishedlakefiles:
        lfn = Path(f_to_delete).name
        _, run, _, _ = parse_lfn(lfn, rule)
        if binary_contains_bisect(rule.runlist_int, run):
            del_lakefiles.append(f_to_delete)
    WARN(f"Removing {len(del_lakefiles)} .finished files in the lake at {lakelocation}")
    for f_to_delete in del_lakefiles:
        CHATTY(f"Deleting: {f_to_delete}")
        if not args.dryrun:
            Path(f_to_delete).unlink(missing_ok=True)

    if Path(lakelocation).is_dir() and not any(Path(lakelocation).iterdir()):
        WARN(f"DST lake is empty. Removing {lakelocation}")
        if not args.dryrun:
            Path(lakelocation).rmdir()

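    # Move on to DSTs that were already moved out of the lake to their final destination.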
    finaldir_tmpl = filesystem['finaldir']
    INFO(f"Final destination template: {finaldir_tmpl}")

    try:
        finaldir_glob = finaldir_tmpl.format(leafdir='*', rungroup='*')
    except Exception as e:
        ERROR(f"Trying to globify {finaldir_tmpl} failed. Error:\n{e}")
        exit(-1)
    final_dsts_command = rf"{lfind} {finaldir_glob} -type f -name {dstbase}\*"
    INFO(f"Find command for moved DSTs: {final_dsts_command}")
    all_final_dsts = []
    try:
        all_final_dsts = subprocess.run(final_dsts_command, shell=True, check=True, capture_output=True)
        all_final_dsts = all_final_dsts.stdout.decode('utf-8').splitlines()
        DEBUG(f"Command successful! len(all_final_dsts)={len(all_final_dsts)}")
    except subprocess.CalledProcessError as e:
        print("Command failed with exit code:", e.returncode)

    del_final_dsts = []
    for dst in all_final_dsts:
        lfn = Path(dst).name
        _, run, _, _ = parse_lfn(lfn, rule)
        if binary_contains_bisect(rule.runlist_int, run):
            del_final_dsts.append(dst)
    WARN(f"Removing {len(del_final_dsts)} of the {len(all_final_dsts)} DSTs found by:\n{final_dsts_command}")
    for f_to_delete in del_final_dsts:
        CHATTY(f"Deleting: {f_to_delete}")
        if not args.dryrun:
            Path(f_to_delete).unlink(missing_ok=True)

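    # Remove the deleted DSTs from the file catalog. The lfn lists are chunked
    # to keep the SQL "in" clauses at a manageable size.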
    chunk_size = 500
    chunked_dsts = list(make_chunks(del_final_dsts, chunk_size))
    dbstring = 'testw' if test_mode else 'fcw'
    files_table = 'test_files' if test_mode else 'files'
    datasets_table = 'test_datasets' if test_mode else 'datasets'
    WARN(f"Deleting {len(del_final_dsts)} DST rows from table {files_table} and from table {datasets_table}")

    # The templates are written as counting selects; for a real run, the select
    # is swapped out for a delete with the same where clause.
    del_files_tmpl = """
    select count(lfn) from {files_table}
    where lfn in ({lfns})
    """

    del_datasets_tmpl = """
    select count(filename) from {datasets_table}
    where filename in ({lfns})
    """
    if not args.dryrun:
        del_files_tmpl = del_files_tmpl.replace("select count(lfn) from", "delete from")
        del_datasets_tmpl = del_datasets_tmpl.replace("select count(filename) from", "delete from")

    for i, chunk in enumerate(chunked_dsts):
        lfns = [f"'{Path(dst).name}'" for dst in chunk]
        del_files_db = del_files_tmpl.format(
            files_table=files_table,
            lfns=",".join(lfns)
        )
        del_datasets_db = del_datasets_tmpl.format(
            datasets_table=datasets_table,
            lfns=",".join(lfns)
        )
        CHATTY(del_files_db)
        CHATTY(del_datasets_db)
        if not args.dryrun:
            response = delQuery( cnxn_string_map[dbstring], del_files_db )
            DEBUG(f"Delete chunk {i} from files db, response: {response}")
            response = delQuery( cnxn_string_map[dbstring], del_datasets_db )
            DEBUG(f"Delete chunk {i} from datasets db, response: {response}")

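    # Prune directories left empty under the final destination.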
    final_dirs_command = f"{lfind} {finaldir_glob} -type d"
    INFO(f"Find command: {final_dirs_command}")

    all_final_dirs = []
    try:
        all_final_dirs = subprocess.run(final_dirs_command, shell=True, check=True, capture_output=True).stdout.decode('utf-8').splitlines()
        DEBUG("Command successful!")
    except subprocess.CalledProcessError as e:
        print("Command failed with exit code:", e.returncode)

    INFO(f"{final_dirs_command} found {len(all_final_dirs)} directories. Removing the empty ones.")
    if not args.dryrun:
        remove_empty_directories( set(all_final_dirs) )

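    # Histograms, logs, and condor files are expected to live under a common
    # parent directory; sanity-check that assumption before building the glob.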
    datadir_tmpl = Path(filesystem['histdir']).parent
    if datadir_tmpl != Path(filesystem['logdir']).parent or datadir_tmpl != Path(filesystem['condor']).parent:
        ERROR("Assumption that the root of histdir, logdir, condor is the same failed.")
        print(f"histdir: {filesystem['histdir']}")
        print(f"logdir:  {filesystem['logdir']}")
        print(f"condor:  {filesystem['condor']}")
        exit(-1)

    try:
        datadir_glob = str(datadir_tmpl).format(leafdir='*', rungroup='*')
    except Exception as e:
        ERROR(f"Trying to globify {datadir_tmpl} failed. Error:\n{e}")
        exit(-1)

    final_data_command = rf"find {datadir_glob} -type f -name {dstbase}\*.out -o -name {dstbase}\*.err -o -name {dstbase}\*.condor -o -name HIST_{dstbase}\*.root"
    INFO(final_data_command)
    all_final_data = []
    try:
        all_final_data = subprocess.run(final_data_command, shell=True, check=True, capture_output=True).stdout.decode('utf-8').splitlines()
        DEBUG("Command successful!")
    except subprocess.CalledProcessError as e:
        print("Command failed with exit code:", e.returncode)
    WARN(f"Found {len(all_final_data)} histogram and log files.")
    del_final_data = []
    for data in all_final_data:
        lfn = Path(data).name
        _, run, _, _ = parse_lfn(lfn, rule)
        if binary_contains_bisect(rule.runlist_int, run):
            del_final_data.append(data)

    WARN(f"Removing {len(del_final_data)} of the {len(all_final_data)} log and histo files found by:\n{final_data_command}")
    for f_to_delete in del_final_data:
        CHATTY(f"Deleting: {f_to_delete}")
        if not args.dryrun:
            Path(f_to_delete).unlink(missing_ok=True)

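    # Histograms are catalogued as well; remove their rows the same way as for the DSTs.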
    histnumber = sum('HIST_' in s for s in del_final_data)
    WARN(f"Deleting {histnumber} histogram rows from table {files_table} and from table {datasets_table}. Also deleting the other data files if they somehow made it in.")
    chunked_data = list(make_chunks(del_final_data, chunk_size))
    for i, chunk in enumerate(chunked_data):
        lfns = [f"'{Path(data).name}'" for data in chunk]
        del_files_db = del_files_tmpl.format(
            files_table=files_table,
            lfns=",".join(lfns)
        )
        del_datasets_db = del_datasets_tmpl.format(
            datasets_table=datasets_table,
            lfns=",".join(lfns)
        )
        CHATTY(del_files_db)
        CHATTY(del_datasets_db)
        if not args.dryrun:
            response = delQuery( cnxn_string_map[dbstring], del_files_db )
            DEBUG(f"Delete chunk {i} from files db, response: {response}")
            response = delQuery( cnxn_string_map[dbstring], del_datasets_db )
            DEBUG(f"Delete chunk {i} from datasets db, response: {response}")

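    # Prune empty leaf directories under the data trunk.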
    datatrunk = datadir_glob.replace("/*", "")
    datatrunk = f"{datatrunk}/{rule.rulestem}*"
    data_dirs_find = f"find {datatrunk} -type d -empty"
    empty_data_dirs = []
    try:
        empty_data_dirs = subprocess.run(data_dirs_find, shell=True, check=True, capture_output=True).stdout.decode('utf-8').splitlines()
        DEBUG("Command successful!")
    except subprocess.CalledProcessError as e:
        print("Command failed with exit code:", e.returncode)
    INFO(f"{len(empty_data_dirs)} empty leaf directories found with {data_dirs_find}. Removing.")
    if not args.dryrun:
        remove_empty_directories( set(empty_data_dirs) )

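    # Finally, clear the production_status bookkeeping for this DST base and run selection.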
    sqldstbase = dstbase.replace(r"\*", "%")
    prodrun_condition = list_to_condition(rule.runlist_int, "run")

    del_prod_state = f"""
    delete from production_status
    where
    dstname like '{sqldstbase}'
    and {prodrun_condition}
    returning *
    """
    WARN(del_prod_state + ";")
    if not args.dryrun:
        response = delQuery( cnxn_string_map["statw"], del_prod_state )
        DEBUG(f"Delete states from prod db, response: {response}")

    INFO("Done.")
    if not args.dryrun:
        WARN("You should run this again in a minute or two, to catch files still trickling in from killed jobs.")
    exit(0)


if __name__ == '__main__':
    ERROR("This script is currently not functional. It needs an overhaul following the logic in dstspider, histspider.")
    ERROR("Furthermore, it doesn't work well at the scale of full production.")
    exit(1)

    # Unreachable until the overhaul is done: profile main() and print the ten hottest spots.
    cProfile.run('main()', '/tmp/sphenixprod.prof')
    import pstats
    p = pstats.Stats('/tmp/sphenixprod.prof')
    p.strip_dirs().sort_stats('time').print_stats(10)