File indexing completed on 2026-04-10 08:39:01
0001 """
0002 activate job
0003
0004 """
0005
0006 import datetime
0007
0008 from pandacommon.pandalogger.LogWrapper import LogWrapper
0009 from pandacommon.pandalogger.PandaLogger import PandaLogger
0010 from pandacommon.pandautils.PandaUtils import naive_utcnow
0011
0012
0013 _logger = PandaLogger().getLogger("activator")
0014
0015
class Activator:
    """
    Activate the jobs that are associated with a dataset.

    Attributes
    ----------
    task_buffer : TaskBuffer
        Object used to update input files, peek jobs, activate jobs and
        update the dataset record.
    dataset : DatasetSpec
        The dataset whose jobs are activated.
    enforce : bool
        When True, the dataset is processed even if its status is
        "completed", "deleting" or "deleted".

    Methods
    -------
    run():
        Activate the jobs of the dataset.
    """

    def __init__(self, taskBuffer, dataset, enforce: bool = False):
        """
        Store the collaborators needed by ``run``.

        Parameters
        ----------
        taskBuffer : TaskBuffer
            Kept as ``self.task_buffer``.
        dataset : DatasetSpec
            Kept as ``self.dataset``.
        enforce : bool, optional
            Kept as ``self.enforce`` (default is False).
        """
        self.task_buffer = taskBuffer
        self.dataset = dataset
        self.enforce = enforce

    def run(self):
        """
        Activate the jobs of the dataset.

        When the dataset status is "completed", "deleting" or "deleted"
        and ``enforce`` is not set, only a "skip" message is logged.
        Otherwise ``updateInFilesReturnPandaIDs`` is called with the
        dataset name and "ready", the jobs for the returned IDs are
        peeked, and every job that is not None and whose ``jobStatus`` is
        not "unknown" is handed to ``activateJobs``. A dataset of type
        "dispatch" is then set to "completed" and passed to
        ``updateDatasets``. An "end" message is logged in every case.
        """
        tmp_log = LogWrapper(_logger, f"run-{naive_utcnow().isoformat('/')}-{self.dataset.name}")
        dataset = self.dataset

        # Process unless the dataset is already in a final/deleting state;
        # ``enforce`` overrides that check.
        if dataset.status not in ("completed", "deleting", "deleted") or self.enforce:
            buffer = self.task_buffer

            # Presumably flags the dataset's input files as ready and returns
            # the PanDA IDs of the affected jobs -- confirm against TaskBuffer.
            ready_ids = buffer.updateInFilesReturnPandaIDs(dataset.name, "ready")
            tmp_log.debug(f"IDs: {ready_ids}")

            if len(ready_ids) > 0:
                peeked = buffer.peekJobs(ready_ids, fromActive=False, fromArchived=False, fromWaiting=False)
                # Drop missing entries and jobs reported with status "unknown".
                runnable = [candidate for candidate in peeked if candidate is not None and candidate.jobStatus != "unknown"]
                buffer.activateJobs(runnable)

            # Dispatch datasets are marked completed and stored once handled.
            if dataset.type == "dispatch":
                dataset.status = "completed"
                buffer.updateDatasets([dataset])
        else:
            tmp_log.debug(f"skip: {dataset.name}")

        tmp_log.debug(f"end: {dataset.name}")