File indexing completed on 2026-04-25 08:29:08
0001
0002
0003 from pathlib import Path
0004 import sys
0005
0006 import pprint
0007
0008 from argparsing import submission_args
0009 from sphenixdbutils import test_mode as dbutils_test_mode
0010 from simpleLogger import slogger, CHATTY, DEBUG, INFO, WARN, ERROR, CRITICAL
0011 from sphenixprodrules import RuleConfig
0012 from sphenixmatching import MatchConfig
0013 from sphenixmisc import setup_rot_handler, should_I_quit, make_chunks
0014 from sphenixmisc import read_batches,lock_file, unlock_file
0015 from sphenixmisc import shell_command
0016 from sphenixdbutils import cnxn_string_map, dbQuery
0017
def eradicate_runs(match_config: MatchConfig, dryrun: bool=True, delete_files: bool=False):
    """Remove output files and database entries so that the runs can be resubmitted.

    (Example: resubmitting with different resource specs.)

    The work is done in three stages:

    1. File system: build the DST list path from ``filesystem['logdir']`` and
       the rule name, lock it, collect the output files, and unlink them
       (only when ``delete_files`` is set and ``dryrun`` is not).
    2. Production DB: remove the matching ``dstfile`` rows from
       ``production_status``.
    3. FileCatalog: remove the matching rows from the ``files`` and
       ``datasets`` tables (``test_files`` / ``test_datasets`` when
       ``dbutils_test_mode`` is set).

    Args:
        match_config: Match configuration; supplies dataset, dsttype,
            outtriplet, run list, filesystem layout and the DB lookups.
        dryrun: When true, no file is unlinked and the database statements
            are issued as ``SELECT`` instead of ``DELETE``.
        delete_files: When true (``--force-delete``), physical files are
            unlinked; otherwise only the database entries are touched.

    Raises:
        SystemExit: With status 1 if the DST list cannot be locked or a
            database query fails.
    """
    dataset = match_config.dataset
    outtriplet = match_config.outtriplet
    dsttype = match_config.dsttype
    runlist = match_config.runlist_int
    DEBUG(dataset)
    DEBUG(dsttype)
    DEBUG(outtriplet)
    filesystem = match_config.filesystem

    # --- Stage 1: files on disk -------------------------------------------
    # 'logdir' is a template; keep only the part before the first "{"
    # placeholder and drop any trailing slashes.
    logdir_base = filesystem['logdir'].split("{")[0].rstrip("/")
    dstlistname = f"{logdir_base}/{match_config.input_config.rule_name}_dstlist"

    if not delete_files:
        WARN("Not deleting files, because --force-delete not given.")
    else:
        WARN("Deleting files, because --force-delete given.")

    # NOTE(review): the reviewed copy of this file had lost its indentation.
    # The locking/listing steps below are taken to run regardless of
    # delete_files (the per-batch `delete_files and not dryrun` test would
    # otherwise be redundant) -- confirm against the repository version.
    if not lock_file(file_path=dstlistname, dryrun=dryrun, max_lock_age=30*60):
        ERROR( "Not safe to proceed without intervention.")
        sys.exit(1)
    try:
        # Presumably writes the list of matching files to dstlistname (the
        # code below reads that file back) -- TODO confirm. The return value
        # is not needed because dstlistname is always a non-empty path.
        match_config.get_output_files(filemask=r"\*root\*", dstlistname=dstlistname, dryrun=dryrun)

        nfiles = 0
        if Path(dstlistname).exists():
            ret = shell_command(f"wc -l {dstlistname}")
            nfiles = int(ret[0])
        INFO(f"Found {nfiles} existing files to delete.")

        filebatches = read_batches(dstlistname, 100000) if nfiles > 0 else []
        for i, batch in enumerate(filebatches):
            INFO(f"Processing batch {i}, length is {len(batch)} lines.")
            if delete_files and not dryrun:
                for rootfile in batch:
                    Path(rootfile).unlink(missing_ok=True)
    finally:
        # Release the lock even if listing or unlinking raised, so a crash
        # does not block the next invocation until the lock ages out.
        if not dryrun:
            unlock_file(file_path=dstlistname, dryrun=dryrun)

    chunksize = 5000  # number of names per IN (...) list

    # --- Stage 2: production status DB ------------------------------------
    existing_status = list(match_config.get_prod_status(runlist).keys())
    INFO(f"Found {len(existing_status)} output files in the production db")

    dbstring = 'statw'
    status_query = "SELECT id" if dryrun else "DELETE"
    status_query += """
    FROM production_status
    WHERE
    dstfile in
    """
    statusmax = len(existing_status)
    for i, statuschunk in enumerate(make_chunks(existing_status, chunksize)):
        statuschunk_str = "','".join(statuschunk)
        DEBUG( f'Removing file #{i*chunksize}/{statusmax} from database production_status')
        _run_in_list_query(dbstring, status_query, statuschunk_str,
                           "Failed to delete file(s) from production database")

    # --- Stage 3: FileCatalog ---------------------------------------------
    existing_lfns = match_config.get_files_in_db(runlist)
    INFO(f"Found {len(existing_lfns)} entries in the FileCatalog")

    dbstring = 'testw' if dbutils_test_mode else 'fcw'
    datasets_table = 'test_datasets' if dbutils_test_mode else 'datasets'
    datasets_query = "SELECT" if dryrun else "DELETE"
    datasets_query += f"""
    FROM {datasets_table}
    WHERE
    filename in
    """

    files_table = 'test_files' if dbutils_test_mode else 'files'
    files_query = "SELECT" if dryrun else "DELETE"
    files_query += f"""
    FROM {files_table}
    WHERE
    lfn in
    """

    lfnmax = len(existing_lfns)
    for i, lfnchunk in enumerate(make_chunks(existing_lfns, chunksize)):
        DEBUG( f'File #{i*chunksize}/{lfnmax}' )
        # Join once; the same list is used against both tables.
        lfnchunk_str = "','".join(lfnchunk)

        DEBUG( f'Removing from database {files_table}')
        _run_in_list_query(dbstring, files_query, lfnchunk_str,
                           "Failed to delete file(s) from files database")

        DEBUG( f'Removing from database {datasets_table}' )
        _run_in_list_query(dbstring, datasets_query, lfnchunk_str,
                           "Failed to delete file(s) from datasets database")

    return


def _run_in_list_query(dbstring: str, query_head: str, joined_values: str, failure_message: str) -> None:
    """Run ``query_head ( '<joined_values>' )``, commit, or log and exit(1).

    ``joined_values`` is the chunk already joined with ``','`` so that the
    surrounding quotes added here complete a quoted SQL list.
    NOTE(review): values are interpolated into the SQL text unescaped; a name
    containing a single quote would break the statement.
    """
    cursor = dbQuery( cnxn_string_map[ dbstring ], query_head+f"( '{joined_values}' )" )
    if not cursor:
        ERROR(failure_message)
        sys.exit(1)
    cursor.commit()
0157
def main():
    """Command-line entry point.

    Parses the submission arguments, sets up logging, loads the rule from the
    YAML configuration, derives the match configuration from it and hands it
    to ``eradicate_runs``.

    Raises:
        SystemExit: Status 0 when ``should_I_quit`` asks to stop; status 1
            when the rule configuration cannot be loaded.
    """
    args = submission_args()

    # Test mode is on if either the DB utilities or the command line say so.
    test_mode = dbutils_test_mode or args.test_mode

    # Logging setup
    sublogdir = setup_rot_handler(args)
    slogger.setLevel(args.loglevel)

    if should_I_quit(args=args, myname=sys.argv[0]):
        DEBUG("Stop.")
        sys.exit(0)
    INFO(f"Logging to {sublogdir}, level {args.loglevel}")

    if test_mode:
        INFO("Running in testbed mode.")
        args.mangle_dirpath = 'production-testbed'
    else:
        INFO("Running in production mode.")

    # Overrides handed to the rule loader; insertion order matches the
    # original statement order.
    param_overrides = {
        "runs": args.runs,
        "runlist": args.runlist,
        "nevents": 0,
    }
    if args.physicsmode is not None:
        param_overrides["physicsmode"] = args.physicsmode
    # A mangled directory path, when given, replaces the default prodmode.
    param_overrides["prodmode"] = args.mangle_dirpath if args.mangle_dirpath else "production"

    # Keep the try body to the single call whose errors are handled here.
    try:
        rule = RuleConfig.from_yaml_file(
            yaml_file=args.config,
            rule_name=args.rulename,
            param_overrides=param_overrides
        )
    except (ValueError, FileNotFoundError) as e:
        ERROR(f"Error loading rule configuration: {e}")
        sys.exit(1)
    INFO(f"Successfully loaded rule configuration: {args.rulename}")

    match_config = MatchConfig.from_rule_config(rule)
    INFO("Match configuration created.")

    eradicate_runs(match_config, dryrun=args.dryrun, delete_files=args.force_delete)

    INFO(f"{Path(__file__).name} DONE.")
0219
# Script entry point: run main() and report success to the shell.
# sys.exit is used rather than the site-provided exit(), which is meant for
# interactive sessions and is absent when the site module is not loaded.
if __name__ == '__main__':
    main()
    sys.exit(0)