File indexing completed on 2026-04-10 08:39:18
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010 import time
0011
0012 from pilot.common.errorcodes import ErrorCodes
0013 from pilot.info import JobData
0014 from pilot.util.auxiliary import set_pilot_state, is_string
0015
0016 import logging
# module-level logger, named after this module
logger = logging.getLogger(__name__)

# shared ErrorCodes instance; declare_failed_by_kill() uses it to map a kill signal to a pilot error code
errors = ErrorCodes()
0020
0021
def declare_failed_by_kill(job, queue, sig):
    """
    Mark a job as failed because of a kill signal and hand it over to a failed queue.

    The pilot state of the job is set to "failed", the error code corresponding to the
    signal is recorded in the job's pilot error fields, and the job is finally put in the
    given queue (e.g. queue=queues.failed_data_in if the signal arrived during stage-in).

    :param job: job object.
    :param queue: queue object that should receive the failed job.
    :param sig: the kill signal that was received.
    :return: None.
    """

    set_pilot_state(job=job, state="failed")

    # translate the signal into its pilot error code and store the resulting codes/diagnostics on the job
    kill_code = errors.get_kill_signal_error_code(sig)
    codes_and_diags = errors.add_error_code(kill_code)
    job.piloterrorcodes, job.piloterrordiags = codes_and_diags

    put_in_queue(job, queue)
0039
0040
def scan_for_jobs(queues, timeout=30, interval=0.1):
    """
    Scan the queues until at least one of them holds a job object, giving up after a timeout.

    The queues are inspected in the order given by ``queues._fields`` (presumably a
    namedtuple of queue objects); the content of the first non-empty queue is returned.

    :param queues: queues object; every field must expose its underlying container as ``.queue``.
    :param timeout: maximum time (s) to keep scanning (default 30, the previously hard-coded value).
    :param interval: time (s) to sleep between two complete scans (default 0.1).
    :return: found jobs (list of job objects). If nothing is found before the timeout, the
             (empty) content of the last scanned queue is returned, or None if no scan was made.
    """

    t0 = time.time()
    jobs = None

    while time.time() - t0 < timeout:
        for name in queues._fields:
            # list() takes a snapshot of the underlying container without consuming the queue
            jobs = list(getattr(queues, name).queue)
            if jobs:
                logger.info('found %d job(s) in queue %s after %d s - will begin queue monitoring',
                            len(jobs), name, time.time() - t0)
                return jobs
        time.sleep(interval)

    return jobs
0068
0069
def get_queuedata_from_job(queues):
    """
    Extract the queuedata object from the first job found in the given queues.

    Handy for code that needs queuedata but has no direct handle on a job object,
    e.g. the pilot monitor, which only knows about the queues that hold the jobs.

    :param queues: queues object.
    :return: queuedata object of the first job returned by scan_for_jobs(), or None if no job was found.
    """

    found = scan_for_jobs(queues)
    if not found:
        return None

    # only the first job is consulted
    return found[0].infosys.queuedata
0091
0092
def abort_jobs_in_queues(queues, sig):
    """
    Collect every job object held in any of the queues and declare each one failed by the kill signal.

    Entries for which is_string() is true are skipped, and a job present in several queues
    is aborted only once. Aborted jobs are sent to queues.failed_jobs; an exception while
    aborting one job is logged as a warning and does not stop the remaining aborts.

    :param queues: queues object.
    :param sig: detected kill signal.
    :return: None.
    """

    unique_jobs = []
    for name in queues._fields:
        for candidate in list(getattr(queues, name).queue):
            # presumably queues can also carry plain strings, which are not job objects
            if not is_string(candidate) and candidate not in unique_jobs:
                unique_jobs.append(candidate)

    logger.info('found %d job(s) in %d queues' % (len(unique_jobs), len(queues._fields)))

    for job in unique_jobs:
        try:
            logger.info('aborting job %s' % job.jobid)
            declare_failed_by_kill(job, queues.failed_jobs, sig)
        except Exception as e:
            logger.warning('failed to declare job as failed: %s' % e)
0121
0122
def queue_report(queues):
    """
    Log how many entries each queue currently holds.

    One info message is emitted per queue, in the order of ``queues._fields``.

    :param queues: queues object; every field must expose its underlying container as ``.queue``.
    :return: None.
    """

    for name in queues._fields:
        count = len(list(getattr(queues, name).queue))
        logger.info('queue %s has %d job(s)' % (name, count))
0134
0135
def put_in_queue(obj, queue):
    """
    Put the given object in the given queue unless it is already present there.

    For JobData objects, obj.add_size(obj.get_size()) is called first
    (NOTE(review): apparently records a size measurement of the job object - confirm in JobData).

    :param obj: object to enqueue.
    :param queue: queue object; must expose its underlying container as ``queue.queue``.
    :return: None.
    """

    if isinstance(obj, JobData):
        obj.add_size(obj.get_size())

    # avoid duplicates by checking a single snapshot of the current queue content.
    # NOTE: the check and the put are not atomic, so a concurrent producer could still
    # insert the same object in between.
    if obj not in list(queue.queue):
        queue.put(obj)
0152
0153
def purge_queue(queue):
    """
    Empty the given queue.

    Every remaining item is removed with a non-blocking get() and acknowledged with
    task_done() (for a queue.Queue this keeps a later join() from waiting on purged items).
    If another consumer empties the queue between the empty() check and the get(),
    the loop simply re-checks instead of failing.

    :param queue: queue object providing empty(), get() and task_done().
    :return: None.
    """

    # The parameter 'queue' shadows the standard-library module of the same name, so
    # 'queue.Empty' would be looked up on the queue object (AttributeError). Import the
    # exception class under its own name instead.
    try:
        from queue import Empty  # Python 3
    except ImportError:
        from Queue import Empty  # Python 2

    while not queue.empty():
        try:
            queue.get(False)
        except Empty:
            # lost a race with another consumer; nothing was taken, so no task_done()
            continue
        queue.task_done()
    logger.debug('queue purged')