# File indexing completed on 2026-04-25 08:29:08
0001
0002
0003 from pathlib import Path
0004 import collections
0005 import pickle
0006 import json
0007
0008 import pprint
0009
0010 from argparsing import monitor_args
0011 from sphenixdbutils import test_mode as dbutils_test_mode
0012 from simpleLogger import slogger, CHATTY, DEBUG, INFO, WARN, ERROR, CRITICAL
0013 from sphenixmisc import setup_rot_handler
0014 from sphenixcondortools import base_batchname_from_args, monitor_condor_jobs
0015 import htcondor2 as htcondor
0016
def main():
    """Monitor HTCondor jobs for one batch and handle the held ones.

    For every held job (JobStatus == 5):
      * log/flag jobs that were held while still under their memory request,
      * compute an enlarged memory request and, with ``--resubmit``, remove the
        held job and resubmit it with the new ``RequestMemory``,
      * collect jobs whose new request would exceed ``--max-memory`` and either
        remove them (``--kill``) or print them for manual cleanup.

    All inputs come from the command line via ``monitor_args()``; the function
    returns None and communicates only through logging and schedd actions.
    """
    args = monitor_args()

    # Site-wide testbed flag (sphenixdbutils) OR the explicit CLI switch.
    test_mode = dbutils_test_mode or args.test_mode

    sublogdir = setup_rot_handler(args)
    slogger.setLevel(args.loglevel)
    INFO(f"Logging to {sublogdir}, level {args.loglevel}")

    if test_mode:
        INFO("Running in testbed mode.")
        args.mangle_dirpath = 'production-testbed'
    else:
        INFO("Running in production mode.")

    batch_name = base_batchname_from_args(args)
    jobs = monitor_condor_jobs(batch_name=batch_name, dryrun=args.dryrun)

    # JobStatus == 5 is HTCondor's "Held" state.
    held_jobs_ads = [ad for ad in jobs.values() if ad.get('JobStatus') == 5]

    if not held_jobs_ads:
        INFO(f"Found {len(jobs)} total jobs, but none are currently held.")
        return

    INFO(f"Found {len(jobs)} total jobs, {len(held_jobs_ads)} of which are held.")

    kill_suggestion = []                                # ads whose new request would exceed --max-memory
    under_memory_hold_reasons = collections.Counter()   # tally of hold codes for under-memory holds

    for job_ad in held_jobs_ads:
        job_id = f"{job_ad.get('ClusterId')}.{job_ad.get('ProcId')}"
        # ResidentSetSize is reported in KiB; convert so it compares to
        # MemoryProvisioned (MB).
        mu = int(job_ad.get('ResidentSetSize', 0)) / 1024
        rm = int(job_ad.get('MemoryProvisioned', 0))

        if mu < rm:
            # Held even though it stayed below its memory request -- record why.
            hold_reason = job_ad.get('HoldReason', 'Not Available')
            DEBUG(f"Job {job_id} held with mu ({mu:.0f}MB) < rm ({rm}MB). Reason: {hold_reason}")
            reason_code = job_ad.get('LastHoldReasonCode', 0)
            # Code 26 is the expected memory-related hold; warn on anything else.
            if reason_code != 26:
                WARN(f'Job {job_id} held with mu ({mu:.0f}MB) < rm ({rm}MB). Reason Code {reason_code}:\n\t"{hold_reason}"')
            under_memory_hold_reasons[reason_code] += 1

        # The submit language wants 'output'/'error' where the job ad has
        # 'Out'/'Err'. Guard the rename: a missing attribute must not abort
        # the whole sweep with a KeyError (previous behavior).
        if 'Out' in job_ad:
            job_ad['output'] = job_ad.pop('Out')
        if 'Err' in job_ad:
            job_ad['error'] = job_ad.pop('Err')

        new_submit_ad = htcondor.Submit(dict(job_ad))
        # An explicit --memory wins; otherwise start from the provisioned amount.
        base_rm = int(args.memory) if args.memory else int(rm)
        # NOTE(review): the scale factor is applied even on top of an explicit
        # --memory value -- this matches the original behavior; confirm intended.
        new_rm = int(base_rm * args.memory_scale_factor)

        if new_rm > args.max_memory:
            WARN(f"Calculated new memory request {new_rm}MB exceeds maximum of {args.max_memory}MB. Skipping.")
            kill_suggestion.append(job_ad)
            continue

        new_submit_ad['RequestMemory'] = str(new_rm)
        if args.resubmit:
            if not args.dryrun:
                schedd = htcondor.Schedd()
                try:
                    # Remove first so the held job cannot linger, then resubmit
                    # the cloned ad with the enlarged request.
                    schedd.act(htcondor.JobAction.Remove, [job_id])
                    INFO(f"Removed held job {job_ad['ClusterId']}.{job_ad['ProcId']} from queue.")
                    submit_result = schedd.submit(new_submit_ad)
                    new_queue_id = submit_result.cluster()
                    INFO(f"Resubmitted job with increased memory request ({rm}MB -> {new_rm}MB) as {new_queue_id}.")
                except Exception as e:
                    ERROR(f"Failed to remove and resubmit job {job_ad['ClusterId']}.{job_ad['ProcId']}: {e}")
            else:
                INFO(f"(Dry Run) Would remove held job {job_ad['ClusterId']}.{job_ad['ProcId']} and resubmit with RequestMemory={new_rm}MB.")

    if kill_suggestion:
        kill_procs = [f"{ad['ClusterId']}.{ad['ProcId']}" for ad in kill_suggestion]
        INFO(f"There were {len(kill_suggestion)} jobs that could not be resubmitted due to exceeding max memory.")
        if args.kill:
            INFO("Killing them now as per --kill option.")
            if not args.dryrun:
                schedd = htcondor.Schedd()
                try:
                    schedd.act(htcondor.JobAction.Remove, kill_procs)
                    INFO(f"Killed {len(kill_suggestion)} jobs that exceeded max memory limit of {args.max_memory}MB.")
                except Exception as e:
                    ERROR(f"Failed to kill jobs: {e}")
        else:
            INFO(f"You may want to kill them manually: \n{' '.join(kill_procs)}")

    INFO(f"{Path(__file__).name} DONE.")
0124
if __name__ == '__main__':
    main()
    # Use SystemExit directly: the bare exit() builtin is injected by the
    # `site` module for interactive use and is not guaranteed (e.g. python -S).
    raise SystemExit(0)