Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-25 08:29:08

0001 #!/usr/bin/env python
0002 
0003 from pathlib import Path
0004 from datetime import datetime, timedelta
0005 import pprint # noqa F401
0006 
0007 from argparsing import monitor_args
0008 from sphenixdbutils import test_mode as dbutils_test_mode
0009 from simpleLogger import slogger, CHATTY, DEBUG, INFO, WARN, ERROR, CRITICAL  # noqa: F401
0010 from sphenixmisc import setup_rot_handler
0011 from sphenixcondortools import base_batchname_from_args, monitor_condor_jobs
0012 import random
0013 import htcondor2 as htcondor # type: ignore
0014 
def main():
    """Monitor HTCondor jobs belonging to this production's batch name and,
    when requested, remove each matching job and resubmit it with adjusted
    submit parameters (currently RequestCpus=1).

    Command-line options come from monitor_args(); --dryrun suppresses all
    schedd actions and --resubmit enables the remove/resubmit pass.
    Returns None.
    """
    args = monitor_args()

    #################### Test mode?
    # Testbed mode if either the db utilities or the command line say so.
    test_mode = (
            dbutils_test_mode
            or args.test_mode
            # or ( hasattr(rule, 'test_mode') and rule.test_mode ) ## allow in the yaml file?
        )

    # Set up submission logging before going any further
    sublogdir = setup_rot_handler(args)
    slogger.setLevel(args.loglevel)
    INFO(f"Logging to {sublogdir}, level {args.loglevel}")

    if test_mode:
        INFO("Running in testbed mode.")
        args.mangle_dirpath = 'production-testbed'
    else:
        INFO("Running in production mode.")

    batch_name = base_batchname_from_args(args)
    jobs = monitor_condor_jobs(batch_name=batch_name, dryrun=args.dryrun)

    # Filter for any desired quality here (JobStatus, run/segment parsed from
    # 'Args', age of EnteredCurrentStatus, ...). Currently every job qualifies.
    filtered_jobs_ads = list(jobs.values())

    if not filtered_jobs_ads:
        INFO(f"Found {len(jobs)} total jobs, but none qualify.")
        return
    INFO(f"Found {len(jobs)} total jobs; filtered {len(filtered_jobs_ads)} for further treatment")

    # One schedd connection for the whole pass (hoisted out of the loop);
    # only needed when we will actually act on the queue.
    schedd = htcondor.Schedd() if (args.resubmit and not args.dryrun) else None

    for job_ad in filtered_jobs_ads:
        # Now let's kill and resubmit this job.
        # Fix difference between Submit object and ClassAd keys. Guarded with a
        # membership test so one classad missing 'Out'/'Err' doesn't abort the pass.
        for classad_key, submit_key in (('Out', 'output'), ('Err', 'error')):
            if classad_key in job_ad:
                job_ad[submit_key] = job_ad.pop(classad_key)
        new_submit_ad = htcondor.Submit(dict(job_ad))

        # Change what you want changed. Eg, nCPU
        new_submit_ad['RequestCpus'] = '1'
        # new_submit_ad['JobPrio'] = '2'
        if args.resubmit:
            job_id = f"{job_ad['ClusterId']}.{job_ad['ProcId']}"
            if schedd is not None:
                try:
                    # The transaction context manager is deprecated. The following
                    # replacement operations are not atomic.
                    schedd.act(htcondor.JobAction.Remove, [job_id])
                    INFO(f"Removed held job {job_id} from queue.")
                    submit_result = schedd.submit(new_submit_ad)
                    INFO(f"   ...  and resubmitted as {submit_result.cluster()}.")
                except Exception as e:
                    ERROR(f"Failed to remove and resubmit job {job_id}: {e}")
            else:
                INFO(f"(Dry Run) Would remove and resubmit job {job_id}.")

    INFO(f"{Path(__file__).name} DONE.")
0113 
if __name__ == '__main__':
    main()
    # raise SystemExit instead of the site-module exit() helper: exit() is
    # intended for interactive use and is not defined when running with -S.
    raise SystemExit(0)