File indexing completed on 2026-04-25 08:29:08
0001
0002
0003 from pathlib import Path
0004 from datetime import datetime, timedelta
0005 import pprint
0006
0007 from argparsing import monitor_args
0008 from sphenixdbutils import test_mode as dbutils_test_mode
0009 from simpleLogger import slogger, CHATTY, DEBUG, INFO, WARN, ERROR, CRITICAL
0010 from sphenixmisc import setup_rot_handler
0011 from sphenixcondortools import base_batchname_from_args, monitor_condor_jobs
0012 import random
0013 import htcondor2 as htcondor
0014
def _job_id(job_ad):
    """Return the ``"<ClusterId>.<ProcId>"`` identifier of *job_ad*."""
    return f"{job_ad['ClusterId']}.{job_ad['ProcId']}"


def _submit_description(job_ad):
    """Build a resubmission description (a plain dict) from a queued job's ad.

    The ad's ``Out``/``Err`` attributes are renamed, in place on *job_ad*, to
    the submit commands ``output``/``error``. The ad is then copied into a
    new dict with ``RequestCpus`` overridden to ``'1'``; *job_ad* itself keeps
    its original ``RequestCpus``.

    Raises:
        KeyError: if *job_ad* has no ``Out`` or ``Err`` attribute.
    """
    job_ad['output'] = job_ad.pop('Out')
    job_ad['error'] = job_ad.pop('Err')
    description = dict(job_ad)
    # Every replacement asks for a single CPU, whatever the original requested.
    description['RequestCpus'] = '1'
    return description


def _replace_job(schedd, job_id, new_submit_ad):
    """Remove job *job_id* from *schedd*, then queue *new_submit_ad* in its place.

    Nothing is submitted if the removal fails. Failures are logged with
    ERROR instead of raised, so the caller can continue with other jobs. A
    submission failure after a successful removal is reported explicitly,
    because by then the original job has already been removed.
    """
    try:
        schedd.act(htcondor.JobAction.Remove, [job_id])
    except Exception as e:
        ERROR(f"Failed to remove held job {job_id}; not resubmitting it: {e}")
        return
    INFO(f"Removed held job {job_id} from queue.")

    try:
        new_cluster = schedd.submit(new_submit_ad).cluster()
    except Exception as e:
        ERROR(f"Removed held job {job_id} but failed to resubmit it: {e}")
        return
    INFO(f"Resubmitted job {job_id} as cluster {new_cluster}.")


def main():
    """Resubmit this batch's condor jobs with ``RequestCpus`` forced to 1.

    Steps:
      1. Parse arguments via ``monitor_args``, set up rotating logging, and
         select testbed or production mode.
      2. Ask ``monitor_condor_jobs`` for the jobs that belong to this batch
         name. The log wording suggests these are held jobs, but the
         selection happens inside that helper; confirm there.
      3. Build a 1-CPU submit description for every returned job.
      4. If ``args.resubmit`` is set, remove each original job from the local
         schedd and queue its replacement. If ``args.dryrun`` is also set,
         only log what would happen.

    Per-job problems (missing output/error attributes, failed removal or
    submission) are logged and do not stop processing of the remaining jobs.
    """
    args = monitor_args()

    # Testbed mode is on if either the sphenixdbutils module flag or the
    # command-line flag says so.
    test_mode = dbutils_test_mode or args.test_mode

    sublogdir = setup_rot_handler(args)
    slogger.setLevel(args.loglevel)
    INFO(f"Logging to {sublogdir}, level {args.loglevel}")

    if test_mode:
        INFO("Running in testbed mode.")
        args.mangle_dirpath = 'production-testbed'
    else:
        INFO("Running in production mode.")

    batch_name = base_batchname_from_args(args)
    jobs = monitor_condor_jobs(batch_name=batch_name, dryrun=args.dryrun)

    # NOTE: no qualification filter is applied yet; every job returned by
    # the monitor is treated. Add selection criteria (e.g. a job-age cutoff)
    # here if needed.
    filtered_jobs_ads = list(jobs.values())
    if not filtered_jobs_ads:
        INFO(f"Found {len(jobs)} total jobs, but none qualify.")
        return
    INFO(f"Found {len(jobs)} total jobs; filtered {len(filtered_jobs_ads)} for further treatment")

    # One schedd handle serves all jobs; it is only needed when really acting.
    schedd = htcondor.Schedd() if args.resubmit and not args.dryrun else None

    for job_ad in filtered_jobs_ads:
        job_id = _job_id(job_ad)
        try:
            description = _submit_description(job_ad)
        except KeyError as missing:
            ERROR(f"Job {job_id} has no {missing} attribute; cannot resubmit it, skipping.")
            continue
        new_submit_ad = htcondor.Submit(description)

        if not args.resubmit:
            continue
        if args.dryrun:
            INFO(f"(Dry Run) Would remove and resubmit job {job_id}.")
        else:
            _replace_job(schedd, job_id, new_submit_ad)

    INFO(f"{Path(__file__).name} DONE.")
0113
if __name__ == '__main__':
    main()
    # SystemExit instead of the site-module ``exit`` helper, which is meant
    # for the interactive shell and is missing when Python runs with -S.
    raise SystemExit(0)