# File indexing completed on 2026-04-25 08:29:08
0001
0002
0003 import argparse
0004 import subprocess
0005 import yaml
0006 import platform
0007 import cProfile
0008 import pstats
0009 import pprint
0010 import sys
0011 from typing import Dict, Any, Tuple
0012
0013 from simpleLogger import slogger, CustomFormatter, CHATTY, DEBUG, INFO, WARN, ERROR, CRITICAL
0014 from logging.handlers import RotatingFileHandler
0015 from pathlib import Path
0016 from datetime import datetime
0017 from sphenixprodrules import check_params
0018
from collections import namedtuple
# Per-rule dispatch flags: which of the four production processes
# (submission, dst spider, hist spider, finish monitor) to launch.
SubmitDstHist = namedtuple('SubmitDstHist',['submit','dstspider','histspider','finishmon'])
0021
0022
def main():
    """
    Specifically intended for cron jobs. Steered by a control yaml and based on the hostname,
    dispatch submission, spider, and any other processes for production jobs.

    Exits with 1 on missing yaml file or missing "defaultlocations",
    2 when the hostname has no entry in the steering yaml, 0 on success.
    """
    args = steering_args()
    setup_my_rot_handler(args)
    slogger.setLevel(args.loglevel)

    # Determine which host's rules to apply. An explicit --hostname overrides
    # the actual node name (useful to test another host's configuration).
    hostname = args.hostname.split('.')[0] if isinstance(args.hostname, str) else None
    if not hostname:
        hostname = platform.node().split('.')[0]
        INFO(f"{sys.argv[0]} invoked on {hostname}")
    else:
        WARN(f"{sys.argv[0]} invoked for {hostname} but from {platform.node().split('.')[0]}")

    if args.profile:
        DEBUG( "Profiling is ENABLED.")
        profiler = cProfile.Profile()
        profiler.enable()

    try:
        INFO(f"Reading rules from {args.steerfile}")
        with open(args.steerfile, "r") as yamlstream:
            yaml_data = yaml.safe_load(yamlstream)
    except yaml.YAMLError as yerr:
        raise ValueError(f"Error parsing YAML file: {yerr}")
    except FileNotFoundError:
        ERROR(f"YAML file not found: {args.steerfile}")
        exit(1)

    try:
        host_data = yaml_data[hostname]
    except KeyError:
        INFO(f"Host '{hostname}' not found in {args.steerfile}")
        exit(2)

    # "defaultlocations" is a per-host block of fallback paths; pop it so the
    # remaining keys are all production rules.
    defaultlocations = host_data.pop("defaultlocations", None)
    if not defaultlocations:
        ERROR(f'Could not find field "defaultlocations" in the yaml for {hostname}')
        exit(1)
    prettyfs = pprint.pformat(defaultlocations)
    DEBUG(f"Default file locations:\n{prettyfs}")
    INFO(f"Successfully loaded {len(host_data)} rules for {hostname}")
    CHATTY(f"YAML dict for {hostname} is:\n{pprint.pformat(host_data)}")

    def _launch(procline: str, envline: str) -> None:
        """Source the production environment, then run procline; fire-and-forget.

        All output is discarded. The child process is not waited on.
        NOTE(review): 'source' and '&>/dev/null' are bash-isms; shell=True runs
        /bin/sh, so this assumes /bin/sh is bash-compatible — confirm on target hosts.
        """
        execline = f"{envline} &>/dev/null && {procline} &>/dev/null"
        DEBUG(f"Executing\n{execline}")
        if not args.dryrun:
            subprocess.Popen(f"{execline}", shell=True)

    for rule in host_data:
        INFO(f"Working on {rule}")
        thisprod, ruleargs, sdh_tuple = collect_yaml_data(host_data=host_data,
                                                          rule=rule,
                                                          defaultlocations=defaultlocations,
                                                          dryrun=args.dryrun)

        envline = f"source {thisprod}"
        ruleargs += f" --loglevel {args.loglevel}"
        if args.dryrun:
            ruleargs += " --dryrun"

        INFO(f"sdh_tuple is {pprint.pformat(sdh_tuple)}")
        if sdh_tuple.histspider:
            _launch(f"histspider.py {ruleargs}", envline)

        if sdh_tuple.dstspider:
            _launch(f"dstspider.py {ruleargs}", envline)

        if sdh_tuple.submit:
            # --andgo: create the submission and immediately submit it
            _launch(f"create_submission.py {ruleargs} --andgo", envline)

        if sdh_tuple.finishmon:
            _launch(f"monitor_finish.py {ruleargs}", envline)

    if args.profile:
        profiler.disable()
        DEBUG("Profiling finished. Printing stats...")
        stats = pstats.Stats(profiler)
        stats.strip_dirs().sort_stats('time').print_stats(8)

    INFO("All done.")
    exit(0)
0129
0130
0131
def collect_yaml_data( host_data: Dict[str, Any], rule: str, defaultlocations: Dict[str, str], dryrun: bool ) -> Tuple[str,str,SubmitDstHist]:
    """Extract and validate one rule's settings from the per-host yaml block.

    Args:
        host_data: per-host yaml dictionary; host_data[rule] holds this rule's settings.
        rule: name of the rule to process.
        defaultlocations: fallback paths ("prodbase", "configbase", "submitdir")
            used when the rule does not override them.
        dryrun: if True, skip creating the submission directory.

    Returns:
        Tuple of (thisprod, ruleargs, sdh_tuple) where
        thisprod  is the path to the production environment init script,
        ruleargs  is the command line argument string for the dispatched scripts,
        sdh_tuple flags which of submit/dstspider/histspider/finishmon to run.

    Exits the process on any validation failure.
    """
    rule_data = host_data[rule]
    check_params( rule_data
                , required=["config"]
                , optional=["runs", "runlist",
                            "submit", "dstspider","histspider",
                            "finishmon",
                            "prodbase", "configbase",
                            "submitdir",   # read below; was missing from the allowed keys
                            "nevents",
                            "jobmem", "jobprio",
                            "force", "force_delete",
                            "cut_segment",
                            "chunk-size",
                            ])

    prodbase = rule_data.get("prodbase", defaultlocations["prodbase"])
    configbase = rule_data.get("configbase", defaultlocations["configbase"])
    submitdir = rule_data.get("submitdir", defaultlocations["submitdir"])
    submitdir = submitdir.format(rule=rule)  # submitdir may contain a "{rule}" placeholder

    # The environment init script must exist before anything can be dispatched.
    thisprod = f"{prodbase}/this_sphenixprod.sh"
    if not Path(thisprod).is_file():
        ERROR(f'Init script {thisprod} does not exist.')
        exit(1)

    ruleargs = f"--rule {rule}"

    # Config file: relative paths are resolved against configbase.
    config = rule_data.get("config", None)
    if not config.startswith("/"):
        config = f"{configbase}/{config}"
    if not Path(config).is_file():
        ERROR(f"Cannot find config file {config}")
        exit(1)
    ruleargs += f" --config {config}"

    if not dryrun:
        Path(submitdir).mkdir( parents=True, exist_ok=True )
    ruleargs += f" --submitdir {submitdir}"

    # "runs" (explicit run numbers) and "runlist" (file of runs) are mutually
    # exclusive; check before building arguments from either.
    runs = rule_data.get("runs", None)
    runlist = rule_data.get("runlist", None)
    if runs and runlist:
        ERROR( 'You cannot specify both "runs" and "runlist"')
        exit(1)
    if runs:
        runs = '" "'.join(map(str, runs))
        ruleargs += f" --runs {runs}"
    if runlist:
        ruleargs += f" --runlist {runlist}"

    # Optional pass-through arguments.
    nevents = rule_data.get("nevents", None)
    if nevents:
        ruleargs += f" --nevents {nevents}"
    jobmem = rule_data.get("jobmem", None)
    if jobmem:
        ruleargs += f" --mem {jobmem}"
    jobprio = rule_data.get("jobprio", None)
    if jobprio:
        ruleargs += f" --priority {jobprio}"
    cut_segment = rule_data.get("cut_segment", None)
    if cut_segment:
        ruleargs += f" --cut-segment {cut_segment}"
    chunk_size = rule_data.get("chunk-size", 100)
    ruleargs += f" --chunk-size {chunk_size}"

    # "force"/"force_delete" are deliberately NOT passed through; enabling them
    # requires a code change ("uncomment").
    force = rule_data.get("force", False)
    force_delete = rule_data.get("force_delete", False)
    if force:
        WARN('"force" is not a good idea. Uncomment if you\'re sure')

    if force_delete:
        WARN('"force_delete" is not a good idea. Uncomment if you\'re sure')

    # Which processes to dispatch; histspider defaults to following dstspider.
    sdh_tuple = SubmitDstHist(submit=rule_data.get("submit", False),
                              dstspider=rule_data.get("dstspider", True),
                              histspider=rule_data.get("histspider", rule_data.get("dstspider", True)),
                              finishmon=rule_data.get("finishmon", False)
                              )

    for k, v in sdh_tuple._asdict().items():
        if not isinstance(v, bool):
            ERROR(f'Value of "{k}" must be (yaml-)boolean, got "{v}"')
            exit(1)

    # BUGFIX: the original loop did ruleargs[1:-1], which also chopped one
    # TRAILING character per leading space. Strip leading spaces only.
    ruleargs = ruleargs.lstrip(" ")

    return thisprod, ruleargs, sdh_tuple
0227
0228
def setup_my_rot_handler(args):
    """Attach a size-rotating file handler for today's log to the shared slogger.

    Logs go to /tmp/sphenixprod/sphenixprod/<steerfile name minus '.yaml'>/<YYYY-MM-DD>.log,
    rotating at 25 MB with up to 10 backups kept. Returns the log directory path.
    """
    basedir = '/tmp/sphenixprod/sphenixprod/'
    # One subdirectory per steering file, named after the file without '.yaml'.
    sublogdir = basedir + Path(args.steerfile).name.replace('.yaml', '')
    Path(sublogdir).mkdir(parents=True, exist_ok=True)

    logfile = f"{sublogdir}/{datetime.today().date()}.log"
    handler = RotatingFileHandler(
        filename=logfile,
        mode='a',
        maxBytes=25 * 1024 * 1024,
        backupCount=10,
        encoding=None,
        delay=0,
    )
    handler.setFormatter(CustomFormatter())
    slogger.addHandler(handler)

    return sublogdir
0246
0247
def steering_args():
    """Handle command line tedium for steering jobs.

    Returns:
        argparse.Namespace with steerfile, dryrun, hostname, profile, and a
        resolved loglevel string (verbosity flags are folded into loglevel).
    """
    arg_parser = argparse.ArgumentParser( prog='production_control.py',
                                          description='"Production manager to dispatch jobs depending on submit node."',
                                          )
    arg_parser.add_argument( '--steerfile', '-f', dest='steerfile', required=True,
                             help='Location of steering instructions per host' )

    arg_parser.add_argument( '--dryrun', '-n',
                             help="flag is passed through to the scripts", dest="dryrun", action="store_true")

    arg_parser.add_argument( '--hostname', dest='hostname', default=None,
                             help='Act as if running on [hostname]' )

    arg_parser.add_argument( '--profile',help="Enable profiling", action="store_true")

    vgroup = arg_parser.add_argument_group('Logging level')
    exclusive_vgroup = vgroup.add_mutually_exclusive_group()
    exclusive_vgroup.add_argument( '-v', '--verbose', help="Prints more information per repetition", action='count', default=0)
    exclusive_vgroup.add_argument( '-d', '--debug', help="Prints even more information", action="store_true")
    exclusive_vgroup.add_argument( '-c', '--chatty', help="Prints the most information", action="store_true")
    exclusive_vgroup.add_argument( '--loglevel', dest='loglevel', default='INFO',
                                   help="Specific logging level (CHATTY, DEBUG, INFO, WARN, ERROR, CRITICAL)" )

    args = arg_parser.parse_args()
    # Fold verbosity flags into loglevel. Most verbose wins; ">=" so that
    # -vvvv and beyond still map to CHATTY (the original "== 3" silently
    # fell back to the default INFO for four or more repetitions).
    if args.chatty or args.verbose >= 3:
        args.loglevel = 'CHATTY'
    elif args.debug or args.verbose == 2:
        args.loglevel = 'DEBUG'
    elif args.verbose == 1:
        args.loglevel = 'INFO'

    return args
0281
0282
0283
0284
# Script entry point.
if __name__ == '__main__':
    main()
    exit(0)  # effectively unreachable: main() itself calls exit()