Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:01

0001 """
0002 activate job
0003 
0004 """
0005 
0006 import datetime
0007 
0008 from pandacommon.pandalogger.LogWrapper import LogWrapper
0009 from pandacommon.pandalogger.PandaLogger import PandaLogger
0010 from pandacommon.pandautils.PandaUtils import naive_utcnow
0011 
# Module-level logger named "activator"; Activator.run wraps it in a LogWrapper for each call.
_logger = PandaLogger().getLogger("activator")
0014 
0015 
class Activator:
    """
    Activates the jobs tied to a dataset through the task buffer.

    Attributes
    ----------
    task_buffer : TaskBuffer
        Object used to update input files, peek jobs, activate jobs and
        update datasets.
    dataset : DatasetSpec
        The dataset whose jobs are to be activated.
    enforce : bool
        When True, activation is attempted even if the dataset status is
        "completed", "deleting" or "deleted".

    Methods
    -------
    run():
        Performs the activation for the dataset.
    """

    def __init__(self, taskBuffer, dataset, enforce: bool = False):
        """
        Store the collaborators needed for activation.

        Parameters
        ----------
            taskBuffer : TaskBuffer
                Object used to talk to the job/dataset storage.
            dataset : DatasetSpec
                The dataset whose jobs are to be activated.
            enforce : bool, optional
                Activate regardless of the dataset status (default is False).
        """
        self.dataset = dataset
        self.task_buffer = taskBuffer
        self.enforce = enforce

    def run(self):
        """
        Activate the jobs associated with the dataset.

        Nothing is done when the dataset status is "completed", "deleting" or
        "deleted", unless ``enforce`` is set. Otherwise:

        1. ``updateInFilesReturnPandaIDs`` is called with the dataset name
           and "ready", and the IDs it returns are logged.
        2. If any IDs were returned, the jobs are fetched with ``peekJobs``
           and those that are not None and whose ``jobStatus`` is not
           "unknown" are handed to ``activateJobs``.
        3. A dataset of type "dispatch" gets its status set to "completed"
           and is written back through ``updateDatasets``.

        The work is done inline; this method does not start a thread itself.
        """
        log = LogWrapper(_logger, f"run-{naive_utcnow().isoformat('/')}-{self.dataset.name}")

        # statuses for which the dataset is left alone unless enforcement is requested
        untouchable_statuses = ("completed", "deleting", "deleted")
        skip_activation = self.dataset.status in untouchable_statuses and not self.enforce

        if not skip_activation:
            # update the input files and collect the affected PandaIDs
            panda_ids = self.task_buffer.updateInFilesReturnPandaIDs(self.dataset.name, "ready")
            log.debug(f"IDs: {panda_ids}")
            if len(panda_ids) > 0:
                # NOTE(review): the three False flags presumably exclude the active,
                # archived and waiting tables from the lookup - confirm against
                # TaskBuffer.peekJobs
                peeked_jobs = self.task_buffer.peekJobs(
                    panda_ids,
                    fromActive=False,
                    fromArchived=False,
                    fromWaiting=False,
                )
                # keep only jobs that were found and have a known status, then activate them
                self.task_buffer.activateJobs(
                    [job for job in peeked_jobs if job is not None and job.jobStatus != "unknown"]
                )
            # record completion of dispatch datasets in the DB
            if self.dataset.type == "dispatch":
                self.dataset.status = "completed"
                self.task_buffer.updateDatasets([self.dataset])
        else:
            log.debug(f"skip: {self.dataset.name}")
        log.debug(f"end: {self.dataset.name}")