Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:58:57

0001 import datetime
0002 
0003 from pandaharvester.harvesterbody.agent_base import AgentBase
0004 from pandaharvester.harvesterconfig import harvester_config
0005 from pandaharvester.harvestercore import core_utils
0006 from pandaharvester.harvestercore.db_proxy_pool import DBProxyPool as DBProxy
0007 from pandaharvester.harvestercore.file_spec import FileSpec
0008 from pandaharvester.harvestercore.job_spec import JobSpec
0009 from pandaharvester.harvestercore.pilot_errors import PilotErrors
0010 from pandaharvester.harvestercore.plugin_factory import PluginFactory
0011 
0012 # logger
0013 _logger = core_utils.setup_logger("preparator")
0014 
0015 
# class to prepare jobs
class Preparator(AgentBase):
    """Agent that triggers and monitors stage-in (preparation) of job input files.

    Each cycle of the main loop does two passes:
      1. check pass  - jobs in sub-status "preparing" are polled via the queue's
         preparator plugin; completed jobs move to sub-status "prepared".
      2. trigger pass - jobs in sub-status "fetched" get stage-in triggered and
         move to sub-status "preparing".
    File-level state ("to_prepare"/"triggered"/"ready") is shared across jobs
    through the DB so the same LFN is staged in only once per DDM endpoint.
    """

    # constructor
    def __init__(self, communicator, queue_config_mapper, single_mode=False):
        """Initialize shared components.

        :param communicator: communicator to talk to the PanDA server
        :param queue_config_mapper: maps computing site / configID to queue configuration
        :param single_mode: when True the agent runs a single cycle (see AgentBase)
        """
        AgentBase.__init__(self, single_mode)
        self.dbProxy = DBProxy()
        self.communicator = communicator
        self.queueConfigMapper = queue_config_mapper
        self.pluginFactory = PluginFactory()

    # main loop
    def run(self):
        """Run the check/trigger loop until the agent is terminated."""
        lockedBy = f"preparator-{self.get_pid()}"
        while True:
            sw = core_utils.get_stopwatch()
            mainLog = self.make_logger(_logger, f"id={lockedBy}", method_name="run")
            mainLog.debug("try to get jobs to check")
            # get jobs to check preparation
            try:
                maxFilesPerJob = harvester_config.preparator.maxFilesPerJobToCheck
                if maxFilesPerJob <= 0:
                    # non-positive value means "no per-job file limit"
                    maxFilesPerJob = None
            except Exception:
                # parameter absent from harvester config
                maxFilesPerJob = None
            jobsToCheck = self.dbProxy.get_jobs_in_sub_status(
                "preparing",
                harvester_config.preparator.maxJobsToCheck,
                "preparatorTime",
                "lockedBy",
                harvester_config.preparator.checkInterval,
                harvester_config.preparator.lockInterval,
                lockedBy,
                max_files_per_job=maxFilesPerJob,
                ng_file_status_list=["ready"],
            )
            mainLog.debug(f"got {len(jobsToCheck)} jobs to check")
            # loop over all jobs
            for jobSpec in jobsToCheck:
                tmpLog = self.make_logger(_logger, f"PandaID={jobSpec.PandaID}", method_name="run")
                try:
                    tmpLog.debug("start checking")
                    # configID is ignored unless dynamic plugin change is enabled
                    configID = jobSpec.configID
                    if not core_utils.dynamic_plugin_change():
                        configID = None
                    # get queue
                    if not self.queueConfigMapper.has_queue(jobSpec.computingSite, configID):
                        tmpLog.error(f"queue config for {jobSpec.computingSite}/{configID} not found")
                        continue
                    # FIX: use the normalized configID (was jobSpec.configID), consistent
                    # with the has_queue check above and with the trigger pass below
                    queueConfig = self.queueConfigMapper.get_queue(jobSpec.computingSite, configID)
                    oldSubStatus = jobSpec.subStatus
                    # get plugin: auxiliary inputs are handled by a dedicated preparator
                    if jobSpec.auxInput in [None, JobSpec.AUX_allTriggered]:
                        preparatorCore = self.pluginFactory.get_plugin(queueConfig.preparator)
                    else:
                        preparatorCore = self.pluginFactory.get_plugin(queueConfig.aux_preparator)
                    if preparatorCore is None:
                        # not found
                        tmpLog.error(f"plugin for {jobSpec.computingSite} not found")
                        continue
                    tmpLog.debug(f"plugin={preparatorCore.__class__.__name__}")
                    # lock job again to make sure no other thread took it meanwhile
                    lockedAgain = self.dbProxy.lock_job_again(jobSpec.PandaID, "preparatorTime", "lockedBy", lockedBy)
                    if not lockedAgain:
                        tmpLog.debug("skip since locked by another thread")
                        continue
                    tmpStat, tmpStr = preparatorCore.check_stage_in_status(jobSpec)
                    # tmpStat is None while stage-in is still running
                    if tmpStat is None:
                        # release the lock and retry later
                        jobSpec.lockedBy = None
                        self.dbProxy.update_job(jobSpec, {"lockedBy": lockedBy, "subStatus": oldSubStatus})
                        tmpLog.debug(f"try to check later since still preparing with {tmpStr}")
                        continue
                    # succeeded
                    if tmpStat is True:
                        # resolve local paths of the staged-in files
                        tmpStat, tmpStr = preparatorCore.resolve_input_paths(jobSpec)
                        if tmpStat is False:
                            jobSpec.lockedBy = None
                            self.dbProxy.update_job(jobSpec, {"lockedBy": lockedBy, "subStatus": oldSubStatus})
                            tmpLog.error(f"failed to resolve input file paths : {tmpStr}")
                            continue
                        # manipulate container-related job params
                        jobSpec.manipulate_job_params_for_container()
                        # update job
                        jobSpec.lockedBy = None
                        jobSpec.set_all_input_ready()
                        if (maxFilesPerJob is None and jobSpec.auxInput is None) or (
                            len(jobSpec.inFiles) == 0 and jobSpec.auxInput in [None, JobSpec.AUX_inReady]
                        ):
                            # all files (and auxiliary inputs, if any) are ready
                            allDone = True
                            jobSpec.subStatus = "prepared"
                            jobSpec.preparatorTime = None
                            if jobSpec.auxInput is not None:
                                jobSpec.auxInput = JobSpec.AUX_allReady
                        else:
                            # immediate next lookup since there could be more files to check
                            allDone = False
                            jobSpec.trigger_preparation()
                            # change auxInput flag to check auxiliary inputs
                            if len(jobSpec.inFiles) == 0 and jobSpec.auxInput == JobSpec.AUX_allTriggered:
                                jobSpec.auxInput = JobSpec.AUX_inReady
                        self.dbProxy.update_job(jobSpec, {"lockedBy": lockedBy, "subStatus": oldSubStatus}, update_in_file=True)
                        if allDone:
                            tmpLog.debug("succeeded")
                        else:
                            tmpLog.debug("partially succeeded")
                    else:
                        # fatal stage-in failure: fail the job and propagate to PanDA
                        jobSpec.status = "failed"
                        jobSpec.subStatus = "failed_to_prepare"
                        jobSpec.lockedBy = None
                        jobSpec.preparatorTime = None
                        jobSpec.stateChangeTime = core_utils.naive_utcnow()
                        errStr = f"stage-in failed with {tmpStr}"
                        jobSpec.set_pilot_error(PilotErrors.STAGEINFAILED, errStr)
                        jobSpec.trigger_propagation()
                        self.dbProxy.update_job(jobSpec, {"lockedBy": lockedBy, "subStatus": oldSubStatus})
                        tmpLog.error(f"failed with {tmpStr}")
                except Exception:
                    core_utils.dump_error_message(tmpLog)
            # get jobs to trigger preparation
            mainLog.debug("try to get jobs to prepare")
            try:
                maxFilesPerJob = harvester_config.preparator.maxFilesPerJobToPrepare
                if maxFilesPerJob <= 0:
                    # non-positive value means "no per-job file limit"
                    maxFilesPerJob = None
            except Exception:
                # parameter absent from harvester config
                maxFilesPerJob = None
            jobsToTrigger = self.dbProxy.get_jobs_in_sub_status(
                "fetched",
                harvester_config.preparator.maxJobsToTrigger,
                "preparatorTime",
                "lockedBy",
                harvester_config.preparator.triggerInterval,
                harvester_config.preparator.lockInterval,
                lockedBy,
                "preparing",
                max_files_per_job=maxFilesPerJob,
                ng_file_status_list=["triggered", "ready"],
            )
            mainLog.debug(f"got {len(jobsToTrigger)} jobs to prepare")
            # per-endpoint cache of file status lookups, shared across jobs in this pass
            fileStatMap = dict()
            for jobSpec in jobsToTrigger:
                tmpLog = self.make_logger(_logger, f"PandaID={jobSpec.PandaID}", method_name="run")
                try:
                    tmpLog.debug("try to trigger preparation")
                    # configID is ignored unless dynamic plugin change is enabled
                    configID = jobSpec.configID
                    if not core_utils.dynamic_plugin_change():
                        configID = None
                    # get queue
                    if not self.queueConfigMapper.has_queue(jobSpec.computingSite, configID):
                        tmpLog.error(f"queue config for {jobSpec.computingSite}/{configID} not found")
                        continue
                    queueConfig = self.queueConfigMapper.get_queue(jobSpec.computingSite, configID)
                    oldSubStatus = jobSpec.subStatus
                    # get plugin: auxiliary inputs are handled by a dedicated preparator
                    if jobSpec.auxInput in [None, JobSpec.AUX_hasAuxInput]:
                        preparatorCore = self.pluginFactory.get_plugin(queueConfig.preparator)
                        fileType = "input"
                    else:
                        preparatorCore = self.pluginFactory.get_plugin(queueConfig.aux_preparator)
                        fileType = FileSpec.AUX_INPUT
                    if preparatorCore is None:
                        # not found
                        tmpLog.error(f"plugin for {jobSpec.computingSite} not found")
                        continue
                    tmpLog.debug(f"plugin={preparatorCore.__class__.__name__}")
                    # lock job again to make sure no other thread took it meanwhile
                    lockedAgain = self.dbProxy.lock_job_again(jobSpec.PandaID, "preparatorTime", "lockedBy", lockedBy)
                    if not lockedAgain:
                        tmpLog.debug("skip since locked by another thread")
                        continue
                    # check file status
                    if queueConfig.ddmEndpointIn not in fileStatMap:
                        fileStatMap[queueConfig.ddmEndpointIn] = dict()
                    # check if this job already owns a file in to_prepare
                    hasToPrepare = False
                    for fileSpec in jobSpec.inFiles:
                        if fileSpec.status == "to_prepare":
                            hasToPrepare = True
                            break
                    newFileStatusData = []
                    toWait = False
                    newInFiles = []
                    for fileSpec in jobSpec.inFiles:
                        if fileSpec.status in ["preparing", "to_prepare"]:
                            newInFiles.append(fileSpec)
                            updateStatus = False
                            if fileSpec.lfn not in fileStatMap[queueConfig.ddmEndpointIn]:
                                fileStatMap[queueConfig.ddmEndpointIn][fileSpec.lfn] = self.dbProxy.get_file_status(
                                    fileSpec.lfn, fileType, queueConfig.ddmEndpointIn, "starting"
                                )
                            if "ready" in fileStatMap[queueConfig.ddmEndpointIn][fileSpec.lfn]:
                                # the file was already staged in by another job
                                fileSpec.status = "ready"
                                if fileStatMap[queueConfig.ddmEndpointIn][fileSpec.lfn]["ready"]["path"]:
                                    fileSpec.path = list(fileStatMap[queueConfig.ddmEndpointIn][fileSpec.lfn]["ready"]["path"])[0]
                                # set group info if any
                                groupInfo = self.dbProxy.get_group_for_file(fileSpec.lfn, fileType, queueConfig.ddmEndpointIn)
                                if groupInfo is not None:
                                    fileSpec.groupID = groupInfo["groupID"]
                                    fileSpec.groupStatus = groupInfo["groupStatus"]
                                    fileSpec.groupUpdateTime = groupInfo["groupUpdateTime"]
                                updateStatus = True
                            elif (not hasToPrepare and "to_prepare" in fileStatMap[queueConfig.ddmEndpointIn][fileSpec.lfn]) or "triggered" in fileStatMap[
                                queueConfig.ddmEndpointIn
                            ][fileSpec.lfn]:
                                # the file is being prepared by another job
                                toWait = True
                                if fileSpec.status != "preparing":
                                    fileSpec.status = "preparing"
                                    updateStatus = True
                            else:
                                # this job takes ownership of staging in the file
                                if fileSpec.status != "to_prepare":
                                    fileSpec.status = "to_prepare"
                                    updateStatus = True
                            # record new status and keep the cache coherent
                            if updateStatus:
                                newFileStatusData.append((fileSpec.fileID, fileSpec.lfn, fileSpec.status))
                                fileStatMap[queueConfig.ddmEndpointIn][fileSpec.lfn].setdefault(fileSpec.status, None)
                    if len(newFileStatusData) > 0:
                        self.dbProxy.change_file_status(jobSpec.PandaID, newFileStatusData, lockedBy)
                    # wait since files are being prepared by another
                    if toWait:
                        # release the lock and retry later
                        jobSpec.lockedBy = None
                        self.dbProxy.update_job(jobSpec, {"lockedBy": lockedBy, "subStatus": oldSubStatus})
                        tmpLog.debug("wait since files are being prepared by another job")
                        continue
                    # trigger preparation
                    tmpStat, tmpStr = preparatorCore.trigger_preparation(jobSpec)
                    # check result: True=triggered, False=fatal, None=temporary error
                    if tmpStat is True:
                        # succeeded
                        jobSpec.lockedBy = None
                        if (maxFilesPerJob is None and jobSpec.auxInput is None) or (
                            len(jobSpec.inFiles) == 0 and jobSpec.auxInput in [None, JobSpec.AUX_inTriggered]
                        ):
                            # all files (and auxiliary inputs, if any) are triggered
                            allDone = True
                            jobSpec.subStatus = "preparing"
                            jobSpec.preparatorTime = None
                            if jobSpec.auxInput is not None:
                                jobSpec.auxInput = JobSpec.AUX_allTriggered
                        else:
                            # change file status but not change job sub status since
                            # there could be more files to prepare
                            allDone = False
                            for fileSpec in jobSpec.inFiles:
                                if fileSpec.status == "to_prepare":
                                    fileSpec.status = "triggered"
                            # immediate next lookup
                            jobSpec.trigger_preparation()
                            # change auxInput flag to prepare auxiliary inputs
                            if len(jobSpec.inFiles) == 0 and jobSpec.auxInput == JobSpec.AUX_hasAuxInput:
                                jobSpec.auxInput = JobSpec.AUX_inTriggered
                        self.dbProxy.update_job(jobSpec, {"lockedBy": lockedBy, "subStatus": oldSubStatus}, update_in_file=True)
                        if allDone:
                            tmpLog.debug("triggered")
                        else:
                            tmpLog.debug("partially triggered")
                    elif tmpStat is False:
                        # fatal error: fail the job and propagate to PanDA
                        jobSpec.status = "failed"
                        jobSpec.subStatus = "failed_to_prepare"
                        jobSpec.lockedBy = None
                        jobSpec.preparatorTime = None
                        jobSpec.stateChangeTime = core_utils.naive_utcnow()
                        errStr = f"stage-in failed with {tmpStr}"
                        jobSpec.set_pilot_error(PilotErrors.STAGEINFAILED, errStr)
                        jobSpec.trigger_propagation()
                        self.dbProxy.update_job(jobSpec, {"lockedBy": lockedBy, "subStatus": oldSubStatus})
                        tmpLog.debug(f"failed to trigger with {tmpStr}")
                    else:
                        # temporary error: release the lock and retry later
                        jobSpec.lockedBy = None
                        self.dbProxy.update_job(jobSpec, {"lockedBy": lockedBy, "subStatus": oldSubStatus})
                        tmpLog.debug(f"try to prepare later since {tmpStr}")
                except Exception:
                    core_utils.dump_error_message(tmpLog)
            mainLog.debug("done" + sw.get_elapsed_time())
            # check if being terminated
            if self.terminated(harvester_config.preparator.sleepTime):
                mainLog.debug("terminated")
                return