# File indexing completed on 2026-04-25 08:29:08
0001
0002
0003 from pathlib import Path
0004 from datetime import datetime
0005 import cProfile
0006 import pstats
0007 import subprocess
0008 import sys
0009 import re
0010
0011 import pprint
0012
0013 import argparse
0014 from argparsing import submission_args
0015 from sphenixmisc import setup_rot_handler, should_I_quit
0016 from simpleLogger import slogger, CustomFormatter, CHATTY, DEBUG, INFO, WARN, ERROR, CRITICAL
0017 from sphenixprodrules import RuleConfig
0018 from sphenixdbutils import test_mode as dbutils_test_mode
0019 from sphenixdbutils import cnxn_string_map, dbQuery
0020
0021
0022
def locate_submitfiles(rule: RuleConfig, args: argparse.Namespace, allruns: bool=False):
    """Collect condor submission (.sub) files for the given rule.

    Files are searched in args.submitdir and matched against the rule's
    dsttype/dataset/outtriplet base name.  Unless allruns is True, only files
    whose name contains one of the rule's run numbers (as '_<run>') are kept.
    Returns a reverse-sorted list of path strings; empty list if nothing matches.
    """
    submitdir = Path(f'{args.submitdir}').resolve()
    subbase = f'{rule.dsttype}_{rule.dataset}_{rule.outtriplet}'
    INFO(f'Submission files located in {submitdir}')
    INFO(f'Submission files based on {subbase}')

    candidates = [str(p) for p in submitdir.glob(f'{subbase}*.sub')]
    DEBUG(f"[locate_submitfiles] Submission files before run constraint:\n{pprint.pformat(candidates)}")
    # NOTE(review): '_<run>' is matched as a plain substring, so run 123 would
    # also match a file containing '_12345' — confirm the filename segmenting
    # makes this impossible.
    run_tags = [f'_{runnumber}' for runnumber in map(str, rule.runlist_int)]

    if allruns:
        INFO("Ignoring run constraints, using all submission files.")
    else:
        INFO("Selecting submission files based on runlist")
        candidates = [f for f in candidates if any(tag in f for tag in run_tags)]

    sub_files = sorted(set(candidates), reverse=True)
    DEBUG(f"[locate_submitfiles] Submission files AFTER run constraint:\n{pprint.pformat(sub_files)}")
    if not sub_files:
        INFO("No submission files found.")
    return sub_files
0048
0049
0050
def execute_submission(rule: RuleConfig, args: argparse.Namespace, allruns: bool=False):
    """ Look for job files and submit condor jobs if the current load is acceptable.
    Update production database to "submitted".
    Locking and deleting is used to avoid double-submission.
    """

    sub_files=locate_submitfiles(rule, args, allruns)
    if not sub_files:
        INFO("No submission files found.")

    submitted_jobs=0
    for sub_file in sub_files:
        # Companion .in file carries one production_status db id per line.
        # BUG FIX: escape the dot — r".sub$" matched any character before "sub".
        in_file=re.sub(r"\.sub$",".in",str(sub_file))

        if not Path(in_file).is_file():
            WARN(f"Deleting {sub_file} as it doesn't have a corresponding .in file")
            Path(sub_file).unlink()
            # BUG FIX: skip to the next file; previously execution fell through
            # and crashed (exit(1)) trying to open the just-determined-missing .in.
            continue

        dbids=[]
        try:
            with open(in_file,'r') as f:
                for line in f:
                    # db id is the last whitespace-less token on each line
                    dbids.append(str(line.strip().split(" ")[-1]))
        except Exception as e:
            ERROR(f"Error while parsing {in_file}:\n{e}")
            exit(1)

        # Ids are spliced directly into the SQL statement below; refuse anything
        # that isn't purely numeric to keep the query injection-safe.
        if not dbids or not all(dbid.isdigit() for dbid in dbids):
            ERROR(f"Error while parsing {in_file}:\nnon-numeric or missing db id")
            exit(1)

        dbids_str=", ".join(dbids)
        now_str=str(datetime.now().replace(microsecond=0))
        update_prod_state = f"""
        UPDATE production_status
        SET status='submitted',submitted='{now_str}'
        WHERE id in
        ( {dbids_str} )
        ;
        """
        INFO(f"Updating db for {sub_file}")
        CHATTY(f"{update_prod_state}")
        prod_curs = dbQuery( cnxn_string_map['statw'], update_prod_state )
        prod_curs.commit()

        INFO(f"Submitting {sub_file}\n\t\t && Removing {in_file}")
        if not args.dryrun:
            # NOTE(review): shell=True with interpolated paths; file names come
            # from our own glob, but a list-form argv would be safer.
            subprocess.run(f"condor_submit {sub_file} && rm {sub_file} {in_file}",shell=True)
        submitted_jobs+=len(dbids)

    INFO(f"Received a total of {len(sub_files)} submission files.")
    INFO(f"Submitted a total of {submitted_jobs} jobs.")
0101
0102
0103
0104
0105
0106
0107
0108
0109
def main():
    """Entry point: parse CLI args, load the rule configuration, submit jobs."""
    args = submission_args()

    # Testbed mode if either the db layer or the command line requests it.
    test_mode = dbutils_test_mode or args.test_mode

    log_dir = setup_rot_handler(args)
    slogger.setLevel(args.loglevel)

    # Bail out early if another instance / a stop condition says so.
    if should_I_quit(args=args, myname=sys.argv[0]):
        DEBUG("Stop.")
        exit(0)
    INFO(f"Logging to {log_dir}, level {args.loglevel}")

    if args.profile:
        DEBUG( "Profiling is ENABLED.")
        profiler = cProfile.Profile()
        profiler.enable()

    if test_mode:
        INFO("Running in testbed mode.")
        args.mangle_dirpath = 'production-testbed'
    else:
        INFO("Running in production mode.")

    # Substitutions handed to the rule constructor; run selection from the CLI.
    param_overrides = {
        "runs":     args.runs,
        "runlist":  args.runlist,
        "prodmode": None,
        "nevents":  0,
    }

    CHATTY(f"Rule substitutions: {param_overrides}")
    INFO("Now loading and building rule configuration.")

    try:
        rule = RuleConfig.from_yaml_file( yaml_file=args.config, rule_name=args.rulename, param_overrides=param_overrides )
        INFO(f"Successfully loaded rule configuration: {args.rulename}")
    except (ValueError, FileNotFoundError) as e:
        ERROR(f"Error: {e}")
        exit(1)

    fs = rule.job_config.filesystem
    CHATTY(f"Filesystem: {fs}")

    execute_submission(rule, args)

    if args.profile:
        profiler.disable()
        DEBUG("Profiling finished. Printing stats...")
        prof_stats = pstats.Stats(profiler)
        prof_stats.strip_dirs().sort_stats('time').print_stats(10)

    INFO(f"{Path(sys.argv[0]).name} DONE.")
0183
0184
0185
# Script entry point; exit with success status once main() returns.
if __name__ == '__main__':
    main()
    raise SystemExit(0)