Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-27 07:41:40

0001 #!/usr/bin/env python
0002 
0003 import pyodbc
0004 from pathlib import Path
0005 from datetime import datetime  # noqa: F401
0006 import yaml
0007 import cProfile
0008 import subprocess
0009 import sys
0010 import shutil
0011 
0012 # from dataclasses import fields
0013 import pprint # noqa F401
0014 
0015 from argparsing import submission_args
0016 from sphenixmisc import setup_rot_handler, should_I_quit
0017 from simpleLogger import slogger, CustomFormatter, CHATTY, DEBUG, INFO, WARN, ERROR, CRITICAL  # noqa: F401
0018 from sphenixprodrules import RuleConfig,list_to_condition
0019 from sphenixprodrules import parse_lfn
0020 from sphenixdbutils import test_mode as dbutils_test_mode
0021 from sphenixdbutils import cnxn_string_map
0022 from sphenixmisc import remove_empty_directories, binary_contains_bisect, make_chunks
0023 
0024 # ============================================================================================
def delQuery( cnxn_string, query ):
    """Execute a deletion statement and return the number of affected rows.

    Args:
        cnxn_string: pyodbc connection string for the target database.
        query: SQL statement; expected to contain a "delete" clause.

    Returns:
        int: pyodbc cursor.rowcount after commit.
    """
    # Sanity guard against accidental use for non-delete statements.
    # Case-insensitive so an uppercase "DELETE FROM ..." does not trip a false warning.
    if 'delete' not in query.lower():
        WARN(f'delQuery called without "delete". Query: {query}')

    DEBUG(f'[cnxn_string] {cnxn_string}')
    DEBUG(f'[query      ]\n{query}')
    conn = pyodbc.connect( cnxn_string )
    try:
        curs = conn.cursor()
        curs.execute( query )
        curs.commit()
        return curs.rowcount
    finally:
        # Always release the connection, even if execute()/commit() raises.
        conn.close()
0036 
0037 # ============================================================================================
0038 
def main():
    """Instantiate a given rule and remove all logfiles, histograms, dsts, database entries etc.

    Cleanup proceeds in order: condor jobs, submission files, DSTs still in the
    lake, DSTs at their final destination, file/dataset catalog rows, empty
    lustre directories, log/histogram files and their catalog rows, and finally
    the production status database. All destructive steps honor --dryrun.
    TODO: The first hundred or so lines are shared with protospider and create_submission.
    Refactor when there's downtime.
    """

    ### digest arguments
    args = submission_args()

    # This is a dangerous operation. Make sure the user means it.
    if not args.dryrun:
        answer = input("This is not a drill. Do you want to continue? (yes/no): ")
        if answer.lower() != "yes":
            print("Exiting. Smart.")
            exit(0)
        else:
            print("Here we go deleting then.")

    #################### Test mode?
    test_mode = (
            dbutils_test_mode
            or args.test_mode
            # or ( hasattr(rule, 'test_mode') and rule.test_mode ) ## allow in the yaml file?
        )

    # Set up submission logging before going any further
    sublogdir=setup_rot_handler(args)
    slogger.setLevel(args.loglevel)

    # Exit without fuss if we are already running
    if should_I_quit(args=args, myname=sys.argv[0]):
        DEBUG("Stop.")
        exit(0)

    INFO(f"Logging to {sublogdir}, level {args.loglevel}")

    if test_mode:
        INFO("Running in testbed mode.")
        args.mangle_dirpath = 'production-testbed'
    else:
        INFO("Running in production mode.")

    #################### Rule has steering parameters and two subclasses for input and job specifics
    # Rule is instantiated via the yaml reader.

    ### Parse command line arguments into a substitution dictionary
    # This dictionary is passed to the ctor to override/customize yaml file parameters
    # Note: The following could all be hidden away in the RuleConfig ctor
    # but this way, CLI arguments are used by the function that received them and
    # constraint constructions are visibly handled away from the RuleConfig class
    param_overrides = {}
    param_overrides["runs"]=args.runs
    param_overrides["runlist"]=args.runlist
    param_overrides["nevents"] = 0 # Not relevant, but needed for the RuleConfig ctor

    # Rest of the input substitutions
    if args.physicsmode is not None:
        param_overrides["physicsmode"] = args.physicsmode # e.g. physics

    if args.mangle_dstname:
        DEBUG("Mangling DST name")
        param_overrides['DST']=args.mangle_dstname

    # filesystem is the base for all output, allow for mangling here
    # "production" (in the default filesystem) is replaced
    param_overrides["prodmode"] = "production"
    if args.mangle_dirpath:
        param_overrides["prodmode"] = args.mangle_dirpath

    CHATTY(f"Rule substitutions: {param_overrides}")
    INFO("Now loading and building rule configuration.")

    #################### Load specific rule from the given yaml file.
    try:
        rule =  RuleConfig.from_yaml_file( yaml_file=args.config, rule_name=args.rulename, param_overrides=param_overrides )
        INFO(f"Successfully loaded rule configuration: {args.rulename}")
    except (ValueError, FileNotFoundError) as e:
        ERROR(f"Error: {e}")
        exit(1)

    CHATTY("Rule configuration:")
    CHATTY(yaml.dump(rule.dict))

    ### Which find command to use for lustre?
    # Lustre's robin hood, rbh-find, doesn't offer advantages for our usecase, and it is more cumbersome to use.
    # But "lfs find" is preferrable to the regular kind.
    lfind = shutil.which('lfs')
    if lfind is None:
        WARN("'lfs find' not found.")
        lfind = shutil.which('find')
    else:
        lfind = f'{lfind} find'
    INFO(f'Using "{lfind}".')

    # --- Small helpers shared by the cleanup steps below ---
    def select_in_runlist(paths):
        """Return the subset of paths whose lfn-encoded run number is in the rule's runlist."""
        selected=[]
        for path in paths:
            lfn=Path(path).name
            _,run,_,_=parse_lfn(lfn,rule)
            if binary_contains_bisect(rule.runlist_int,run):
                selected.append(path)
        return selected

    def unlink_all(paths):
        """Log and delete the given files. Honors --dryrun."""
        for f_to_delete in paths:
            CHATTY(f"Deleting: {f_to_delete}")
            if not args.dryrun:
                Path(f_to_delete).unlink(missing_ok=True)

    ######## Now clean up
    ### Condor jobs:
    condor_batchname=rule.job_config.batch_name
    # This is not necessary, just information
    condor_running_command=f"condor_q -const 'JobBatchName==\"{condor_batchname}\"' -format '%d\\n'  ClusterId |wc -l"
    condor_running=0
    try:
        condor_running = subprocess.run(condor_running_command, shell=True, check=True, capture_output=True).stdout.decode('utf-8').split()[0]
        DEBUG("Command successful!")
    except subprocess.CalledProcessError as e:
        print("Command failed with exit code:", e.returncode)

    WARN(f"About to kill {condor_running} condor jobs for JobBatchName==\"{condor_batchname}\"" )
    condor_rm_command=f"condor_rm -long -const 'JobBatchName==\"{condor_batchname}\"' | grep -c ^job_"
    WARN(f"{condor_rm_command}")
    if not args.dryrun:
        condor_rm='0'
        try:
            condor_rm = subprocess.run(condor_rm_command, shell=True, check=True, capture_output=True).stdout.decode('utf-8')
            DEBUG("Command successful!")
        except subprocess.CalledProcessError as e:
            print("Command failed with exit code:", e.returncode)
        WARN(f"Killed {condor_rm} jobs using {condor_rm_command}" )

    ### Submission directory. Hacky.
    submission_dir = Path('./tosubmit').resolve()
    subbase = f'{rule.rulestem}_{rule.outstub}_{rule.outdataset}'
    INFO(f'Submission files based on {subbase}')
    existing_sub_files =  list(Path(submission_dir).glob(f'{subbase}*.in'))
    existing_sub_files += list(Path(submission_dir).glob(f'{subbase}*.sub'))
    if existing_sub_files:
        WARN(f"Removing {int(len(existing_sub_files)/2)} existing submission file pairs for base: {subbase}")
        unlink_all(existing_sub_files)
    if Path(submission_dir).is_dir() and not any(Path(submission_dir).iterdir()):
        WARN(f"Submission directory is empty. Removing {submission_dir}")
        if not args.dryrun:
            Path(submission_dir).rmdir()

    ############# DSTs still in the lake
    filesystem = rule.job_config.filesystem
    DEBUG(f"Filesystem: {filesystem}")
    # Raw strings: the escaped \* must reach the shell verbatim (and non-raw \* is a SyntaxWarning)
    dstbase = rf'{rule.rulestem}\*{rule.outstub}_{rule.outdataset}\*'
    INFO(f'DST files filtered as {dstbase}')

    lakelocation=filesystem['outdir']
    INFO(f"Original output directory: {lakelocation}")
    findcommand = rf"{lfind} {lakelocation} -type f -name {dstbase}\*.root\*"
    findcommand=findcommand.replace(r'\*\*',r'\*') # cleanup eventual double asterisks
    INFO(f"Find command: {findcommand}")
    lakefiles=[]
    if Path(lakelocation).is_dir():
        lakefiles = subprocess.run(findcommand, shell=True, check=True, capture_output=True).stdout.decode('utf-8').splitlines()
    print(f"Found {len(lakefiles)} matching dsts sans runnumber cut in the lake.")
    del_lakefiles=select_in_runlist(lakefiles)
    WARN(f"Removing {len(del_lakefiles)} .root {lakelocation}")
    unlink_all(del_lakefiles) # could unlink the entire directory instead?

    findcommand = rf"{lfind} {lakelocation} -type f -name {dstbase}\*.finished\*"
    findcommand=findcommand.replace(r'\*\*',r'\*') # cleanup eventual double asterisks
    INFO(f"Find command: {findcommand}")
    finishedlakefiles = []
    if Path(lakelocation).is_dir():
        finishedlakefiles = subprocess.run(findcommand, shell=True, check=True, capture_output=True).stdout.decode('utf-8').splitlines()
    print(f"Found {len(finishedlakefiles)} matching .finished files in the lake.")

    del_lakefiles=select_in_runlist(finishedlakefiles)
    WARN(f"Removing {len(del_lakefiles)} .finished files in the lake at {lakelocation}")
    unlink_all(del_lakefiles)

    # Clean up directories
    if Path(lakelocation).is_dir() and not any(Path(lakelocation).iterdir()):
        WARN(f"DST lake is empty. Removing {lakelocation}")
        if not args.dryrun:
            Path(lakelocation).rmdir()

    ### DSTS, final destination
    finaldir_tmpl=filesystem['finaldir']
    INFO(f"Final destination template: {finaldir_tmpl}")

    ## Simple way: Replace all placeholders in the final destination template with '*'
    ## Search by filename
    try:
        finaldir_glob = finaldir_tmpl.format(leafdir='*',rungroup='*')
    except Exception as e:
        ERROR(f"Trying to globify {finaldir_tmpl} failed. Error:\n{e}")
        exit(-1)
    final_dsts_command=rf"{lfind} {finaldir_glob} -type f -name {dstbase}\*"
    INFO(f"Find command for moved DSTs: {final_dsts_command}")
    all_final_dsts =[]
    try:
        all_final_dsts = subprocess.run(final_dsts_command, shell=True, check=True, capture_output=True).stdout.decode('utf-8').splitlines()
        # f-prefix was missing here, which printed the braces literally
        DEBUG(f"Command successful! len(all_final_dsts)={len(all_final_dsts)}")
    except subprocess.CalledProcessError as e:
        print("Command failed with exit code:", e.returncode)

    del_final_dsts = select_in_runlist(all_final_dsts)
    WARN(f"Removing {len(del_final_dsts)} of the {len(all_final_dsts)} DSTs found by:\n{final_dsts_command}")
    unlink_all(del_final_dsts)

    ### Update databases accordingly
    ## Note: We are only using the actually deleted filenames.
    ## It would be more thorough to do it by a more general rule, but that's complicated b/c you have to dissect lfn
    chunk_size = 500
    dbstring = 'testw' if test_mode else 'fcw'
    files_table='test_files' if test_mode else 'files'
    datasets_table='test_datasets' if test_mode else 'datasets'
    WARN(f"Deleting {len(del_final_dsts)} DST rows from table {files_table} and from table {datasets_table}")

    ## files
    del_files_tmpl="""
    select count(lfn) from {files_table} 
    where lfn in ({lfns})
    """
    ## datasets
    del_datasets_tmpl="""
    select count(filename) from {datasets_table} 
    where filename in ({lfns})
    """
    # In dryrun mode we only count the rows that would be deleted
    if not args.dryrun:
        del_files_tmpl=del_files_tmpl.replace("select count(lfn) from", "delete from")
        del_datasets_tmpl=del_datasets_tmpl.replace("select count(filename) from", "delete from")

    def delete_lfns_from_catalogs(paths):
        """Remove the files/datasets catalog rows for the given paths, chunk_size lfns at a time. Honors --dryrun."""
        for i, chunk in enumerate(make_chunks(paths, chunk_size)):
            lfns=",".join(f"'{Path(p).name}'" for p in chunk)
            del_files_db=del_files_tmpl.format(files_table=files_table, lfns=lfns)
            del_datasets_db=del_datasets_tmpl.format(datasets_table=datasets_table, lfns=lfns)
            CHATTY(del_files_db )
            CHATTY(del_datasets_db )
            if not args.dryrun:
                response = delQuery( cnxn_string_map[ dbstring ], del_files_db )
                DEBUG(f"Delete chunk {i} from files db, response: {response}")
                response = delQuery( cnxn_string_map[ dbstring ], del_datasets_db )
                DEBUG(f"Delete chunk {i} from datasets db, response: {response}")

    delete_lfns_from_catalogs(del_final_dsts)

    ### Clean up empty directories on lustre
    # With lfs find on lustre, "-empty" doesn't work. Rely on the cleaner to check that
    # Very generous find, but we're only cleaning up empties after all
    final_dirs_command=f"{lfind} {finaldir_glob} -type d"
    INFO(f"Find command: {final_dirs_command}")

    all_final_dirs =[]
    try:
        all_final_dirs = subprocess.run(final_dirs_command, shell=True, check=True, capture_output=True).stdout.decode('utf-8').splitlines()
        DEBUG("Command successful!")
    except subprocess.CalledProcessError as e:
        # If the spider never ran, the directories may not exist
        print("Command failed with exit code:", e.returncode)

    INFO(f"{final_dirs_command} found {len(all_final_dirs)} directories. Removing the empty ones.")
    if not args.dryrun:
        remove_empty_directories( set(all_final_dirs) )

    ######### Take care of out, err, log, hist
    ## Simple way: Replace all placeholders in the destination template with '*'
    ## Lazy assumption: they all live in the same place. Check that.
    ## Note: The fact that Path(...).parent works for a path with placeholders is a bit weird but it does.
    datadir_tmpl=Path(filesystem['histdir']).parent
    if datadir_tmpl!=Path(filesystem['logdir']).parent or datadir_tmpl!=Path(filesystem['condor']).parent:
        ERROR("Assumption that the root of histdir, logdir, condor is the same failed.")
        print(f"histdir: {filesystem['histdir']}")
        print(f"logdir:  {filesystem['logdir']}")
        print(f"condor:  {filesystem['condor']}")

    try:
        datadir_glob = str(datadir_tmpl).format(leafdir='*',rungroup='*')
    except Exception as e:
        # Report the template: datadir_glob is unbound when format() raises
        ERROR(f"Trying to globify {datadir_tmpl} failed. Error:\n{e}")
        exit(-1)

    # Parenthesize the -name alternatives so -type f applies to all of them,
    # not just the first (find's implicit "and" binds tighter than -o)
    final_data_command=rf"find {datadir_glob} -type f \( -name {dstbase}\*.out -o -name {dstbase}\*.err -o -name {dstbase}\*.condor -o -name HIST_{dstbase}\*.root \)"
    INFO(final_data_command)
    all_final_data=[]
    try:
        all_final_data = subprocess.run(final_data_command, shell=True, check=True, capture_output=True).stdout.decode('utf-8').splitlines()
        DEBUG("Command successful!")
    except subprocess.CalledProcessError as e:
        print("Command failed with exit code:", e.returncode)
    WARN(f"Found {len(all_final_data)} histogram and log files.")
    del_final_data = select_in_runlist(all_final_data)

    WARN(f"Removing {len(del_final_data)} of the {len(all_final_data)} log and histo files found by:\n{final_data_command}")
    unlink_all(del_final_data)

    # And remove them from databases
    histnumber= sum('HIST_' in s for s in del_final_data)
    WARN(f"Deleting {histnumber} histogram files from rows from table {files_table} and from table {datasets_table}. Also deleting the other data files if they somehow made it in.")
    delete_lfns_from_catalogs(del_final_data)

    ### Clean up empty directories on /sphenix/data/data02
    datatrunk=datadir_glob.replace("/*","")
    datatrunk=f"{datatrunk}/{rule.rulestem}*"
    data_dirs_find=f"find {datatrunk} -type d -empty"
    empty_data_dirs=[]
    try:
        empty_data_dirs = subprocess.run(data_dirs_find, shell=True, check=True, capture_output=True).stdout.decode('utf-8').splitlines()
        DEBUG("Command successful!")
    except subprocess.CalledProcessError as e:
        print("Command failed with exit code:", e.returncode)
    INFO(f"{len(empty_data_dirs)} empty leaf directories found with {data_dirs_find}. Removing.")
    if not args.dryrun:
        remove_empty_directories( set(empty_data_dirs) )

    sqldstbase=dstbase.replace(r"\*","%")
    prodrun_condition=list_to_condition(rule.runlist_int,"run")
    ### Finally, clean the production database
    del_prod_state = f"""
delete from production_status 
where 
dstname like '{sqldstbase}'
and {prodrun_condition}
returning *
"""
    WARN(del_prod_state+";")
    if not args.dryrun:
        response = delQuery( cnxn_string_map[ "statw" ], del_prod_state )
        DEBUG(f"Delete states from prod db, response: {response}")

    INFO("Done.")
    if not args.dryrun:
        WARN("You should run this again in a minute or two, to catch files still trickling in from killed jobs.")
    exit(0)
0455         
0456 # ============================================================================================
0457 
if __name__ == '__main__':
    # Entry is deliberately disabled; bail out before touching anything.
    ERROR("This script is currently not functional. It needs an overhaul following the logic in dstspider, histspider.")
    ERROR("Furthermore, it doesn't work well at the scale of full production.")
    exit(1)

    # Unreachable until the overhaul above is done: profile a full run.
    cProfile.run('main()', '/tmp/sphenixprod.prof')
    import pstats
    profile_stats = pstats.Stats('/tmp/sphenixprod.prof')
    profile_stats.strip_dirs().sort_stats('time').print_stats(10)
0471