Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:18

0001 #!/usr/bin/env python
0002 # Licensed under the Apache License, Version 2.0 (the "License");
0003 # you may not use this file except in compliance with the License.
0004 # You may obtain a copy of the License at
0005 # http://www.apache.org/licenses/LICENSE-2.0
0006 #
0007 # Authors:
0008 # - Paul Nilsson, paul.nilsson@cern.ch, 2018-2019
0009 
0010 import time
0011 
0012 from pilot.common.errorcodes import ErrorCodes
0013 from pilot.info import JobData
0014 from pilot.util.auxiliary import set_pilot_state, is_string
0015 
0016 import logging
# module-level logger for this file
logger = logging.getLogger(__name__)

# shared ErrorCodes instance; declare_failed_by_kill() uses it to look up and record kill-signal error codes
errors = ErrorCodes()
0020 
0021 
def declare_failed_by_kill(job, queue, sig):
    """
    Mark a job as failed because of a kill signal and hand it over to the given failed queue.

    Example: pass queue=queues.failed_data_in when the signal arrived during stage-in.

    :param job: job object.
    :param queue: queue object that should receive the failed job.
    :param sig: the kill signal that was received.
    :return:
    """

    set_pilot_state(job=job, state="failed")

    # translate the signal into the matching pilot error code and record it on the job
    # NOTE(review): only the new code is passed to add_error_code() and its result replaces
    # job.piloterrorcodes/job.piloterrordiags - confirm that codes already stored on the job are meant to be kept
    kill_code = errors.get_kill_signal_error_code(sig)
    codes_and_diags = errors.add_error_code(kill_code)
    job.piloterrorcodes, job.piloterrordiags = codes_and_diags

    # put_in_queue() does not add the job a second time if it is already queued
    put_in_queue(job, queue)
0039 
0040 
def scan_for_jobs(queues, timeout=30):
    """
    Scan the queues until at least one of them holds a job object; give up after `timeout` seconds.

    Plain strings (e.g. the job ids kept in the completed_jobids queue) are not job objects and are ignored,
    so callers only ever receive objects they can treat as jobs.

    :param queues: queues object (the names in queues._fields are the individual queue attributes).
    :param timeout: maximum time to keep scanning, in seconds (default 30).
    :return: found jobs (list of job objects). If nothing is found before the timeout, an empty list is
             returned (or None if no queue was scanned at all, e.g. queues has no fields).
    """

    t0 = time.time()
    jobs = None

    while time.time() - t0 < timeout:
        for q in queues._fields:
            _q = getattr(queues, q)
            # work on a snapshot of the queue content and skip job id strings - only real job objects count
            jobs = [job for job in list(_q.queue) if not is_string(job)]
            if jobs:
                logger.info('found %d job(s) in queue %s after %d s - will begin queue monitoring',
                            len(jobs), q, time.time() - t0)
                return jobs
        # nothing found yet, back off briefly before rescanning all queues
        time.sleep(0.1)

    return jobs
0068 
0069 
def get_queuedata_from_job(queues):
    """
    Return the queuedata object from a job in the given queues object.

    This function is useful if queuedata is needed from a function that does not know about the job object.
    E.g. the pilot monitor does not know about the job object, but still knows about the queues from which
    a job object can be extracted and therefore the queuedata.

    :param queues: queues object.
    :return: queuedata object of the first job object found (None if no job object was found).
    """

    jobs = scan_for_jobs(queues)

    # use the first real job object; job id strings (e.g. from the completed_jobids queue) have no infosys
    for job in jobs or []:
        if not is_string(job):
            return job.infosys.queuedata

    return None
0091 
0092 
def abort_jobs_in_queues(queues, sig):
    """
    Abort every job currently held in any of the queues.

    Each distinct job object is declared failed by the kill signal and placed in queues.failed_jobs.
    Strings (as held by the completed_jobids queue) are not job objects and are ignored.

    :param queues: queues object.
    :param sig: kill signal that was detected.
    :return:
    """

    unique_jobs = []
    for name in queues._fields:
        for candidate in list(getattr(queues, name).queue):
            # the completed_jobids queue holds job id strings rather than job objects;
            # a job may also sit in several queues, so only keep the first occurrence
            if is_string(candidate) or candidate in unique_jobs:
                continue
            unique_jobs.append(candidate)

    logger.info('found %d job(s) in %d queues' % (len(unique_jobs), len(queues._fields)))

    for job in unique_jobs:
        # best effort: a failure for one job must not stop the remaining jobs from being aborted
        try:
            logger.info('aborting job %s' % job.jobid)
            declare_failed_by_kill(job, queues.failed_jobs, sig)
        except Exception as e:
            logger.warning('failed to declare job as failed: %s' % e)
0121 
0122 
def queue_report(queues):
    """
    Log the number of entries currently held by each queue.

    :param queues: queues object; every name in queues._fields is expected to be a queue attribute.
    :return:
    """

    for name in queues._fields:
        size = len(getattr(queues, name).queue)
        logger.info('queue %s has %d job(s)' % (name, size))
0134 
0135 
def put_in_queue(obj, queue):
    """
    Put the given object in the given queue unless an equal object is already queued.

    :param obj: object to queue (JobData objects additionally get their size updated first).
    :param queue: queue object.
    :return:
    """

    # update job object size (currently not used)
    if isinstance(obj, JobData):
        obj.add_size(obj.get_size())

    # only put the object in the queue if it is not there already; the membership test runs against a
    # snapshot of the underlying deque rather than the live one
    # NOTE(review): the check and the put() are not atomic, so concurrent callers could still enqueue duplicates
    if obj not in list(queue.queue):
        queue.put(obj)
0152 
0153 
def purge_queue(queue):
    """
    Empty the given queue.

    Items are removed with non-blocking get() calls until the queue raises Empty. task_done() is called for
    every removed item so that the queue's count of unfinished tasks stays consistent.

    :param queue: queue object (assumed to be a Queue.Queue / queue.Queue instance).
    :return:
    """

    # the parameter shadows the stdlib 'queue' module name, so 'queue.Empty' would be looked up on the
    # queue instance (AttributeError) - import the exception class explicitly instead
    try:
        from queue import Empty  # Python 3
    except ImportError:
        from Queue import Empty  # Python 2

    # EAFP: rely on get() raising Empty rather than a separate empty() check, which another consumer
    # could invalidate between the check and the get()
    while True:
        try:
            queue.get(False)
        except Empty:
            break
        queue.task_done()
    logger.debug('queue purged')