Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-25 08:29:08

0001 #!/usr/bin/env python
0002 
0003 from pathlib import Path
0004 import sys
0005 
0006 import pprint # noqa F401
0007 
0008 from argparsing import submission_args
0009 from sphenixdbutils import test_mode as dbutils_test_mode
0010 from simpleLogger import slogger, CHATTY, DEBUG, INFO, WARN, ERROR, CRITICAL  # noqa: F401
0011 from sphenixprodrules import RuleConfig
0012 from sphenixmatching import MatchConfig
0013 from sphenixmisc import setup_rot_handler, should_I_quit, make_chunks
0014 from sphenixmisc import read_batches,lock_file, unlock_file
0015 from sphenixmisc import shell_command
0016 from sphenixdbutils import cnxn_string_map, dbQuery
0017 
def eradicate_runs(match_config: MatchConfig, dryrun: bool=True, delete_files: bool=False):
    """Remove output files and database entries for the runs selected by match_config,
    so that the runs can be resubmitted (Ex.: with different resource specs).

    Args:
        match_config: identifies dataset, dsttype, outtriplet, run list, and filesystem layout.
        dryrun: if True, only SELECT matching rows instead of DELETEing, and skip file removal.
        delete_files: if True (--force-delete), also remove the physical output files on disk.

    Note: Deletion of physical files is unconnected from the data base
          because we cannot be sure the two are consistent.
    """
    # Unique identifiers for what to process
    dataset=match_config.dataset
    outtriplet=match_config.outtriplet
    dsttype=match_config.dsttype
    runlist=match_config.runlist_int
    DEBUG(dataset)
    DEBUG(dsttype)
    DEBUG(outtriplet)
    filesystem=match_config.filesystem

    ### 0. TODO: Identify and kill condor jobs. Basically impossible without also at least running on the submission node.

    ### 1. Delete output ON DISK
    # Derive the dst list file location from the log directory template:
    # cut off any "{...}" placeholder part and trailing slashes, then append the rule name.
    dstlistname=filesystem['logdir']
    dstlistname=dstlistname.split("{")[0]
    while dstlistname.endswith("/"):
        dstlistname=dstlistname[0:-1]
    dstlistname=f"{dstlistname}/{match_config.input_config.rule_name}_dstlist"

    # Track whether we actually acquired the lock, so we only ever unlock what we locked.
    locked=False
    if not delete_files:
        WARN("Not deleting files, because --force-delete not given.")
    else:
        WARN("Deleting files, because --force-delete given.")
        # Guard the list file against concurrent eradication/submission; a stale
        # lock older than 30 minutes is considered abandoned by lock_file.
        if not lock_file(file_path=dstlistname, dryrun=dryrun, max_lock_age=30*60):
            ERROR( "Not safe to proceed without intervention.")
            exit(1)
        locked=True
        rootfiles=match_config.get_output_files(filemask=r"\*root\*",dstlistname=dstlistname,dryrun=dryrun)
        # Count candidates: from the returned list if no list file is used,
        # otherwise via `wc -l` on the (potentially very large) list file.
        nfiles=0
        if not dstlistname:
            nfiles=len(rootfiles)
        else:
            if Path(dstlistname).exists():
                wccommand=f"wc -l {dstlistname}"
                ret = shell_command(wccommand)
                nfiles = int(ret[0])

        INFO(f"Found {nfiles} existing files to delete.")

        filebatches=[]
        if nfiles>0:
            filebatches=read_batches(dstlistname,100000) if dstlistname else (rootfiles,)
        for i,batch in enumerate(filebatches):
            INFO(f"Processing batch {i}, length is {len(batch)} lines.")
            if delete_files and not dryrun:
                for rootfile in batch:
                    Path(rootfile).unlink(missing_ok=True)

    # FIX: previously unlock_file ran whenever `not dryrun`, even without
    # --force-delete, i.e. releasing a lock that was never acquired.
    if locked and not dryrun:
        unlock_file(file_path=dstlistname,dryrun=dryrun)

    ### 2. Select from production DB
    # We could do this together with the next step, for individual lfns.
    existing_status=match_config.get_prod_status(runlist)
    existing_status=list(existing_status.keys())
    INFO(f"Found {len(existing_status)} output files in the production db")

    ### 2a. Delete from production db.
    dbstring = 'statw'
    # Dryrun only SELECTs the rows that a real run would DELETE.
    status_query="SELECT id" if dryrun else "DELETE"
    status_query+="""
    FROM production_status
        WHERE
    dstfile in
    """
    # Delete in chunks to keep each statement's IN-list at a manageable size.
    chunksize=5000
    statusmax=len(existing_status)
    for i,statuschunk in enumerate(make_chunks(existing_status,chunksize)):
        # NOTE(review): SQL is assembled by string interpolation. The dstfile
        # names come from our own production db, but parameterized queries
        # would still be safer here.
        statuschunk_str="','".join(statuschunk)

        DEBUG( f'Removing file #{i*chunksize}/{statusmax} from database production_status')
        prod_curs = dbQuery( cnxn_string_map[ dbstring ], status_query+f"( '{statuschunk_str}' )" )
        if prod_curs:
            prod_curs.commit()
        else:
            ERROR("Failed to delete file(s) from production database")
            exit(1)

    ### 3. Select lfns in DB
    # This is necessary because the files db has no fields to select by runnumber etc.
    existing_lfns=match_config.get_files_in_db(runlist)
    INFO(f"Found {len(existing_lfns)} entries in the FileCatalog")

    ### 4. Delete from datasets and files
    dbstring = 'testw' if dbutils_test_mode else 'fcw'
    datasets_table='test_datasets' if dbutils_test_mode else 'datasets'
    # FIX: dryrun used to produce "SELECT FROM ..." (no column list), which is
    # invalid SQL; select an explicit column like the production_status query does.
    datasets_query="SELECT filename" if dryrun else "DELETE"
    datasets_query+=f"""
    FROM {datasets_table}
        WHERE
    filename in
    """

    files_table='test_files' if dbutils_test_mode else 'files'
    files_query="SELECT lfn" if dryrun else "DELETE"
    files_query+=f"""
    FROM {files_table}
        WHERE
    lfn in
    """

    chunksize=5000
    lfnmax=len(existing_lfns)
    for i,lfnchunk in enumerate(make_chunks(existing_lfns,chunksize)):
        DEBUG( f'File #{i*chunksize}/{lfnmax}' )
        lfnchunk_str="','".join(lfnchunk)

        DEBUG( f'Removing from database {files_table}')
        files_curs = dbQuery( cnxn_string_map[ dbstring ], files_query+f"( '{lfnchunk_str}' )" )
        if files_curs:
            files_curs.commit()
        else:
            ERROR("Failed to delete file(s) from files database")
            exit(1)

        DEBUG( f'Removing from database {datasets_table}' )
        datasets_curs = dbQuery( cnxn_string_map[ dbstring ], datasets_query+f"( '{lfnchunk_str}' )" )
        if datasets_curs:
            datasets_curs.commit()
        else:
            ERROR("Failed to delete file(s) from datasets database")
            exit(1)

    return
0157 
def main():
    """Parse arguments, load the requested rule, and eradicate the matching runs."""
    args = submission_args()

    # Testbed mode is triggered either by the db utilities or on the command line.
    test_mode = dbutils_test_mode or args.test_mode
    # ( hasattr(rule, 'test_mode') and rule.test_mode ) ## allow in the yaml file?

    # Logging first, so everything below is captured.
    sublogdir = setup_rot_handler(args)
    slogger.setLevel(args.loglevel)

    # Bail out quietly if another instance of this script is already running.
    if should_I_quit(args=args, myname=sys.argv[0]):
        DEBUG("Stop.")
        exit(0)
    INFO(f"Logging to {sublogdir}, level {args.loglevel}")

    if test_mode:
        INFO("Running in testbed mode.")
        args.mangle_dirpath = 'production-testbed'
    else:
        INFO("Running in production mode.")

    # Parameter overrides handed to RuleConfig.
    param_overrides = {
        "runs": args.runs,
        "runlist": args.runlist,
        "nevents": 0,  # Not relevant for eradication, but RuleConfig expects it.
    }

    if args.physicsmode is not None:
        param_overrides["physicsmode"] = args.physicsmode

    # filesystem is the base for all output; "production" (in the default
    # filesystem) is replaced when a mangled dir path is requested.
    param_overrides["prodmode"] = args.mangle_dirpath if args.mangle_dirpath else "production"

    # Load the specific rule from the given yaml file.
    try:
        rule = RuleConfig.from_yaml_file(
            yaml_file=args.config,
            rule_name=args.rulename,
            param_overrides=param_overrides
        )
        INFO(f"Successfully loaded rule configuration: {args.rulename}")
    except (ValueError, FileNotFoundError) as e:
        ERROR(f"Error loading rule configuration: {e}")
        exit(1)

    # Derive the match configuration and hand off to the eradication routine.
    match_config = MatchConfig.from_rule_config(rule)
    INFO("Match configuration created.")

    eradicate_runs(match_config, dryrun=args.dryrun, delete_files=args.force_delete)

    INFO(f"{Path(__file__).name} DONE.")
0219 
if __name__ == '__main__':
    main()
    # Use sys.exit rather than the `exit` builtin: `exit` is injected by the
    # site module for interactive use and is not guaranteed in all contexts.
    sys.exit(0)