File indexing completed on 2026-04-10 08:38:59
0001 import datetime
0002 import os
0003 import socket
0004 import sys
0005 import time
0006
0007 from pandacommon.pandalogger.PandaLogger import PandaLogger
0008 from pandacommon.pandautils.PandaUtils import naive_utcnow
0009
0010 from pandajedi.jediconfig import jedi_config
0011 from pandajedi.jedicore import Interaction
0012 from pandajedi.jedicore.FactoryBase import FactoryBase
0013 from pandajedi.jedicore.MsgWrapper import MsgWrapper
0014 from pandajedi.jedicore.ThreadUtils import ListWithLock, ThreadPool, WorkerThread
0015
0016 from .JediKnight import JediKnight
0017
# Module logger, named after the last dotted component of this module's __name__.
logger = PandaLogger().getLogger(__name__.rsplit(".", 1)[-1])
0019
0020
0021
class PostProcessor(JediKnight, FactoryBase):
    """JEDI agent that periodically post-processes tasks that are ready to be finished.

    Every cycle loops over all configured (vo, prodSourceLabel) pairs. For each pair it
    asks the task buffer to prepare and then return the tasks to be finished. It then
    fans those tasks out to ``jedi_config.postprocessor.nWorkers`` PostProcessorThread
    workers. A new cycle starts at most once every ``loop_cycle`` seconds.
    """

    # Minimum time, in seconds, between the starts of two consecutive cycles.
    loop_cycle = 60

    def __init__(self, commuChannel, taskBufferIF, ddmIF, vos, prodSourceLabels):
        """Initialize the agent.

        :param commuChannel: communication channel handed to JediKnight
        :param taskBufferIF: task buffer interface used to fetch/update tasks
        :param ddmIF: DDM interface handed to the base classes and the workers
        :param vos: VO specification, normalized with ``parseInit``
        :param prodSourceLabels: source-label specification, normalized with ``parseInit``
        """
        self.vos = self.parseInit(vos)
        self.prodSourceLabels = self.parseInit(prodSourceLabels)
        # Identifier of this agent instance: "<short hostname>-<process id>-post".
        self.pid = f"{socket.getfqdn().split('.')[0]}-{os.getpid()}-post"
        JediKnight.__init__(self, commuChannel, taskBufferIF, ddmIF, logger)
        FactoryBase.__init__(self, self.vos, self.prodSourceLabels, logger, jedi_config.postprocessor.modConfig)

    def start(self):
        """Start the base classes, then run the post-processing loop forever.

        Exceptions raised inside a cycle are logged and the loop continues with the
        next cycle.
        """
        JediKnight.start(self)
        FactoryBase.initializeMods(self, self.taskBufferIF, self.ddmIF)

        while True:
            # Use a monotonic clock for the elapsed time. timedelta.seconds on a
            # wall-clock delta drops whole days and misbehaves if the clock is adjusted.
            cycle_start = time.monotonic()
            tmpLog = None
            try:
                tmpLog = MsgWrapper(logger)
                tmpLog.info("start")
                for vo in self.vos:
                    for prodSourceLabel in self.prodSourceLabels:
                        self._post_process_label(tmpLog, vo, prodSourceLabel)
                tmpLog.info("done")
            except Exception as e:
                err_msg = f"failed in {self.__class__.__name__}.start() with {e.__class__.__name__} {e}"
                # Fall back to the module logger if the wrapper itself could not be built.
                if tmpLog is None:
                    logger.error(err_msg)
                else:
                    tmpLog.error(err_msg)

            # Sleep out the remainder of the cycle, if any.
            sleep_period = self.loop_cycle - (time.monotonic() - cycle_start)
            if sleep_period > 0:
                time.sleep(sleep_period)

    def _post_process_label(self, tmpLog, vo, prodSourceLabel):
        """Fetch and post-process the tasks to be finished for one (vo, prodSourceLabel) pair.

        Blocks until all worker threads spawned for this pair have been joined.
        """
        nTasks = jedi_config.postprocessor.nTasks

        tmpLog.info(f"preparing tasks to be finished for vo={vo} label={prodSourceLabel}")
        tmp_ret_list = self.taskBufferIF.prepareTasksToBeFinished_JEDI(vo, prodSourceLabel, nTasks, pid=self.pid)
        if tmp_ret_list is None:
            # Only logged: the fetch below still runs, with target_tasks=None.
            # NOTE(review): presumably None means "no explicit target list" there — confirm.
            tmpLog.error("failed to prepare tasks")

        tmpLog.info("getting tasks to be finished")
        tmpList = self.taskBufferIF.getTasksToBeFinished_JEDI(
            vo, prodSourceLabel, self.pid, nTasks, target_tasks=tmp_ret_list
        )
        if tmpList is None:
            tmpLog.error("failed to get tasks to be finished")
            return
        tmpLog.info(f"got {len(tmpList)} tasks")

        # Shared list the workers pull batches from.
        taskList = ListWithLock(tmpList)
        threadPool = ThreadPool()
        for _ in range(jedi_config.postprocessor.nWorkers):
            thr = PostProcessorThread(taskList, threadPool, self.taskBufferIF, self.ddmIF, self)
            thr.start()
        threadPool.join()
0084
0085
0086
class PostProcessorThread(WorkerThread):
    """Worker thread that drains a shared task list and post-processes each task."""

    # Number of tasks taken from the shared list per batch.
    nTasksPerBatch = 10

    def __init__(self, taskList, threadPool, taskbufferIF, ddmIF, implFactory):
        """Initialize the worker.

        :param taskList: shared list (ListWithLock) the worker pulls batches of tasks from
        :param threadPool: pool this worker registers with through WorkerThread
        :param taskbufferIF: task buffer interface used to update tasks
        :param ddmIF: DDM interface handed to the post-processor implementations
        :param implFactory: factory providing ``instantiateImpl`` (the PostProcessor agent)
        """
        WorkerThread.__init__(self, None, threadPool, logger)
        self.taskList = taskList
        self.taskBufferIF = taskbufferIF
        self.ddmIF = ddmIF
        self.implFactory = implFactory

    def post_process_tasks(self, task_list):
        """Post-process every task in ``task_list``.

        An unexpected exception while handling one task is logged against that task.
        It does not abort the remaining tasks of the batch, which have already been
        removed from the shared list.
        """
        for taskSpec in task_list:
            tmpLog = MsgWrapper(self.logger, f"<jediTaskID={taskSpec.jediTaskID}>")
            try:
                self._post_process_task(taskSpec, tmpLog)
            except Exception as e:
                tmpLog.error(f"post-processing aborted with {e.__class__.__name__}:{e}")

    def _post_process_task(self, taskSpec, tmpLog):
        """Run the post-processor and then the final procedure for a single task.

        Status handling:
        - SC_FATAL, or SC_FAILED for a task in "toabort"/"tobroken": the task is set
          to "broken", lockedBy is cleared and the task is updated.
        - Any other SC_FAILED: an error dialog is set, the task is updated, and the
          final procedure is skipped.
        - Otherwise, or after marking the task broken, ``doFinalProcedure`` runs
          when an implementation exists.
        """
        tmpLog.info("start")

        impl = self.implFactory.instantiateImpl(taskSpec.vo, taskSpec.prodSourceLabel, None, self.taskBufferIF, self.ddmIF)
        if impl is None:
            tmpLog.error(f"post-processor is undefined for vo={taskSpec.vo} sourceLabel={taskSpec.prodSourceLabel}")
            tmpStat = Interaction.SC_FATAL
        else:
            tmpLog.info(f"post-process with {impl.__class__.__name__}")
            try:
                tmpStat = impl.doPostProcess(taskSpec, tmpLog)
            except Exception as e:
                tmpLog.error(f"post-process failed with {str(e)}")
                tmpStat = Interaction.SC_FATAL

        if tmpStat == Interaction.SC_FATAL or (tmpStat == Interaction.SC_FAILED and taskSpec.status in ["toabort", "tobroken"]):
            tmpErrStr = "post-process permanently failed"
            tmpLog.error(tmpErrStr)
            taskSpec.status = "broken"
            taskSpec.setErrDiag(tmpErrStr)
            taskSpec.lockedBy = None
            self.taskBufferIF.updateTask_JEDI(taskSpec, {"jediTaskID": taskSpec.jediTaskID})
            if impl is None:
                # Without an implementation there is no final procedure to run.
                # Calling it on None would only log a misleading AttributeError.
                tmpLog.info("done")
                return
        elif tmpStat == Interaction.SC_FAILED:
            tmpErrStr = "post-processing temporarily failed"
            taskSpec.setErrDiag(tmpErrStr, True)
            self.taskBufferIF.updateTask_JEDI(taskSpec, {"jediTaskID": taskSpec.jediTaskID})
            tmpLog.info(f"set task_status={taskSpec.status} since {taskSpec.errorDialog}")
            tmpLog.info("done")
            return

        try:
            impl.doFinalProcedure(taskSpec, tmpLog)
        except Exception as e:
            tmpLog.error(f"final procedure failed with {str(e)}")

        tmpLog.info("done")

    def runImpl(self):
        """Pull batches from the shared list until it is empty, post-processing each one.

        Exceptions from a batch are logged and the loop continues with the next batch.
        """
        while True:
            try:
                taskList = self.taskList.get(self.nTasksPerBatch)
                if not taskList:
                    self.logger.debug(f"{self.__class__.__name__} terminating since no more items")
                    return
                self.post_process_tasks(taskList)
            except Exception as e:
                self.logger.error(f"{self.__class__.__name__} failed in runImpl() with {e.__class__.__name__}:{e}")
0159
0160
0161
0162
0163
def launcher(commuChannel, taskBufferIF, ddmIF, vos=None, prodSourceLabels=None):
    """Entry point: build a PostProcessor agent from the given interfaces and run it.

    Normally this call does not return, because ``PostProcessor.start`` loops forever.
    """
    PostProcessor(commuChannel, taskBufferIF, ddmIF, vos, prodSourceLabels).start()