File indexing completed on 2026-04-10 08:38:59
0001 import datetime
0002 import sys
0003 import time
0004
0005 from pandacommon.pandalogger.PandaLogger import PandaLogger
0006 from pandacommon.pandautils.PandaUtils import naive_utcnow
0007
0008 from pandajedi.jediconfig import jedi_config
0009 from pandajedi.jedicore import Interaction
0010 from pandajedi.jedicore.FactoryBase import FactoryBase
0011 from pandajedi.jedicore.MsgWrapper import MsgWrapper
0012 from pandajedi.jedicore.ThreadUtils import ListWithLock, ThreadPool, WorkerThread
0013
0014 from .JediKnight import JediKnight
0015
# Module logger named after the last dotted component of this module's name.
logger = PandaLogger().getLogger(__name__.rsplit(".", 1)[-1])
0017
0018
0019
class TaskBroker(JediKnight, FactoryBase):
    """JEDI agent that periodically checks and brokers tasks through plugin implementations.

    Each cycle walks every VO in ``vos``, every label in ``prodSourceLabels``, every work queue
    returned by ``getAlignedQueueList`` and every resource type from ``load_resource_types``.
    For each combination, tasks from ``getTasksToCheckAssignment_JEDI`` are handed to
    ``TaskCheckerThread`` workers, then tasks from ``getTasksToAssign_JEDI`` are handed to
    ``TaskBrokerThread`` workers. This object is passed to the workers as their ``implFactory``.
    """

    def __init__(self, commuChannel, taskBufferIF, ddmIF, vos, prodSourceLabels):
        """Normalise the VO / source-label selections and initialise both base classes.

        :param commuChannel: communication channel passed to ``JediKnight``
        :param taskBufferIF: task buffer interface used for all task lookups and updates
        :param ddmIF: DDM interface; workers take the per-VO interface from it
        :param vos: VO selection, normalised with ``parseInit``
        :param prodSourceLabels: source-label selection, normalised with ``parseInit``
        """
        # parsed selections are needed by FactoryBase.__init__ below
        self.vos = self.parseInit(vos)
        self.prodSourceLabels = self.parseInit(prodSourceLabels)
        JediKnight.__init__(self, commuChannel, taskBufferIF, ddmIF, logger)
        FactoryBase.__init__(self, self.vos, self.prodSourceLabels, logger, jedi_config.taskbroker.modConfig)

    def start(self):
        """Initialise the plugin modules and run the check/brokerage loop forever.

        An exception raised while processing a cycle is logged and aborts the remainder of that
        cycle. The loop then sleeps until ``jedi_config.taskbroker.loopCycle`` seconds have
        passed since the cycle started, and finally calls ``randomSleep(max_val=loopCycle)``.
        """
        JediKnight.start(self)
        FactoryBase.initializeMods(self, self.taskBufferIF, self.ddmIF)
        while True:
            startTime = naive_utcnow()
            try:
                tmpLog = MsgWrapper(logger)
                tmpLog.debug("start TaskBroker")
                workQueueMapper = self.taskBufferIF.getWorkQueueMap()
                resource_types = self.taskBufferIF.load_resource_types()
                for vo in self.vos:
                    for prodSourceLabel in self.prodSourceLabels:
                        for workQueue in workQueueMapper.getAlignedQueueList(vo, prodSourceLabel):
                            for resource_type in resource_types:
                                self._check_and_assign(tmpLog, vo, prodSourceLabel, workQueue, resource_type.resource_name)
            except Exception as exc:
                tmpLog.error(f"failed in {self.__class__.__name__}.start() with {type(exc).__name__} {exc}")
            tmpLog.debug("done")
            loopCycle = jedi_config.taskbroker.loopCycle
            # total_seconds() instead of .seconds: .seconds ignores the days component, so a cycle
            # longer than a day or a backwards clock step produced a bogus elapsed time.
            elapsed = max(0.0, (naive_utcnow() - startTime).total_seconds())
            sleepPeriod = loopCycle - elapsed
            if sleepPeriod > 0:
                time.sleep(sleepPeriod)
            # randomise the cycle start
            self.randomSleep(max_val=loopCycle)

    def _check_and_assign(self, tmpLog, vo, prodSourceLabel, workQueue, resource_name):
        """Run the check pass and then the brokerage pass for one VO/label/queue/resource type.

        A ``None`` result from either task-list query is logged and that pass is skipped.
        """
        wq_name = "_".join(workQueue.queue_name.split(" "))
        msgLabel = f"vo={vo} label={prodSourceLabel} queue={wq_name} resource_type={resource_name}: "
        tmpLog.debug(msgLabel + "start")
        # check pass
        tmpList = self.taskBufferIF.getTasksToCheckAssignment_JEDI(vo, prodSourceLabel, workQueue, resource_name)
        if tmpList is None:
            tmpLog.error(msgLabel + "failed to get the list of tasks to check")
        else:
            tmpLog.debug(msgLabel + f"got tasks_to_check={len(tmpList)}")
            self._run_workers(tmpList, TaskCheckerThread, self.taskBufferIF, self.ddmIF, self, vo, prodSourceLabel)
        # brokerage pass
        tmpList = self.taskBufferIF.getTasksToAssign_JEDI(vo, prodSourceLabel, workQueue, resource_name)
        if tmpList is None:
            tmpLog.error(msgLabel + "failed to get the list of tasks to assign")
        else:
            tmpLog.debug(msgLabel + f"got tasks_to_assign={len(tmpList)}")
            self._run_workers(
                tmpList,
                TaskBrokerThread,
                self.taskBufferIF,
                self.ddmIF,
                self,
                vo,
                prodSourceLabel,
                workQueue,
                resource_name,
            )
        tmpLog.debug(msgLabel + "done")

    def _run_workers(self, task_ids, thread_class, *thread_args):
        """Process ``task_ids`` with ``jedi_config.taskbroker.nWorkers`` threads and wait for them.

        Every thread is built as ``thread_class(taskList, threadPool, *thread_args)``. All threads
        share one ``ListWithLock`` and one ``ThreadPool``. Returns after ``threadPool.join()``.
        """
        taskList = ListWithLock(task_ids)
        threadPool = ThreadPool()
        for _ in range(jedi_config.taskbroker.nWorkers):
            thr = thread_class(taskList, threadPool, *thread_args)
            thr.start()
        threadPool.join()
0113
0114
0115
class TaskCheckerThread(WorkerThread):
    """Worker thread that runs the task-broker plugin's ``doCheck`` over batches of tasks.

    Batches of jediTaskIDs are taken from the shared ``taskList`` until it is empty. For each
    batch, the task specs are loaded with ``getTaskWithID_JEDI`` and passed to ``impl.doCheck``.
    On success, the returned map is stored with ``setCloudToTasks_JEDI``.
    """

    def __init__(self, taskList, threadPool, taskbufferIF, ddmIF, implFactory, vo, prodSourceLabel):
        """
        :param taskList: shared ListWithLock of jediTaskIDs to check
        :param threadPool: ThreadPool this worker is attached to
        :param taskbufferIF: task buffer interface
        :param ddmIF: DDM interface; only the interface for ``vo`` is kept
        :param implFactory: object providing ``getImpl(vo, prodSourceLabel)``
        :param vo: virtual organisation
        :param prodSourceLabel: source label used to select the plugin
        """
        WorkerThread.__init__(self, None, threadPool, logger)
        self.taskList = taskList
        self.taskBufferIF = taskbufferIF
        self.ddmIF = ddmIF.getInterface(vo)
        self.implFactory = implFactory
        self.vo = vo
        self.prodSourceLabel = prodSourceLabel

    def _get_impl(self, tmpLog):
        """Return the plugin for this VO/label, or None after logging why it is unavailable."""
        try:
            impl = self.implFactory.getImpl(self.vo, self.prodSourceLabel)
        except Exception as exc:
            tmpLog.error(f"getImpl failed with {type(exc).__name__}:{exc}")
            return None
        if impl is None:
            tmpLog.error(f"task broker is undefined for vo={self.vo} sourceLabel={self.prodSourceLabel}")
        return impl

    def runImpl(self):
        """Check batches of up to 100 tasks until the shared list is exhausted.

        Any exception raised while handling a batch is logged, and the loop continues with the
        next batch.
        """
        nTasks = 100
        while True:
            try:
                taskList = self.taskList.get(nTasks)
                totalTasks, idxTasks = self.taskList.stat()
                if not taskList:
                    self.logger.debug(f"{self.__class__.__name__} terminating since no more items")
                    return
                tmpLog = MsgWrapper(self.logger)
                tmpLog.info(f"start TaskCheckerThread {idxTasks}/{totalTasks} for jediTaskID={taskList}")
                taskSpecList = []
                for jediTaskID in taskList:
                    tmpRet, taskSpec = self.taskBufferIF.getTaskWithID_JEDI(jediTaskID, False)
                    if tmpRet and taskSpec is not None:
                        taskSpecList.append(taskSpec)
                    else:
                        tmpLog.error(f"failed to get taskSpec for jediTaskID={jediTaskID}")
                if not taskSpecList:
                    # every lookup in this batch failed (each already logged); nothing to check
                    continue
                tmpLog.info("getting Impl")
                impl = self._get_impl(tmpLog)
                if impl is None:
                    tmpStat = Interaction.SC_FAILED
                else:
                    tmpLog.info(f"check with {impl.__class__.__name__}")
                    try:
                        tmpStat, taskCloudMap = impl.doCheck(taskSpecList)
                    except Exception as exc:
                        tmpLog.error(f"doCheck failed with {type(exc).__name__}:{exc}")
                        tmpStat = Interaction.SC_FAILED
                if tmpStat != Interaction.SC_SUCCEEDED:
                    tmpLog.error("failed to check assignment")
                else:
                    tmpRet = self.taskBufferIF.setCloudToTasks_JEDI(taskCloudMap)
                    tmpLog.info(f"done with {tmpRet} for {str(taskCloudMap)}")
            except Exception as exc:
                logger.error(f"{self.__class__.__name__} failed in runImpl() with {type(exc).__name__}:{exc}")
0185
0186
0187
class TaskBrokerThread(WorkerThread):
    """Worker thread that runs plugin brokerage for batches of tasks from a shared list.

    For every batch of jediTaskIDs, the input chunks are fetched with
    ``getTasksToBeProcessed_JEDI``. The plugin from ``implFactory.getImpl`` is resolved, and
    ``doBrokerage`` is called on the collected chunks for this thread's VO, label, work queue
    and resource type.
    """

    def __init__(self, taskList, threadPool, taskbufferIF, ddmIF, implFactory, vo, prodSourceLabel, workQueue, resource_name):
        """
        :param taskList: shared ListWithLock of jediTaskIDs to broker
        :param threadPool: ThreadPool this worker is attached to
        :param taskbufferIF: task buffer interface
        :param ddmIF: DDM interface; only the interface for ``vo`` is kept
        :param implFactory: object providing ``getImpl(vo, prodSourceLabel)``
        :param vo: virtual organisation
        :param prodSourceLabel: source label used to select the plugin
        :param workQueue: work queue passed through to ``doBrokerage``
        :param resource_name: resource type name passed through to ``doBrokerage``
        """
        WorkerThread.__init__(self, None, threadPool, logger)
        self.taskList = taskList
        self.taskBufferIF = taskbufferIF
        self.ddmIF = ddmIF.getInterface(vo)
        self.implFactory = implFactory
        self.vo = vo
        self.prodSourceLabel = prodSourceLabel
        self.workQueue = workQueue
        self.resource_name = resource_name

    def _broker_batch(self, batch, tmpLog):
        """Collect input chunks for ``batch`` and run brokerage; return True only on success.

        Stops at the first task whose input chunks cannot be fetched, so that batch gets no
        brokerage at all.
        """
        chunks = []
        for jediTaskID in batch:
            found = self.taskBufferIF.getTasksToBeProcessed_JEDI(
                None, None, None, None, None, simTasks=[jediTaskID], readMinFiles=True, fullSimulation=True
            )
            if found is None:
                tmpLog.error(f"failed to get the input chunks for jediTaskID={jediTaskID}")
                return False
            chunks.extend(found)

        tmpLog.info("getting Impl")
        try:
            impl = self.implFactory.getImpl(self.vo, self.prodSourceLabel)
        except Exception as exc:
            tmpLog.error(f"getImpl failed with {type(exc).__name__}:{exc}")
            return False
        if impl is None:
            tmpLog.error(f"task broker is undefined for vo={self.vo} sourceLabel={self.prodSourceLabel}")
            return False

        tmpLog.info(f"brokerage with {impl.__class__.__name__} for {len(chunks)} tasks ")
        try:
            outcome = impl.doBrokerage(chunks, self.vo, self.prodSourceLabel, self.workQueue, self.resource_name)
        except Exception as exc:
            tmpLog.error(f"doBrokerage failed with {type(exc).__name__}:{exc}")
            return False
        return outcome == Interaction.SC_SUCCEEDED

    def runImpl(self):
        """Broker batches of up to 100 tasks until the shared list is exhausted.

        Any exception raised while handling a batch is logged, and the loop continues with the
        next batch.
        """
        while True:
            try:
                batch = self.taskList.get(100)
                totalTasks, idxTasks = self.taskList.stat()
                if len(batch) == 0:
                    self.logger.debug(f"{self.__class__.__name__} terminating since no more items")
                    return
                tmpLog = MsgWrapper(self.logger)
                tmpLog.info(f"start TaskBrokerThread {idxTasks}/{totalTasks} for jediTaskID={batch}")
                if self._broker_batch(batch, tmpLog):
                    tmpLog.info("done")
                else:
                    tmpLog.error("failed")
            except Exception:
                errtype, errvalue = sys.exc_info()[:2]
                logger.error(f"{self.__class__.__name__} failed in runImpl() with {errtype.__name__}:{errvalue}")
0261
0262
0263
0264
0265
def launcher(commuChannel, taskBufferIF, ddmIF, vos=None, prodSourceLabels=None):
    """Build a TaskBroker for the given channel, interfaces and selections, then start it.

    ``TaskBroker.start`` contains a ``while True`` loop with no exit, so this call does not
    return normally.
    """
    TaskBroker(commuChannel, taskBufferIF, ddmIF, vos, prodSourceLabels).start()