Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-25 08:29:08

0001 #!/usr/bin/env python
0002 import argparse
0003 from pathlib import Path
0004 import pprint # noqa F401
0005 
0006 from argparsing import monitor_args
0007 from sphenixdbutils import test_mode as dbutils_test_mode
0008 from simpleLogger import slogger, CHATTY, DEBUG, INFO, WARN, ERROR, CRITICAL  # noqa: F401
0009 from sphenixprodrules import RuleConfig
0010 from sphenixmatching import MatchConfig
0011 from sphenixmisc import setup_rot_handler, should_I_quit
0012 import htcondor2 as htcondor  # type: ignore
0013 import classad2 as classad # type: ignore
0014 
def monitor_condor_jobs(batch_name: str, dryrun: bool=True) -> dict:
    """
    Query condor (via the htcondor2 python bindings) for all jobs whose
    JobBatchName ends in batch_name, and map each job's classad by its
    database id (the last whitespace-separated token of the job's Args).

    Args:
        batch_name: suffix to match against JobBatchName; may or may not
            carry a prefix such as "main.".
        dryrun: reserved for future use — this function only queries and
            never modifies condor state, regardless of the flag.

    Returns:
        dict mapping int dbid -> classad. Empty dict if no jobs matched.
        Exits the process (status 1) on query failure, on a job whose
        Args does not end in an integer dbid, or on a duplicate dbid.
    """
    INFO("Polling for all condor jobs using htcondor2 python bindings...")

    try:
        schedd = htcondor.Schedd()

        # batch_name MAY contain any prefix (e.g. "main."), so only anchor the tail
        batch_pattern = f'.*{batch_name}$'

        # Query all jobs for the batch, we will filter by status locally
        constraint = f'regexp("{batch_pattern}", JobBatchName)'
        INFO(f"Querying condor with constraint: {constraint}")

        attrs = [
            'ClusterId', 'ProcId', # access via job_id = f"{ClusterId}.{ProcId}"
                # Interesting Statistics
            'JobStatus',  'QDate', 'CompletionDate',
            'ExitCode', 'HoldReason', 'RemoveReason',
            'RemoteHost', 'NumJobStarts',
            'ResidentSetSize', 'MemoryProvisioned', 'LastHoldReasonCode',
            'EnteredCurrentStatus',
                # Important for cloning
            'Owner', 'JobBatchName','Environment', 'JobPrio',
            'Cmd', 'Args', 'Iwd',
            'JobSubmitFile', 'Out', 'Err', 'UserLog',
            'RequestCpus', 'RequestDisk', 'RequestMemory', 'Requestxferslots'
        ]

        jobs = schedd.query(constraint=constraint, projection=attrs)

        if not jobs:
            INFO("No jobs found for the specified batch name.")
            return {}

        INFO(f"Found {len(jobs)} jobs for batch_name {batch_name}.")
    except Exception as e:
        ERROR(f"An unexpected error occurred during condor query: {e}")
        exit(1)

    ad_by_dbid = {}
    for ad in jobs:
        # dbid is the last token of Args; guard against an empty/missing Args,
        # which would previously crash with IndexError instead of reaching
        # the intended error path below.
        arg_tokens = ad.get('Args', '').split()
        dbid = arg_tokens[-1] if arg_tokens else ''
        if not dbid.isdigit():
            ERROR(f"Job with Args {ad.get('Args', '')} has non-integer dbid {dbid}. Stop.")
            exit(1)
        dbid = int(dbid)
        if dbid in ad_by_dbid:
            # Previous message claimed "overwriting previous entry", but we stop instead.
            ERROR(f"Duplicate dbid {dbid} found in jobs. Stop.")
            exit(1)
        ad_by_dbid[dbid] = ad
    INFO(f"Mapped {len(ad_by_dbid)} jobs by dbid.")
    return ad_by_dbid
0070 
0071 # ============================================================================================
def base_batchname_from_args(args: argparse.Namespace) -> str:
    """
    Determine the base condor batch name from the parsed command line.

    If --base_batchname was given, return it verbatim. Otherwise, load
    the rule configuration from the given yaml file and derive the batch
    name from it, stripping any leading "main."-style prefix.

    Args:
        args: parsed command-line namespace (see monitor_args()).

    Returns:
        The base batch name. Exits the process (status 1) if the rule
        configuration cannot be loaded.
    """
    if args.base_batchname is not None:
        return args.base_batchname

    # Prepare param_overrides for RuleConfig
    param_overrides = {}
    param_overrides["runs"] = args.runs
    param_overrides["runlist"] = args.runlist
    param_overrides["nevents"] = 0 # Not relevant, but needed for the RuleConfig ctor

    if args.physicsmode is not None:
        param_overrides["physicsmode"] = args.physicsmode

    # filesystem is the base for all output, allow for mangling here
    # "production" (in the default filesystem) is replaced
    param_overrides["prodmode"] = "production"
    if args.mangle_dirpath:
        param_overrides["prodmode"] = args.mangle_dirpath

    # Load specific rule from the given yaml file.
    try:
        rule = RuleConfig.from_yaml_file(
            yaml_file=args.config,
            rule_name=args.rulename,
            param_overrides=param_overrides
        )
        INFO(f"Successfully loaded rule configuration: {args.rulename}")
    except (ValueError, FileNotFoundError) as e:
        ERROR(f"Error loading rule configuration: {e}")
        exit(1)

    # NOTE: no MatchConfig is created here; the previous
    # "Match configuration created." log line was a stale leftover.

    # Derive the batch name from the rule config.
    batch_name=rule.job_config.batch_name # usually starts with "main." or so. Remove that
    batch_name=batch_name.split(".", 1)[-1]
    return batch_name
0110 
def main():
    """Entry point: parse arguments, set up logging, and poll condor jobs."""
    args = monitor_args()

    #################### Test mode?
    # Testbed mode if either the db utilities or the command line request it.
    test_mode = dbutils_test_mode or args.test_mode
    # or ( hasattr(rule, 'test_mode') and rule.test_mode ) ## allow in the yaml file?

    # Set up submission logging before going any further
    sublogdir = setup_rot_handler(args)
    slogger.setLevel(args.loglevel)
    INFO(f"Logging to {sublogdir}, level {args.loglevel}")

    if not test_mode:
        INFO("Running in production mode.")
    else:
        INFO("Running in testbed mode.")
        args.mangle_dirpath = 'production-testbed'

    # Derive the batch name and poll condor for its jobs.
    monitor_condor_jobs(batch_name=base_batchname_from_args(args), dryrun=args.dryrun)
    INFO(f"{Path(__file__).name} DONE.")
0133 
if __name__ == '__main__':
    main()
    # No explicit exit(0): falling off the end of the script already exits
    # with status 0, and error paths inside main() call exit(1) themselves.
    # (exit() is the site-module interactive helper; avoid it in scripts.)