Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-25 08:29:08

0001 #!/usr/bin/env python
0002 
0003 import argparse
0004 import subprocess
0005 import yaml
0006 import platform
0007 import cProfile
0008 import pstats
0009 import pprint # noqa F401
0010 import sys
0011 from typing import Dict, Any, Tuple
0012 
0013 from simpleLogger import slogger, CustomFormatter, CHATTY, DEBUG, INFO, WARN, ERROR, CRITICAL  # noqa: F401
0014 from logging.handlers import RotatingFileHandler
0015 from pathlib import Path
0016 from datetime import datetime
0017 from sphenixprodrules import check_params
0018 
0019 from collections import namedtuple
# Per-rule on/off switches, one field per process type that main() can launch
# (create_submission.py, dstspider.py, histspider.py, monitor_finish.py).
SubmitDstHist = namedtuple('SubmitDstHist',['submit','dstspider','histspider','finishmon'])
0021 
0022 # ============================================================================================
def main():
    """
    Specifically intended for cron jobs. Steered by a control yaml and based on the hostname,
    dispatch submission, spider, and any other processes for production jobs.

    The steering yaml is keyed by short hostname. Each host entry must contain a
    "defaultlocations" field; every other key of the host entry is treated as a rule.

    Exit status:
        0  all rules were walked through
        1  steer file missing or not a mapping, or no "defaultlocations" for the host
        2  the host has no entry in the steer file

    Raises:
        ValueError: if the steer file is not valid yaml.
    """

    ### Minutiae
    args = steering_args()
    setup_my_rot_handler(args)
    slogger.setLevel(args.loglevel)

    local_host = platform.node().split('.')[0]
    hostname = args.hostname.split('.')[0] if isinstance(args.hostname, str) else None
    if not hostname:
        hostname = local_host
        INFO(f"{sys.argv[0]} invoked on {hostname}")
    else:
        WARN(f"{sys.argv[0]} invoked for {hostname} but from {local_host}")

    # Always bound, so the teardown at the end does not depend on args.profile alone.
    profiler = None
    if args.profile:
        DEBUG( "Profiling is ENABLED.")
        profiler = cProfile.Profile()
        profiler.enable()

    ### Parse yaml
    try:
        INFO(f"Reading rules from {args.steerfile}")
        with open(args.steerfile, "r") as yamlstream:
            yaml_data = yaml.safe_load(yamlstream)
    except yaml.YAMLError as yerr:
        # Chain the original error so the parser's traceback is not lost.
        raise ValueError(f"Error parsing YAML file: {yerr}") from yerr
    except FileNotFoundError:
        ERROR(f"YAML file not found: {args.steerfile}")
        sys.exit(1)

    # safe_load returns None for an empty file and may return a scalar or list;
    # only a mapping can be looked up by hostname.
    if not isinstance(yaml_data, dict):
        ERROR(f"No host entries found in {args.steerfile}")
        sys.exit(1)

    try:
        host_data = yaml_data[hostname]
    except KeyError:
        INFO(f"Host '{hostname}' not found in {args.steerfile}")
        sys.exit(2)

    ### defaultlocations is special
    # pop removes it so the remainder are rules
    # Note: Could be made optional for full path files, but too much fuss to be worth it.
    # A host key without a mapping below it cannot carry defaultlocations either.
    defaultlocations = host_data.pop("defaultlocations", None) if isinstance(host_data, dict) else None
    if not defaultlocations:
        ERROR(f'Could not find field "defaultlocations" in the yaml for {hostname}')
        sys.exit(1)
    DEBUG(f"Default file locations:\n{pprint.pformat(defaultlocations)}")
    INFO(f"Successfully loaded {len(host_data)} rules for {hostname}")
    CHATTY(f"YAML dict for {hostname} is:\n{pprint.pformat(host_data)}")

    ### Walk through the rules.
    for rule in host_data:
        INFO(f"Working on {rule}")
        thisprod, ruleargs, sdh_tuple = collect_yaml_data(host_data=host_data, rule=rule,
                                                          defaultlocations=defaultlocations,
                                                          dryrun=args.dryrun)

        ### environment; pass-through arguments
        envline = f"source {thisprod}"
        ruleargs += f" --loglevel {args.loglevel}"
        if args.dryrun:
            ruleargs += " --dryrun"

        ### Loop through process types
        ### Go from fast to slow processes.
        ### They'll overlap anyway.
        INFO(f"sdh_tuple is {pprint.pformat(sdh_tuple)}")
        if sdh_tuple.histspider:
            _launch(envline, f"histspider.py {ruleargs}", args.dryrun)

        if sdh_tuple.dstspider:
            _launch(envline, f"dstspider.py {ruleargs}", args.dryrun)

        if sdh_tuple.submit:
            ### Submission is special. Ideally (one day) we'd split up into creation that registers as "submitting"
            ### and submission which registers as "submitted". So far, do it in one go.
            _launch(envline, f"create_submission.py {ruleargs} --andgo", args.dryrun)

        if sdh_tuple.finishmon:
            _launch(envline, f"monitor_finish.py {ruleargs}", args.dryrun)

    if profiler is not None:
        profiler.disable()
        DEBUG("Profiling finished. Printing stats...")
        stats = pstats.Stats(profiler)
        stats.strip_dirs().sort_stats('time').print_stats(8)

    INFO("All done.")
    sys.exit(0)


def _launch(envline, procline, dryrun):
    """Log one production command line and, unless dryrun is set, start it without waiting for it."""
    # NOTE(review): "source" and "&>" are bash syntax, while shell=True runs the
    # command through /bin/sh on POSIX -- this relies on /bin/sh being bash on the
    # submit hosts; confirm.
    execline = f"{envline}  &>/dev/null && {procline} &>/dev/null"
    DEBUG(f"Executing\n{execline}")
    if not dryrun:
        subprocess.Popen(execline, shell=True)
0129 
0130 
0131 # ============================================================================================
def collect_yaml_data( host_data: Dict[str, Any], rule: str, defaultlocations: Dict[str, Any], dryrun: bool ) -> Tuple[str,str,SubmitDstHist]:
    """
    Turn one rule of the steering yaml into the pieces needed to launch its processes.

    Args:
        host_data: mapping of rule name to that rule's settings.
        rule: name of the rule to process; must be a key of host_data.
        defaultlocations: mapping with the keys "prodbase", "configbase" and "submitdir",
            used wherever the rule does not override them. The "submitdir" value is
            passed through str.format(rule=rule).
        dryrun: if true, the submit directory is not created.

    Returns:
        (thisprod, ruleargs, sdh_tuple): path of the this_sphenixprod.sh init script,
        the command line argument string built from the rule, and the booleans saying
        which processes to run.

    Exits with status 1 (after logging an ERROR) if the init script or the config file
    does not exist, if "config" is not a usable string, if both "runs" and "runlist"
    are given, or if one of the process switches is not a boolean.
    """
    rule_data=host_data[rule]
    # "submitdir" is read below as a per-rule override, so it is listed as accepted here too.
    # NOTE(review): assumes check_params only treats "optional" as the set of tolerated keys -- confirm.
    check_params(  rule_data
                 , required=["config"]
                 , optional=["runs", "runlist",
                             "submit", "dstspider","histspider",
                             "finishmon",
                             "prodbase", "configbase", "submitdir",
                             "nevents",
                             "jobmem", "jobprio",
                             "force", "force_delete",
                             "cut_segment",
                             "chunk-size",
                             ])
    ### Local file location changes?
    prodbase=rule_data.get("prodbase",defaultlocations["prodbase"])
    configbase=rule_data.get("configbase",defaultlocations["configbase"])
    submitdir=rule_data.get("submitdir",defaultlocations["submitdir"])
    submitdir=submitdir.format(rule=rule)

    ### location of the this_sphenixprod script
    thisprod=f"{prodbase}/this_sphenixprod.sh"
    if not Path(thisprod).is_file():
        ERROR(f'Init script {thisprod} does not exist.')
        sys.exit(1)

    ### construct arguments
    ruleargs=f"--rule {rule}"
    config=rule_data.get("config", None)
    # Fail with a readable message instead of an AttributeError on a missing or non-string value.
    if not isinstance(config, str) or not config:
        ERROR(f'Rule {rule} needs a "config" file name, got "{config}"')
        sys.exit(1)
    # Relative config names are resolved against configbase.
    if not config.startswith("/"):
        config=f"{configbase}/{config}"
    if not Path(config).is_file():
        ERROR(f"Cannot find config file {config}")
        sys.exit(1)
    ruleargs += f" --config {config}"

    if not dryrun:
        Path(submitdir).mkdir( parents=True, exist_ok=True )
    ruleargs += f" --submitdir {submitdir}"

    runs=rule_data.get("runs", None)
    runlist=rule_data.get("runlist", None)
    if runs and runlist:
        ERROR( 'You cannot specify both "runs" and "runlist"')
        sys.exit(1)
    if runs:
        # A single run given as a scalar is treated as a one-element list.
        if isinstance(runs, (str, int)):
            runs = [runs]
        # NOTE(review): the '" "' separator produces e.g.  1" "2  which a shell reads as
        # the single word "1 2"; kept unchanged -- confirm the receiving scripts expect that.
        runs = '" "'.join(map(str, runs))
        ruleargs += f" --runs {runs}"
    if runlist:
        ruleargs += f" --runlist {runlist}"

    ### More rare extra arguments
    nevents=rule_data.get("nevents", None)
    if nevents:
        ruleargs += f" --nevents {nevents}"
    jobmem=rule_data.get("jobmem", None)
    if jobmem:
        ruleargs += f" --mem {jobmem}"
    jobprio=rule_data.get("jobprio", None)
    if jobprio:
        ruleargs += f" --priority {jobprio}"
    cut_segment=rule_data.get("cut_segment", None)
    if cut_segment:
        ruleargs += f" --cut-segment {cut_segment}"
    # Unlike the options above, --chunk-size is always passed (default 100).
    chunk_size=rule_data.get("chunk-size", 100)
    ruleargs += f" --chunk-size {chunk_size}"

    ### Force options
    # Deliberately only warned about, never passed on.
    force=rule_data.get("force", False)
    force_delete=rule_data.get("force_delete", False)
    if force:
        WARN('"force" is not a good idea. Uncomment if you\'re sure')
        # ruleargs += " --force"
    if force_delete:
        WARN('"force_delete" is not a good idea. Uncomment if you\'re sure')
        #ruleargs += " --force-delete"

    ### Booleans for what to run
    # histspider follows the dstspider setting unless it is given explicitly.
    sdh_tuple=SubmitDstHist(submit=rule_data.get("submit", False),
                            dstspider=rule_data.get("dstspider", True),
                            histspider=rule_data.get("histspider", rule_data.get("dstspider", True)),
                            finishmon=rule_data.get("finishmon", False)
                            )
    ## sanity
    for k,v in sdh_tuple._asdict().items():
        if not isinstance(v,bool):
            ERROR(f'Value of "{k}" must be (yaml-)boolean, got "{v}"')
            sys.exit(1)

    ## cleanup
    # Only leading blanks are dropped; the previous [1:-1] slice also cut off the last character.
    ruleargs=ruleargs.lstrip(" ")

    return thisprod,ruleargs,sdh_tuple
0227 
0228 # ============================================================================================
def setup_my_rot_handler(args):
    """
    Attach a rotating file handler to slogger and return the log directory.

    The directory is /tmp/sphenixprod/sphenixprod/ followed by the steer file's
    base name with '.yaml' removed; it is created if needed. The log file inside
    is named after today's date.
    """
    steer_tag = Path(args.steerfile).name.replace('.yaml','')
    sublogdir = '/tmp/sphenixprod/sphenixprod/' + steer_tag
    Path(sublogdir).mkdir( parents=True, exist_ok=True )

    logfile = f"{sublogdir}/{datetime.today().date()}.log"
    rot_handler = RotatingFileHandler(
        filename=logfile,
        mode='a',
        maxBytes=25*1024*1024, # rotate at 25 MiB ...
        backupCount=10,        # ... keeping up to 10 rotated files
        encoding=None,
        delay=0
    )
    rot_handler.setFormatter(CustomFormatter())
    slogger.addHandler(rot_handler)

    return sublogdir
0246 
0247 # ============================================================================================
def steering_args():
    """
    Handle command line tedium for steering jobs.

    Parses sys.argv and returns the argparse namespace with the attributes
    steerfile, dryrun, hostname, profile, verbose, debug, chatty and loglevel.

    loglevel is always set on return: -v / -vv / -vvv (or more) map to
    INFO / DEBUG / CHATTY, -d maps to DEBUG, -c to CHATTY, and a value given
    with --loglevel is upper-cased. Without any of these it is 'INFO'.
    """
    arg_parser = argparse.ArgumentParser( prog='production_control.py',
                                          description='"Production manager to dispatch jobs depending on submit node."',
                                         )
    arg_parser.add_argument( '--steerfile', '-f', dest='steerfile', required=True,
                             help='Location of steering instructions per host' )

    arg_parser.add_argument( '--dryrun', '-n',
                             help="flag is passed through to the scripts", dest="dryrun", action="store_true")

    arg_parser.add_argument( '--hostname', dest='hostname', default=None,
                             help='Act as if running on [hostname]' )

    arg_parser.add_argument( '--profile',help="Enable profiling", action="store_true")

    vgroup = arg_parser.add_argument_group('Logging level')
    exclusive_vgroup = vgroup.add_mutually_exclusive_group()
    exclusive_vgroup.add_argument( '-v', '--verbose', help="Prints more information per repetition", action='count', default=0)
    exclusive_vgroup.add_argument( '-d', '--debug', help="Prints even more information", action="store_true")
    exclusive_vgroup.add_argument( '-c', '--chatty', help="Prints the most information", action="store_true")
    exclusive_vgroup.add_argument( '--loglevel', dest='loglevel', default='INFO',
                                   help="Specific logging level (CHATTY, DEBUG, INFO, WARN, ERROR, CRITICAL)" )

    args = arg_parser.parse_args()
    # The four options are mutually exclusive, so at most one branch can apply.
    if args.chatty or args.verbose >= 3:
        # ">= 3" so that extra repetitions of -v stay at the most verbose level
        # instead of silently dropping back to the default.
        args.loglevel = 'CHATTY'
    elif args.debug or args.verbose == 2:
        args.loglevel = 'DEBUG'
    elif args.verbose == 1:
        args.loglevel = 'INFO'
    else:
        # Level names are upper case; accept e.g. "--loglevel debug" as well.
        args.loglevel = args.loglevel.upper()

    return args
0281 
0282 
0283 # ============================================================================================
0284 
if __name__ == '__main__':
    main()
    # main() ends the process itself; this is only a fallback should it ever return.
    # sys.exit is used because the bare exit() helper is installed by the site module
    # and is not guaranteed to exist.
    sys.exit(0)