Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-25 08:29:08

0001 #!/usr/bin/env python
0002 
0003 from pathlib import Path
0004 import collections
0005 import pickle
0006 import json
0007 
0008 import pprint # noqa F401
0009 
0010 from argparsing import monitor_args
0011 from sphenixdbutils import test_mode as dbutils_test_mode
0012 from simpleLogger import slogger, CHATTY, DEBUG, INFO, WARN, ERROR, CRITICAL  # noqa: F401
0013 from sphenixmisc import setup_rot_handler
0014 from sphenixcondortools import base_batchname_from_args, monitor_condor_jobs
0015 import htcondor2 as htcondor # type: ignore
0016 
def _log_underprovisioned_hold(job_ad, mu, rm, reason_counter):
    """Log why a job is held even though it used less memory than requested.

    Args:
        job_ad: HTCondor ClassAd (dict-like) of the held job.
        mu: observed memory usage in MB.
        rm: provisioned/requested memory in MB.
        reason_counter: collections.Counter keyed by LastHoldReasonCode;
            updated in place.
    """
    hold_reason = job_ad.get('HoldReason', 'Not Available')
    job_id = f"{job_ad.get('ClusterId')}.{job_ad.get('ProcId')}"
    DEBUG(f"Job {job_id} held with mu ({mu:.0f}MB) < rm ({rm}MB). Reason: {hold_reason}")
    reason_code = job_ad.get('LastHoldReasonCode', 0)  # Default to 0 (None)
    # Code 26 is treated as routine here and only logged at DEBUG;
    # anything else is surprising enough to warrant a WARN.
    # NOTE(review): confirm 26 is the intended "expected" hold code.
    if reason_code != 26:
        WARN(f'Job {job_id} held with mu ({mu:.0f}MB) < rm ({rm}MB). Reason Code {reason_code}:\n\t"{hold_reason}"')
    reason_counter[reason_code] += 1


def _new_request_memory(args, rm):
    """Compute the memory request (MB) for a resubmission.

    Returns args.memory when an explicit override was given; otherwise rm
    scaled by args.memory_scale_factor. Returns None when the scaled value
    exceeds args.max_memory (the caller should consider killing the job
    instead of resubmitting). Note: an explicit --memory override is NOT
    checked against args.max_memory, matching the original behavior.
    """
    if args.memory:
        return int(args.memory)
    new_rm = int(int(rm) * args.memory_scale_factor)
    if new_rm > args.max_memory:
        WARN(f"Calculated new memory request {new_rm}MB exceeds maximum of {args.max_memory}MB. Skipping.")
        return None
    return new_rm


def _remove_and_resubmit(args, job_ad, new_submit_ad, rm, new_rm):
    """Remove a held job from the queue and resubmit it with a new memory request.

    Honors args.dryrun (log only). Errors are logged, not raised, so one
    bad job does not abort the monitoring pass.
    """
    job_id = f"{job_ad['ClusterId']}.{job_ad['ProcId']}"
    if args.dryrun:
        INFO(f"(Dry Run) Would remove held job {job_id} and resubmit with RequestMemory={new_rm}MB.")
        return
    schedd = htcondor.Schedd()
    try:
        # The transaction context manager is deprecated. The following replacement operations are not atomic.
        schedd.act(htcondor.JobAction.Remove, [job_id])
        INFO(f"Removed held job {job_id} from queue.")
        submit_result = schedd.submit(new_submit_ad)
        new_queue_id = submit_result.cluster()
        INFO(f"Resubmitted job with increased memory request ({rm}MB -> {new_rm}MB) as {new_queue_id}.")
    except Exception as e:
        ERROR(f"Failed to remove and resubmit job {job_id}: {e}")


def _handle_kill_suggestions(args, kill_suggestion):
    """Report jobs that could not be resubmitted; kill them if --kill was given.

    Args:
        args: parsed monitor arguments (kill, dryrun, max_memory).
        kill_suggestion: list of ClassAds of jobs whose scaled memory request
            exceeded args.max_memory.
    """
    kill_procs = [f"{job_ad['ClusterId']}.{job_ad['ProcId']}" for job_ad in kill_suggestion]
    INFO(f"There were {len(kill_suggestion)} jobs that could not be resubmitted due to exceeding max memory.")
    if not args.kill:
        INFO(f"You may want to kill them manually: \n{' '.join(kill_procs)}")
        return
    INFO("Killing them now as per --kill option.")
    if args.dryrun:
        return
    schedd = htcondor.Schedd()
    try:
        schedd.act(htcondor.JobAction.Remove, kill_procs)
        INFO(f"Killed {len(kill_suggestion)} jobs that exceeded max memory limit of {args.max_memory}MB.")
    except Exception as e:
        ERROR(f"Failed to kill jobs: {e}")


def main():
    """Monitor this batch's HTCondor jobs, resubmit held jobs with a larger
    memory request, and optionally remove jobs that would exceed the cap.
    """
    args = monitor_args()
    #################### Test mode?
    test_mode = (
            dbutils_test_mode
            or args.test_mode
            # or ( hasattr(rule, 'test_mode') and rule.test_mode ) ## allow in the yaml file?
        )

    # Set up submission logging before going any further
    sublogdir = setup_rot_handler(args)
    slogger.setLevel(args.loglevel)
    INFO(f"Logging to {sublogdir}, level {args.loglevel}")

    if test_mode:
        INFO("Running in testbed mode.")
        args.mangle_dirpath = 'production-testbed'
    else:
        INFO("Running in production mode.")

    batch_name = base_batchname_from_args(args)
    jobs = monitor_condor_jobs(batch_name=batch_name, dryrun=args.dryrun)

    # Filter for held jobs (JobStatus == 5)
    held_jobs_ads = [ad for ad in jobs.values() if ad.get('JobStatus') == 5]

    if not held_jobs_ads:
        INFO(f"Found {len(jobs)} total jobs, but none are currently held.")
        return

    INFO(f"Found {len(jobs)} total jobs, {len(held_jobs_ads)} of which are held.")

    kill_suggestion = []
    under_memory_hold_reasons = collections.Counter()
    for job_ad in held_jobs_ads:
        # MemoryUsage and RequestMemory are in MB
        mu = int(job_ad.get('ResidentSetSize', 0)) / 1024  # Convert from KB to MB
        rm = int(job_ad.get('MemoryProvisioned', 0))
        # If memory usage is below request, it's interesting to see why it's held.
        if mu < rm:
            _log_underprovisioned_hold(job_ad, mu, rm, under_memory_hold_reasons)

        # Now let's kill and resubmit this job.
        # Fix difference between Submit object and ClassAd keys.
        job_ad['output'] = job_ad.pop('Out')
        job_ad['error'] = job_ad.pop('Err')
        # adjust memory request
        new_submit_ad = htcondor.Submit(dict(job_ad))
        new_rm = _new_request_memory(args, rm)
        if new_rm is None:
            kill_suggestion.append(job_ad)
            continue
        new_submit_ad['RequestMemory'] = str(new_rm)
        if args.resubmit:
            _remove_and_resubmit(args, job_ad, new_submit_ad, rm, new_rm)

    # Summarize the hold-reason tallies (previously collected but never reported).
    if under_memory_hold_reasons:
        DEBUG(f"Hold reason codes for under-memory held jobs: {dict(under_memory_hold_reasons)}")

    if kill_suggestion:
        _handle_kill_suggestions(args, kill_suggestion)

    INFO(f"{Path(__file__).name} DONE.")
0124 
if __name__ == '__main__':
    # main() returns None; falling off the end of the script already exits
    # with status 0, so the previous `exit(0)` (a site-module REPL helper
    # that is discouraged in scripts) was redundant and has been removed.
    main()