File indexing completed on 2026-04-25 08:29:08
0001
0002 import argparse
0003 from pathlib import Path
0004 import pprint
0005
0006 from argparsing import monitor_args
0007 from sphenixdbutils import test_mode as dbutils_test_mode
0008 from simpleLogger import slogger, CHATTY, DEBUG, INFO, WARN, ERROR, CRITICAL
0009 from sphenixprodrules import RuleConfig
0010 from sphenixmatching import MatchConfig
0011 from sphenixmisc import setup_rot_handler, should_I_quit
0012 import htcondor2 as htcondor
0013 import classad2 as classad
0014
def monitor_condor_jobs(batch_name: str, dryrun: bool=True) -> dict:
    """
    Check on the status of held jobs and process them using the htcondor2 bindings.

    Queries the local schedd for every job whose JobBatchName ends with
    batch_name, and maps each job's ClassAd by the database id that is passed
    as the last token of the job's command-line arguments.

    Args:
        batch_name: suffix matched (regexp-anchored at the end) against JobBatchName.
        dryrun: currently unused; kept for interface compatibility with callers.
            NOTE(review): no state-changing schedd action is performed either way.

    Returns:
        dict mapping int dbid -> job ClassAd; empty dict when no jobs match.

    Exits the process (status 1) on query failure or on malformed/duplicate dbids.
    """
    INFO("Polling for all condor jobs using htcondor2 python bindings...")

    try:
        schedd = htcondor.Schedd()

        # Anchor at the end so any prefix (e.g. "<user>.<batch_name>") still matches.
        batch_pattern = f'.*{batch_name}$'

        constraint = f'regexp("{batch_pattern}", JobBatchName)'
        INFO(f"Querying condor with constraint: {constraint}")

        # Restrict the projection to the attributes we actually consume downstream.
        attrs = [
            'ClusterId', 'ProcId',
            # status / lifecycle
            'JobStatus', 'QDate', 'CompletionDate',
            'ExitCode', 'HoldReason', 'RemoveReason',
            'RemoteHost', 'NumJobStarts',
            'ResidentSetSize', 'MemoryProvisioned', 'LastHoldReasonCode',
            'EnteredCurrentStatus',
            # identity / submission details
            'Owner', 'JobBatchName','Environment', 'JobPrio',
            'Cmd', 'Args', 'Iwd',
            'JobSubmitFile', 'Out', 'Err', 'UserLog',
            'RequestCpus', 'RequestDisk', 'RequestMemory', 'Requestxferslots'
        ]

        jobs = schedd.query(constraint=constraint, projection=attrs)

        if not jobs:
            INFO("No jobs found for the specified batch name.")
            return {}

        INFO(f"Found {len(jobs)} jobs for batch_name {batch_name}.")
    except Exception as e:
        ERROR(f"An unexpected error occurred during condor query: {e}")
        exit(1)

    ad_by_dbid = {}
    for ad in jobs:
        # The dbid is the last command-line argument of the job.
        # Guard against an empty/missing 'Args' attribute: ''.split()[-1]
        # would raise IndexError.
        argv = ad.get('Args', '').split()
        dbid = argv[-1] if argv else ''
        if not dbid.isdigit():
            ERROR(f"Job with Args {ad.get('Args', '')} has non-integer dbid {dbid}. Stop.")
            exit(1)
        dbid = int(dbid)
        if dbid in ad_by_dbid:
            # A duplicate dbid means two condor jobs claim the same database
            # entry; that is a consistency error, so stop rather than overwrite.
            ERROR(f"Duplicate dbid {dbid} found in jobs. Stop.")
            exit(1)
        ad_by_dbid[dbid] = ad
    INFO(f"Mapped {len(ad_by_dbid)} jobs by dbid.")
    return ad_by_dbid
0070
0071
def base_batchname_from_args(args: argparse.Namespace) -> str:
    """
    Determine the base condor batch name from the command-line arguments.

    If --base_batchname was given explicitly, return it unchanged. Otherwise
    load the rule configuration named by args.rulename from args.config and
    derive the batch name from it, stripping any leading "<prefix>." component.

    Args:
        args: parsed command-line namespace; must carry base_batchname, runs,
            runlist, physicsmode, mangle_dirpath, config, and rulename.

    Returns:
        The base batch name string.

    Exits the process (status 1) if the rule configuration cannot be loaded.
    """
    if args.base_batchname is not None:
        return args.base_batchname

    # Build the parameter overrides needed to reconstruct the rule.
    param_overrides = {}
    param_overrides["runs"] = args.runs
    param_overrides["runlist"] = args.runlist
    # nevents is irrelevant for deriving the batch name; force it to 0.
    param_overrides["nevents"] = 0

    if args.physicsmode is not None:
        param_overrides["physicsmode"] = args.physicsmode

    # Default to production; a mangled dirpath (testbed mode) overrides it.
    param_overrides["prodmode"] = "production"
    if args.mangle_dirpath:
        param_overrides["prodmode"] = args.mangle_dirpath

    try:
        rule = RuleConfig.from_yaml_file(
            yaml_file=args.config,
            rule_name=args.rulename,
            param_overrides=param_overrides
        )
        INFO(f"Successfully loaded rule configuration: {args.rulename}")
    except (ValueError, FileNotFoundError) as e:
        ERROR(f"Error loading rule configuration: {e}")
        exit(1)

    # Drop everything up to and including the first "." (e.g. a user prefix);
    # if there is no ".", the full name is returned unchanged.
    batch_name=rule.job_config.batch_name
    batch_name=batch_name.split(".", 1)[-1]
    return batch_name
0110
def main():
    """Entry point: parse arguments, set up logging, and poll condor jobs."""
    args = monitor_args()

    # Testbed mode is active when either the db utilities or the CLI request it.
    test_mode = dbutils_test_mode or args.test_mode

    # Rotating log handler goes to a per-run directory; level comes from the CLI.
    log_dir = setup_rot_handler(args)
    slogger.setLevel(args.loglevel)
    INFO(f"Logging to {log_dir}, level {args.loglevel}")

    if test_mode:
        INFO("Running in testbed mode.")
        args.mangle_dirpath = 'production-testbed'
    else:
        INFO("Running in production mode.")

    base_name = base_batchname_from_args(args)
    monitor_condor_jobs(batch_name=base_name, dryrun=args.dryrun)
    INFO(f"{Path(__file__).name} DONE.")
0133
# Script entry point: run main() and exit with a success status.
if __name__ == '__main__':
    main()
    exit(0)