Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-25 08:29:08

0001 #!/bin/env python
0002 
0003 from pathlib import Path
0004 from datetime import datetime
0005 import cProfile
0006 import pstats
0007 import subprocess
0008 import sys
0009 import re
0010 
0011 import pprint # noqa F401
0012 
0013 import argparse
0014 from argparsing import submission_args
0015 from sphenixmisc import setup_rot_handler, should_I_quit
0016 from simpleLogger import slogger, CustomFormatter, CHATTY, DEBUG, INFO, WARN, ERROR, CRITICAL  # noqa: F401
0017 from sphenixprodrules import RuleConfig
0018 from sphenixdbutils import test_mode as dbutils_test_mode
0019 from sphenixdbutils import cnxn_string_map, dbQuery
0020 
0021 
0022 # ============================================================================================
0023 def locate_submitfiles(rule: RuleConfig, args: argparse.Namespace, allruns: bool=False):
0024     ### Outsourced because this function is independently useful
0025     submitdir = Path(f'{args.submitdir}').resolve()
0026     subbase = f'{rule.dsttype}_{rule.dataset}_{rule.outtriplet}'
0027     INFO(f'Submission files located in {submitdir}')
0028     INFO(f'Submission files based on {subbase}')
0029 
0030     sub_files = list(Path(submitdir).glob(f'{subbase}*.sub'))
0031     sub_files = list(map(str,sub_files))
0032     DEBUG(f"[locate_submitfiles] Submission files before run constraint:\n{pprint.pformat(sub_files)}")
0033     runlist=list(map(str,rule.runlist_int))
0034 
0035     # Only use those who match the run condition - the pythonic way
0036     if allruns:
0037         INFO("Ignoring run constraints, using all submission files.")
0038     else:
0039         #INFO(f"Selecting submission files for runs: {runlist}")
0040         INFO("Selecting submission files based on runlist")
0041         sub_files = {file for file in sub_files if any( f'_{runnumber}' in file for runnumber in runlist) }
0042 
0043     sub_files = sorted(sub_files,reverse=True) # latest runs first
0044     DEBUG(f"[locate_submitfiles] Submission files AFTER run constraint:\n{pprint.pformat(sub_files)}")
0045     if sub_files == []:
0046         INFO("No submission files found.")
0047     return sub_files
0048 
0049 
0050 # ============================================================================================
0051 def execute_submission(rule: RuleConfig, args: argparse.Namespace, allruns: bool=False):
0052     """ Look for job files and submit condor jobs if the current load is acceptable.
0053     Update production database to "submitted".
0054     Locking and deleting is used to avoid double-submission.
0055     """
0056 
0057     sub_files=locate_submitfiles(rule, args, allruns)
0058     if sub_files == []:
0059         INFO("No submission files found.")
0060 
0061     submitted_jobs=0
0062     for sub_file in sub_files:
0063         in_file=re.sub(r".sub$",".in",str(sub_file))
0064         ### Catch problems or skipped runs
0065         if not Path(in_file).is_file():
0066             WARN(f"Deleting {sub_file} as it doesn't have a corresponding .in file")
0067             Path(sub_file).unlink()
0068 
0069         ### Update production database
0070         # Extract dbids
0071         dbids=[]
0072         try:
0073             with open(in_file,'r') as f:
0074                 for line in f:
0075                     dbids.append(str(line.strip().split(" ")[-1]))
0076         except Exception as e:
0077             ERROR(f"Error while parsing {in_file}:\n{e}")
0078             exit(1)
0079         #        submitted_jobs+=len(dbids)
0080         dbids_str=", ".join(dbids)
0081         now_str=str(datetime.now().replace(microsecond=0))
0082         update_prod_state = f"""
0083 UPDATE production_status
0084    SET status='submitted',submitted='{now_str}'
0085 WHERE id in
0086 ( {dbids_str} )
0087 ;
0088 """
0089         INFO(f"Updating db for {sub_file}")
0090         CHATTY(f"{update_prod_state}")
0091         prod_curs = dbQuery( cnxn_string_map['statw'], update_prod_state )
0092         prod_curs.commit()
0093 
0094         INFO(f"Submitting {sub_file}\n\t\t && Removing {in_file}")
0095         if not args.dryrun:
0096             subprocess.run(f"condor_submit {sub_file} && rm {sub_file} {in_file}",shell=True)
0097             submitted_jobs+=len(dbids)
0098 
0099     INFO(f"Received a total of {len(sub_files)} submission files.")
0100     INFO(f"Submitted a total of {submitted_jobs} jobs.")
0101     # Remove submission directory if empty
0102     # TODO: Test 
0103     # INFO(f"Trying to remove {submitdir}.")
0104     # if not args.dryrun:
0105     #     if len(list(submitdir.iterdir())) == 0:
0106     #         submitdir.rmdir()
0107 
0108 
0109 # ============================================================================================
0110 def main():
0111     ### digest arguments
0112     args = submission_args()
0113 
0114     #################### Test mode?
0115     test_mode = (
0116             dbutils_test_mode
0117             or args.test_mode
0118             # or ( hasattr(rule, 'test_mode') and rule.test_mode ) ## allow in the yaml file?
0119         )
0120 
0121     # Set up submission logging before going any further
0122     sublogdir=setup_rot_handler(args)
0123     slogger.setLevel(args.loglevel)
0124 
0125     # Exit without fuss if we are already running
0126     if should_I_quit(args=args, myname=sys.argv[0]):
0127         DEBUG("Stop.")
0128         exit(0)
0129     INFO(f"Logging to {sublogdir}, level {args.loglevel}")
0130 
0131     if args.profile:
0132         DEBUG( "Profiling is ENABLED.")
0133         profiler = cProfile.Profile()
0134         profiler.enable()
0135 
0136     if test_mode:
0137         INFO("Running in testbed mode.")
0138         args.mangle_dirpath = 'production-testbed'
0139     else:
0140         INFO("Running in production mode.")
0141 
0142     #################### Rule has steering parameters and two subclasses for input and job specifics
0143     # Rule is instantiated via the yaml reader.
0144 
0145     ### Parse command line arguments into a substitution dictionary
0146     # This dictionary is passed to the ctor to override/customize yaml file parameters
0147     # Note: The following could all be hidden away in the RuleConfig ctor
0148     # but this way, CLI arguments are used by the function that received them and
0149     # constraint constructions are visibly handled away from the RuleConfig class
0150     param_overrides = {}
0151     param_overrides["runs"]=args.runs
0152     param_overrides["runlist"]=args.runlist
0153     param_overrides["prodmode"] = None  # Not relevant, but needed for the RuleConfig ctor
0154     param_overrides["nevents"] = 0 # Not relevant, but needed for the RuleConfig ctor
0155 
0156     CHATTY(f"Rule substitutions: {param_overrides}")
0157     INFO("Now loading and building rule configuration.")
0158 
0159     #################### Load specific rule from the given yaml file.
0160     try:
0161         rule =  RuleConfig.from_yaml_file( yaml_file=args.config, rule_name=args.rulename, param_overrides=param_overrides )
0162         INFO(f"Successfully loaded rule configuration: {args.rulename}")
0163     except (ValueError, FileNotFoundError) as e:
0164         ERROR(f"Error: {e}")
0165         exit(1)
0166 
0167     # CHATTY("Rule configuration:")
0168     # CHATTY(yaml.dump(rule.dict))
0169 
0170     filesystem = rule.job_config.filesystem
0171     CHATTY(f"Filesystem: {filesystem}")
0172 
0173     ### And go
0174     execute_submission(rule, args)
0175 
0176     if args.profile:
0177         profiler.disable()
0178         DEBUG("Profiling finished. Printing stats...")
0179         stats = pstats.Stats(profiler)
0180         stats.strip_dirs().sort_stats('time').print_stats(10)
0181 
0182     INFO(f"{Path(sys.argv[0]).name} DONE.")
0183 
0184 # ============================================================================================
0185 
0186 if __name__ == '__main__':
0187     main()
0188     exit(0)