Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:38:59

0001 import datetime
0002 import os
0003 import socket
0004 import sys
0005 import time
0006 
0007 from pandacommon.pandalogger.PandaLogger import PandaLogger
0008 from pandacommon.pandautils.PandaUtils import naive_utcnow
0009 
0010 from pandajedi.jediconfig import jedi_config
0011 from pandajedi.jedicore import Interaction
0012 from pandajedi.jedicore.FactoryBase import FactoryBase
0013 from pandajedi.jedicore.MsgWrapper import MsgWrapper
0014 from pandajedi.jedicore.ThreadUtils import ListWithLock, ThreadPool, WorkerThread
0015 
0016 from .JediKnight import JediKnight
0017 
0018 logger = PandaLogger().getLogger(__name__.split(".")[-1])
0019 
0020 
0021 # worker class to do post-processing
class PostProcessor(JediKnight, FactoryBase):
    """JEDI agent that periodically post-processes tasks which are ready to be finished.

    For every configured (vo, prodSourceLabel) pair it asks the task buffer for
    tasks to be finished and hands them to a pool of PostProcessorThread workers.
    """

    # constructor
    def __init__(self, commuChannel, taskBufferIF, ddmIF, vos, prodSourceLabels):
        """
        :param commuChannel: communication channel forwarded to JediKnight
        :param taskBufferIF: task buffer interface used to fetch and update tasks
        :param ddmIF: DDM interface forwarded to the plugins and worker threads
        :param vos: VO specification, normalized with parseInit
        :param prodSourceLabels: source-label specification, normalized with parseInit
        """
        self.vos = self.parseInit(vos)
        self.prodSourceLabels = self.parseInit(prodSourceLabels)
        # identifier of this process (<short hostname>-<os pid>-post), passed as pid to the task buffer
        self.pid = f"{socket.getfqdn().split('.')[0]}-{os.getpid()}-post"
        JediKnight.__init__(self, commuChannel, taskBufferIF, ddmIF, logger)
        FactoryBase.__init__(self, self.vos, self.prodSourceLabels, logger, jedi_config.postprocessor.modConfig)

    # main
    def start(self):
        """Run the post-processing main loop forever.

        Each iteration handles all (vo, prodSourceLabel) pairs, then sleeps for
        whatever remains of a 60-second cycle. Exceptions raised during an
        iteration are logged and the loop continues.
        """
        # start base classes
        JediKnight.start(self)
        FactoryBase.initializeMods(self, self.taskBufferIF, self.ddmIF)
        # nominal duration of one loop iteration, in seconds
        loopCycle = 60
        # go into main loop
        while True:
            startTime = naive_utcnow()
            # created outside the try so the except clause can always log through it
            tmpLog = MsgWrapper(logger)
            try:
                tmpLog.info("start")
                # loop over all vos and sourceLabels
                for vo in self.vos:
                    for prodSourceLabel in self.prodSourceLabels:
                        self._post_process_vo_label(vo, prodSourceLabel, tmpLog)
                tmpLog.info("done")
            except Exception:
                errtype, errvalue = sys.exc_info()[:2]
                tmpLog.error(f"failed in {self.__class__.__name__}.start() with {errtype.__name__} {errvalue}")
            # sleep for the rest of the cycle; total_seconds() is used because
            # timedelta.seconds ignores the days component of the elapsed time
            elapsed = (naive_utcnow() - startTime).total_seconds()
            sleepPeriod = loopCycle - elapsed
            if sleepPeriod > 0:
                time.sleep(sleepPeriod)

    def _post_process_vo_label(self, vo, prodSourceLabel, tmpLog):
        """Fetch tasks to be finished for one (vo, prodSourceLabel) and post-process them in worker threads."""
        nTasks = jedi_config.postprocessor.nTasks
        # prepare tasks to be finished
        tmpLog.info(f"preparing tasks to be finished for vo={vo} label={prodSourceLabel}")
        tmp_ret_list = self.taskBufferIF.prepareTasksToBeFinished_JEDI(vo, prodSourceLabel, nTasks, pid=self.pid)
        if tmp_ret_list is None:
            # the failure is only logged; tasks are still fetched below with target_tasks=None
            # NOTE(review): presumably None means "no restriction" for getTasksToBeFinished_JEDI — confirm
            tmpLog.error("failed to prepare tasks")
        # get tasks to be finished
        tmpLog.info("getting tasks to be finished")
        tmpList = self.taskBufferIF.getTasksToBeFinished_JEDI(vo, prodSourceLabel, self.pid, nTasks, target_tasks=tmp_ret_list)
        if tmpList is None:
            tmpLog.error("failed to get tasks to be finished")
            return
        tmpLog.info(f"got {len(tmpList)} tasks")
        # workers take chunks from this shared locked list until it is empty
        taskList = ListWithLock(tmpList)
        threadPool = ThreadPool()
        for _ in range(jedi_config.postprocessor.nWorkers):
            thr = PostProcessorThread(taskList, threadPool, self.taskBufferIF, self.ddmIF, self)
            thr.start()
        # join worker threads
        threadPool.join()
0084 
0085 
0086 # thread for real worker
class PostProcessorThread(WorkerThread):
    """Worker thread that takes tasks from a shared locked list and post-processes them."""

    # constructor
    def __init__(self, taskList, threadPool, taskbufferIF, ddmIF, implFactory):
        """
        :param taskList: ListWithLock of task specs shared between workers
        :param threadPool: ThreadPool this worker belongs to
        :param taskbufferIF: task buffer interface used to update tasks
        :param ddmIF: DDM interface forwarded to the plugins
        :param implFactory: factory (the PostProcessor) providing instantiateImpl
        """
        # initialize worker with no semaphore
        WorkerThread.__init__(self, None, threadPool, logger)
        # attributes
        self.taskList = taskList
        self.taskBufferIF = taskbufferIF
        self.ddmIF = ddmIF
        self.implFactory = implFactory

    # post process tasks
    def post_process_tasks(self, task_list):
        """Run the vo/label-specific post-processor and final procedure for each task.

        Tasks whose post-processing fails fatally (or fails while the task is in
        toabort/tobroken) are set to "broken"; temporary failures are recorded in
        the error dialog and the final procedure is skipped for them.
        """
        for taskSpec in task_list:
            # make logger
            tmpLog = MsgWrapper(self.logger, f"<jediTaskID={taskSpec.jediTaskID}>")
            tmpLog.info("start")
            # get impl
            impl = self.implFactory.instantiateImpl(taskSpec.vo, taskSpec.prodSourceLabel, None, self.taskBufferIF, self.ddmIF)
            if impl is None:
                # post processor is undefined
                tmpLog.error(f"post-processor is undefined for vo={taskSpec.vo} sourceLabel={taskSpec.prodSourceLabel}")
                tmpStat = Interaction.SC_FATAL
            else:
                # execute
                tmpLog.info(f"post-process with {impl.__class__.__name__}")
                try:
                    tmpStat = impl.doPostProcess(taskSpec, tmpLog)
                except Exception as e:
                    tmpLog.error(f"post-process failed with {str(e)}")
                    tmpStat = Interaction.SC_FATAL
            if tmpStat == Interaction.SC_FATAL or (tmpStat == Interaction.SC_FAILED and taskSpec.status in ["toabort", "tobroken"]):
                # task is broken
                tmpErrStr = "post-process permanently failed"
                tmpLog.error(tmpErrStr)
                taskSpec.status = "broken"
                taskSpec.setErrDiag(tmpErrStr)
                taskSpec.lockedBy = None
                self.taskBufferIF.updateTask_JEDI(taskSpec, {"jediTaskID": taskSpec.jediTaskID})
            elif tmpStat == Interaction.SC_FAILED:
                tmpErrStr = "post-processing temporarily failed"
                taskSpec.setErrDiag(tmpErrStr, True)
                self.taskBufferIF.updateTask_JEDI(taskSpec, {"jediTaskID": taskSpec.jediTaskID})
                tmpLog.info(f"set task_status={taskSpec.status} since {taskSpec.errorDialog}")
                tmpLog.info("done")
                continue
            # final procedure; skipped when no plugin exists, since calling it on None
            # would only raise AttributeError and log a misleading error
            if impl is not None:
                try:
                    impl.doFinalProcedure(taskSpec, tmpLog)
                except Exception as e:
                    tmpLog.error(f"final procedure failed with {str(e)}")
            # done
            tmpLog.info("done")

    # main
    def runImpl(self):
        """Take chunks of tasks from the shared list and process them until the list is empty."""
        # number of tasks taken from the shared list per iteration
        nTasks = 10
        while True:
            try:
                # get a part of list
                taskList = self.taskList.get(nTasks)
                # no more tasks
                if len(taskList) == 0:
                    self.logger.debug(f"{self.__class__.__name__} terminating since no more items")
                    return
                # post process tasks
                self.post_process_tasks(taskList)
            except Exception:
                errtype, errvalue = sys.exc_info()[:2]
                self.logger.error(f"{self.__class__.__name__} failed in runImpl() with {errtype.__name__}:{errvalue}")
0159 
0160 
0161 # launch
0162 
0163 
def launcher(commuChannel, taskBufferIF, ddmIF, vos=None, prodSourceLabels=None):
    """Build a PostProcessor from the given interfaces and run its main loop.

    PostProcessor.start() loops forever, so this call does not normally return.
    """
    PostProcessor(commuChannel, taskBufferIF, ddmIF, vos, prodSourceLabels).start()