Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:38:59

0001 import datetime
0002 import sys
0003 import time
0004 
0005 from pandacommon.pandalogger.PandaLogger import PandaLogger
0006 from pandacommon.pandautils.PandaUtils import naive_utcnow
0007 
0008 from pandajedi.jediconfig import jedi_config
0009 from pandajedi.jedicore import Interaction
0010 from pandajedi.jedicore.FactoryBase import FactoryBase
0011 from pandajedi.jedicore.MsgWrapper import MsgWrapper
0012 from pandajedi.jedicore.ThreadUtils import ListWithLock, ThreadPool, WorkerThread
0013 
0014 from .JediKnight import JediKnight
0015 
# module-level logger, named after the last component of this module's dotted name
logger = PandaLogger().getLogger(__name__.split(".")[-1])
0017 
0018 
# agent that periodically runs task brokerage: for every combination of vo,
# prodSourceLabel, work queue and resource type it first checks the assignment of
# tasks and then brokers the tasks that are waiting to be assigned
class TaskBroker(JediKnight, FactoryBase):
    # constructor
    #   commuChannel, taskBufferIF, ddmIF : forwarded unchanged to JediKnight.__init__
    #   vos, prodSourceLabels             : normalised with parseInit() and used both to
    #                                       configure FactoryBase and to drive the main loop
    def __init__(self, commuChannel, taskBufferIF, ddmIF, vos, prodSourceLabels):
        self.vos = self.parseInit(vos)
        self.prodSourceLabels = self.parseInit(prodSourceLabels)
        JediKnight.__init__(self, commuChannel, taskBufferIF, ddmIF, logger)
        FactoryBase.__init__(self, self.vos, self.prodSourceLabels, logger, jedi_config.taskbroker.modConfig)

    # run one batch of worker threads over a list of tasks and wait for all of them
    #   taskItems   : items to be shared among the workers through a ListWithLock
    #   threadClass : worker class to instantiate (TaskCheckerThread or TaskBrokerThread)
    #   extraArgs   : positional arguments appended after the common constructor arguments
    # NOTE(review): threadPool.join() is assumed to wait for the threads created here,
    # i.e. WorkerThread presumably registers itself with the pool - confirm in ThreadUtils
    def _runWorkerThreads(self, taskItems, threadClass, *extraArgs):
        # put to a locked list
        taskList = ListWithLock(taskItems)
        # make thread pool
        threadPool = ThreadPool()
        # make workers; this object is passed as the implementation factory
        for _ in range(jedi_config.taskbroker.nWorkers):
            thr = threadClass(taskList, threadPool, self.taskBufferIF, self.ddmIF, self, *extraArgs)
            thr.start()
        # join
        threadPool.join()

    # main: never returns, each iteration is one brokerage cycle
    def start(self):
        # start base classes
        JediKnight.start(self)
        FactoryBase.initializeMods(self, self.taskBufferIF, self.ddmIF)
        # go into main loop
        while True:
            startTime = naive_utcnow()
            # the logger is created before the try block so that the except handler and
            # the final "done" message can never reference an unbound name
            tmpLog = MsgWrapper(logger)
            try:
                tmpLog.debug("start TaskBroker")
                # get work queue mapper
                workQueueMapper = self.taskBufferIF.getWorkQueueMap()
                resource_types = self.taskBufferIF.load_resource_types()

                # loop over all vos
                for vo in self.vos:
                    # loop over all sourceLabels
                    for prodSourceLabel in self.prodSourceLabels:
                        # loop over all work queues
                        for workQueue in workQueueMapper.getAlignedQueueList(vo, prodSourceLabel):
                            # queue name with spaces replaced, so that the log label stays one token
                            wq_name = workQueue.queue_name.replace(" ", "_")
                            for resource_type in resource_types:
                                resource_name = resource_type.resource_name
                                msgLabel = f"vo={vo} label={prodSourceLabel} queue={wq_name} resource_type={resource_name}: "
                                tmpLog.debug(msgLabel + "start")
                                # get the list of tasks to check
                                tmpList = self.taskBufferIF.getTasksToCheckAssignment_JEDI(vo, prodSourceLabel, workQueue, resource_name)
                                if tmpList is None:
                                    # failed
                                    tmpLog.error(msgLabel + "failed to get the list of tasks to check")
                                else:
                                    tmpLog.debug(msgLabel + f"got tasks_to_check={len(tmpList)}")
                                    self._runWorkerThreads(tmpList, TaskCheckerThread, vo, prodSourceLabel)
                                # get the list of tasks to assign; done even when the check phase failed
                                tmpList = self.taskBufferIF.getTasksToAssign_JEDI(vo, prodSourceLabel, workQueue, resource_name)
                                if tmpList is None:
                                    # failed
                                    tmpLog.error(msgLabel + "failed to get the list of tasks to assign")
                                else:
                                    tmpLog.debug(msgLabel + f"got tasks_to_assign={len(tmpList)}")
                                    self._runWorkerThreads(tmpList, TaskBrokerThread, vo, prodSourceLabel, workQueue, resource_name)
                                tmpLog.debug(msgLabel + "done")
            except Exception as e:
                # top-level boundary of the agent: log and carry on with the next cycle
                tmpLog.error(f"failed in {self.__class__.__name__}.start() with {type(e).__name__} {e}")
            tmpLog.debug("done")
            # sleep if needed; total_seconds() is used because timedelta.seconds drops the
            # days component and would under-report a cycle that lasted a day or longer
            loopCycle = jedi_config.taskbroker.loopCycle
            timeDelta = naive_utcnow() - startTime
            sleepPeriod = loopCycle - timeDelta.total_seconds()
            if sleepPeriod > 0:
                time.sleep(sleepPeriod)
            # randomize cycle
            self.randomSleep(max_val=loopCycle)
0113 
0114 
# worker thread that takes batches of jediTaskIDs from a shared list, runs the
# doCheck() of the brokerage implementation on them and stores the returned map
# with setCloudToTasks_JEDI
class TaskCheckerThread(WorkerThread):
    # constructor
    def __init__(self, taskList, threadPool, taskbufferIF, ddmIF, implFactory, vo, prodSourceLabel):
        # the base worker is initialized without a semaphore
        WorkerThread.__init__(self, None, threadPool, logger)
        # what to work on and on whose behalf
        self.vo = vo
        self.prodSourceLabel = prodSourceLabel
        self.taskList = taskList
        # collaborators
        self.implFactory = implFactory
        self.taskBufferIF = taskbufferIF
        self.ddmIF = ddmIF.getInterface(vo)

    # main: loops until the shared list hands back an empty batch
    def runImpl(self):
        # number of tasks requested from the shared list per iteration
        batch_size = 100
        while True:
            try:
                task_ids = self.taskList.get(batch_size)
                total, index = self.taskList.stat()
                # no more tasks
                if len(task_ids) == 0:
                    self.logger.debug(f"{self.__class__.__name__} terminating since no more items")
                    return
                log = MsgWrapper(self.logger)
                log.info(f"start TaskCheckerThread {index}/{total} for jediTaskID={task_ids}")
                status = Interaction.SC_SUCCEEDED
                # collect the TaskSpecs; tasks that cannot be read are logged and left out
                specs = []
                for task_id in task_ids:
                    ok, spec = self.taskBufferIF.getTaskWithID_JEDI(task_id, False)
                    if not ok or spec is None:
                        log.error(f"failed to get taskSpec for jediTaskID={task_id}")
                        continue
                    specs.append(spec)
                # nothing to check in this batch
                if not specs:
                    continue
                # look up the implementation for this vo and source label
                if status == Interaction.SC_SUCCEEDED:
                    log.info("getting Impl")
                    try:
                        impl = self.implFactory.getImpl(self.vo, self.prodSourceLabel)
                        if impl is None:
                            # task brokerage is undefined
                            log.error(f"task broker is undefined for vo={self.vo} sourceLabel={self.prodSourceLabel}")
                            status = Interaction.SC_FAILED
                    except Exception as exc:
                        log.error(f"getImpl failed with {type(exc).__name__}:{exc}")
                        status = Interaction.SC_FAILED
                # run the check
                if status == Interaction.SC_SUCCEEDED:
                    log.info(f"check with {impl.__class__.__name__}")
                    try:
                        status, cloud_map = impl.doCheck(specs)
                    except Exception as exc:
                        log.error(f"doCheck failed with {type(exc).__name__}:{exc}")
                        status = Interaction.SC_FAILED
                # store the outcome
                if status != Interaction.SC_SUCCEEDED:
                    log.error("failed to check assignment")
                else:
                    result = self.taskBufferIF.setCloudToTasks_JEDI(cloud_map)
                    log.info(f"done with {result} for {str(cloud_map)}")
            except Exception as exc:
                # reported through the module-level logger; the loop then tries the next batch
                logger.error(f"{self.__class__.__name__} failed in runImpl() with {type(exc).__name__}:{exc}")
0185 
0186 
# worker thread that takes batches of tasks from a shared list, fetches their
# input chunks with getTasksToBeProcessed_JEDI and passes them to the
# doBrokerage() of the brokerage implementation
class TaskBrokerThread(WorkerThread):
    # constructor
    def __init__(self, taskList, threadPool, taskbufferIF, ddmIF, implFactory, vo, prodSourceLabel, workQueue, resource_name):
        # the base worker is initialized without a semaphore
        WorkerThread.__init__(self, None, threadPool, logger)
        # what to work on and on whose behalf
        self.vo = vo
        self.prodSourceLabel = prodSourceLabel
        self.workQueue = workQueue
        self.resource_name = resource_name
        self.taskList = taskList
        # collaborators
        self.implFactory = implFactory
        self.taskBufferIF = taskbufferIF
        self.ddmIF = ddmIF.getInterface(vo)

    # main: loops until the shared list hands back an empty batch
    def runImpl(self):
        # number of tasks requested from the shared list per iteration
        batch_size = 100
        while True:
            try:
                task_items = self.taskList.get(batch_size)
                total, index = self.taskList.stat()
                # no more tasks
                if len(task_items) == 0:
                    self.logger.debug(f"{self.__class__.__name__} terminating since no more items")
                    return
                log = MsgWrapper(self.logger)
                log.info(f"start TaskBrokerThread {index}/{total} for jediTaskID={task_items}")
                status = Interaction.SC_SUCCEEDED
                # gather the input chunks, one query per task; the first failure aborts the batch
                inputs_to_assign = []
                for task_item in task_items:
                    chunks = self.taskBufferIF.getTasksToBeProcessed_JEDI(
                        None, None, None, None, None, simTasks=[task_item], readMinFiles=True, fullSimulation=True
                    )
                    if chunks is None:
                        # failed
                        log.error(f"failed to get the input chunks for jediTaskID={task_item}")
                        status = Interaction.SC_FAILED
                        break
                    inputs_to_assign.extend(chunks)
                # look up the implementation for this vo and source label
                if status == Interaction.SC_SUCCEEDED:
                    log.info("getting Impl")
                    try:
                        impl = self.implFactory.getImpl(self.vo, self.prodSourceLabel)
                        if impl is None:
                            # task brokerage is undefined
                            log.error(f"task broker is undefined for vo={self.vo} sourceLabel={self.prodSourceLabel}")
                            status = Interaction.SC_FAILED
                    except Exception as exc:
                        log.error(f"getImpl failed with {type(exc).__name__}:{exc}")
                        status = Interaction.SC_FAILED
                # brokerage
                if status == Interaction.SC_SUCCEEDED:
                    log.info(f"brokerage with {impl.__class__.__name__} for {len(inputs_to_assign)} tasks ")
                    try:
                        status = impl.doBrokerage(inputs_to_assign, self.vo, self.prodSourceLabel, self.workQueue, self.resource_name)
                    except Exception as exc:
                        log.error(f"doBrokerage failed with {type(exc).__name__}:{exc}")
                        status = Interaction.SC_FAILED
                # report the outcome of this batch
                if status != Interaction.SC_SUCCEEDED:
                    log.error("failed")
                else:
                    log.info("done")
            except Exception as exc:
                # reported through the module-level logger; the loop then tries the next batch
                logger.error(f"{self.__class__.__name__} failed in runImpl() with {type(exc).__name__}:{exc}")
0261 
0262 
0263 # launch
0264 
0265 
def launcher(commuChannel, taskBufferIF, ddmIF, vos=None, prodSourceLabels=None):
    # build the TaskBroker agent from the given arguments and enter its main loop
    # (TaskBroker.start() loops forever, so this call does not return)
    TaskBroker(commuChannel, taskBufferIF, ddmIF, vos, prodSourceLabels).start()