# File indexing completed on 2026-04-20 07:58:57
0001 import datetime
0002
0003 from pandaharvester.harvesterbody.agent_base import AgentBase
0004 from pandaharvester.harvesterconfig import harvester_config
0005 from pandaharvester.harvestercore import core_utils
0006 from pandaharvester.harvestercore.db_proxy_pool import DBProxyPool as DBProxy
0007 from pandaharvester.harvestercore.file_spec import FileSpec
0008 from pandaharvester.harvestercore.job_spec import JobSpec
0009 from pandaharvester.harvestercore.pilot_errors import PilotErrors
0010 from pandaharvester.harvestercore.plugin_factory import PluginFactory
0011
0012
# Module-level logger shared by all Preparator instances
_logger = core_utils.setup_logger("preparator")
0014
0015
0016
class Preparator(AgentBase):
    """Agent that drives stage-in of input files for jobs.

    Each cycle of the main loop makes two passes over the job table:

    1. *check* pass: jobs in sub-status "preparing" whose stage-in was
       already triggered are polled via the queue's preparator plugin and
       promoted to "prepared" once all of their input files are ready.
    2. *trigger* pass: jobs in sub-status "fetched" get stage-in triggered
       and move to sub-status "preparing".

    Jobs with auxiliary input (``jobSpec.auxInput``) go through the same
    passes twice: first for regular input files with the ``preparator``
    plugin, then for auxiliary files with the ``aux_preparator`` plugin.
    """

    def __init__(self, communicator, queue_config_mapper, single_mode=False):
        """Set up DB access and keep references to shared components.

        :param communicator: communicator instance for talking to PanDA
        :param queue_config_mapper: mapper resolving per-queue configuration
        :param single_mode: when True the agent runs a single cycle
        """
        AgentBase.__init__(self, single_mode)
        self.dbProxy = DBProxy()
        self.communicator = communicator
        self.queueConfigMapper = queue_config_mapper
        self.pluginFactory = PluginFactory()

    # main loop
    def run(self):
        """Run the agent loop until termination is requested."""
        lockedBy = f"preparator-{self.get_pid()}"
        while True:
            sw = core_utils.get_stopwatch()
            mainLog = self.make_logger(_logger, f"id={lockedBy}", method_name="run")
            mainLog.debug("try to get jobs to check")
            # optional cap on number of files fetched per job for the check pass
            try:
                maxFilesPerJob = harvester_config.preparator.maxFilesPerJobToCheck
                if maxFilesPerJob <= 0:
                    maxFilesPerJob = None
            except Exception:
                # parameter is optional; no cap when missing
                maxFilesPerJob = None
            # get jobs whose stage-in was triggered and needs a status check
            jobsToCheck = self.dbProxy.get_jobs_in_sub_status(
                "preparing",
                harvester_config.preparator.maxJobsToCheck,
                "preparatorTime",
                "lockedBy",
                harvester_config.preparator.checkInterval,
                harvester_config.preparator.lockInterval,
                lockedBy,
                max_files_per_job=maxFilesPerJob,
                ng_file_status_list=["ready"],
            )
            mainLog.debug(f"got {len(jobsToCheck)} jobs to check")
            # loop over all jobs to check stage-in status
            for jobSpec in jobsToCheck:
                tmpLog = self.make_logger(_logger, f"PandaID={jobSpec.PandaID}", method_name="run")
                try:
                    tmpLog.debug("start checking")
                    # ignore the per-job configID unless dynamic plugin change is enabled
                    configID = jobSpec.configID
                    if not core_utils.dynamic_plugin_change():
                        configID = None
                    # get queue configuration
                    if not self.queueConfigMapper.has_queue(jobSpec.computingSite, configID):
                        tmpLog.error(f"queue config for {jobSpec.computingSite}/{configID} not found")
                        continue
                    # FIX: use the normalized configID (was jobSpec.configID), consistent
                    # with the has_queue() guard above and with the trigger pass below
                    queueConfig = self.queueConfigMapper.get_queue(jobSpec.computingSite, configID)
                    oldSubStatus = jobSpec.subStatus
                    # get the plugin: regular preparator first, aux preparator afterwards
                    if jobSpec.auxInput in [None, JobSpec.AUX_allTriggered]:
                        preparatorCore = self.pluginFactory.get_plugin(queueConfig.preparator)
                    else:
                        preparatorCore = self.pluginFactory.get_plugin(queueConfig.aux_preparator)
                    if preparatorCore is None:
                        # plugin not found
                        tmpLog.error(f"plugin for {jobSpec.computingSite} not found")
                        continue
                    tmpLog.debug(f"plugin={preparatorCore.__class__.__name__}")
                    # lock the job again to make sure no other thread took it over
                    lockedAgain = self.dbProxy.lock_job_again(jobSpec.PandaID, "preparatorTime", "lockedBy", lockedBy)
                    if not lockedAgain:
                        tmpLog.debug("skip since locked by another thread")
                        continue
                    tmpStat, tmpStr = preparatorCore.check_stage_in_status(jobSpec)
                    # None means stage-in is still in progress
                    if tmpStat is None:
                        # release the lock and check again later
                        jobSpec.lockedBy = None
                        self.dbProxy.update_job(jobSpec, {"lockedBy": lockedBy, "subStatus": oldSubStatus})
                        tmpLog.debug(f"try to check later since still preparing with {tmpStr}")
                        continue
                    # stage-in succeeded
                    if tmpStat is True:
                        # resolve local paths of the input files
                        tmpStat, tmpStr = preparatorCore.resolve_input_paths(jobSpec)
                        if tmpStat is False:
                            jobSpec.lockedBy = None
                            self.dbProxy.update_job(jobSpec, {"lockedBy": lockedBy, "subStatus": oldSubStatus})
                            tmpLog.error(f"failed to resolve input file paths : {tmpStr}")
                            continue
                        # manipulate container-related job parameters
                        jobSpec.manipulate_job_params_for_container()
                        # update the job
                        jobSpec.lockedBy = None
                        jobSpec.set_all_input_ready()
                        if (maxFilesPerJob is None and jobSpec.auxInput is None) or (
                            len(jobSpec.inFiles) == 0 and jobSpec.auxInput in [None, JobSpec.AUX_inReady]
                        ):
                            # all files are done
                            allDone = True
                            jobSpec.subStatus = "prepared"
                            jobSpec.preparatorTime = None
                            if jobSpec.auxInput is not None:
                                jobSpec.auxInput = JobSpec.AUX_allReady
                        else:
                            # immediate next lookup since there could be more files to check
                            allDone = False
                            jobSpec.trigger_preparation()
                            # move on to auxiliary inputs once regular inputs are exhausted
                            if len(jobSpec.inFiles) == 0 and jobSpec.auxInput == JobSpec.AUX_allTriggered:
                                jobSpec.auxInput = JobSpec.AUX_inReady
                        self.dbProxy.update_job(jobSpec, {"lockedBy": lockedBy, "subStatus": oldSubStatus}, update_in_file=True)
                        if allDone:
                            tmpLog.debug("succeeded")
                        else:
                            tmpLog.debug("partially succeeded")
                    else:
                        # stage-in failed; fail the job and propagate to PanDA
                        jobSpec.status = "failed"
                        jobSpec.subStatus = "failed_to_prepare"
                        jobSpec.lockedBy = None
                        jobSpec.preparatorTime = None
                        jobSpec.stateChangeTime = core_utils.naive_utcnow()
                        errStr = f"stage-in failed with {tmpStr}"
                        jobSpec.set_pilot_error(PilotErrors.STAGEINFAILED, errStr)
                        jobSpec.trigger_propagation()
                        self.dbProxy.update_job(jobSpec, {"lockedBy": lockedBy, "subStatus": oldSubStatus})
                        tmpLog.error(f"failed with {tmpStr}")
                except Exception:
                    core_utils.dump_error_message(tmpLog)
            # second pass: get jobs to trigger preparation for
            mainLog.debug("try to get jobs to prepare")
            try:
                maxFilesPerJob = harvester_config.preparator.maxFilesPerJobToPrepare
                if maxFilesPerJob <= 0:
                    maxFilesPerJob = None
            except Exception:
                maxFilesPerJob = None
            jobsToTrigger = self.dbProxy.get_jobs_in_sub_status(
                "fetched",
                harvester_config.preparator.maxJobsToTrigger,
                "preparatorTime",
                "lockedBy",
                harvester_config.preparator.triggerInterval,
                harvester_config.preparator.lockInterval,
                lockedBy,
                "preparing",
                max_files_per_job=maxFilesPerJob,
                ng_file_status_list=["triggered", "ready"],
            )
            mainLog.debug(f"got {len(jobsToTrigger)} jobs to prepare")
            # per-endpoint cache of file status lookups, shared across jobs in this cycle
            fileStatMap = dict()
            for jobSpec in jobsToTrigger:
                tmpLog = self.make_logger(_logger, f"PandaID={jobSpec.PandaID}", method_name="run")
                try:
                    tmpLog.debug("try to trigger preparation")
                    # ignore the per-job configID unless dynamic plugin change is enabled
                    configID = jobSpec.configID
                    if not core_utils.dynamic_plugin_change():
                        configID = None
                    # get queue configuration
                    if not self.queueConfigMapper.has_queue(jobSpec.computingSite, configID):
                        tmpLog.error(f"queue config for {jobSpec.computingSite}/{configID} not found")
                        continue
                    queueConfig = self.queueConfigMapper.get_queue(jobSpec.computingSite, configID)
                    oldSubStatus = jobSpec.subStatus
                    # get the plugin and the file type it handles
                    if jobSpec.auxInput in [None, JobSpec.AUX_hasAuxInput]:
                        preparatorCore = self.pluginFactory.get_plugin(queueConfig.preparator)
                        fileType = "input"
                    else:
                        preparatorCore = self.pluginFactory.get_plugin(queueConfig.aux_preparator)
                        fileType = FileSpec.AUX_INPUT
                    if preparatorCore is None:
                        # plugin not found
                        tmpLog.error(f"plugin for {jobSpec.computingSite} not found")
                        continue
                    tmpLog.debug(f"plugin={preparatorCore.__class__.__name__}")
                    # lock the job again to make sure no other thread took it over
                    lockedAgain = self.dbProxy.lock_job_again(jobSpec.PandaID, "preparatorTime", "lockedBy", lockedBy)
                    if not lockedAgain:
                        tmpLog.debug("skip since locked by another thread")
                        continue
                    # initialize the status cache for this endpoint
                    if queueConfig.ddmEndpointIn not in fileStatMap:
                        fileStatMap[queueConfig.ddmEndpointIn] = dict()
                    # check if this job already has files waiting to be prepared
                    hasToPrepare = False
                    for fileSpec in jobSpec.inFiles:
                        if fileSpec.status == "to_prepare":
                            hasToPrepare = True
                            break
                    newFileStatusData = []
                    toWait = False
                    newInFiles = []
                    for fileSpec in jobSpec.inFiles:
                        if fileSpec.status in ["preparing", "to_prepare"]:
                            newInFiles.append(fileSpec)
                            updateStatus = False
                            if fileSpec.lfn not in fileStatMap[queueConfig.ddmEndpointIn]:
                                fileStatMap[queueConfig.ddmEndpointIn][fileSpec.lfn] = self.dbProxy.get_file_status(
                                    fileSpec.lfn, fileType, queueConfig.ddmEndpointIn, "starting"
                                )
                            if "ready" in fileStatMap[queueConfig.ddmEndpointIn][fileSpec.lfn]:
                                # the file was already staged in by another job
                                fileSpec.status = "ready"
                                if fileStatMap[queueConfig.ddmEndpointIn][fileSpec.lfn]["ready"]["path"]:
                                    fileSpec.path = list(fileStatMap[queueConfig.ddmEndpointIn][fileSpec.lfn]["ready"]["path"])[0]
                                # copy group info from the existing file record
                                groupInfo = self.dbProxy.get_group_for_file(fileSpec.lfn, fileType, queueConfig.ddmEndpointIn)
                                if groupInfo is not None:
                                    fileSpec.groupID = groupInfo["groupID"]
                                    fileSpec.groupStatus = groupInfo["groupStatus"]
                                    fileSpec.groupUpdateTime = groupInfo["groupUpdateTime"]
                                updateStatus = True
                            elif (not hasToPrepare and "to_prepare" in fileStatMap[queueConfig.ddmEndpointIn][fileSpec.lfn]) or "triggered" in fileStatMap[
                                queueConfig.ddmEndpointIn
                            ][fileSpec.lfn]:
                                # the file is being prepared by another job; wait for it
                                toWait = True
                                if fileSpec.status != "preparing":
                                    fileSpec.status = "preparing"
                                    updateStatus = True
                            else:
                                # the file is not being prepared by anyone; this job takes it
                                if fileSpec.status != "to_prepare":
                                    fileSpec.status = "to_prepare"
                                    updateStatus = True
                            # record the new status and reflect it in the cache
                            if updateStatus:
                                newFileStatusData.append((fileSpec.fileID, fileSpec.lfn, fileSpec.status))
                                fileStatMap[queueConfig.ddmEndpointIn][fileSpec.lfn].setdefault(fileSpec.status, None)
                    if len(newFileStatusData) > 0:
                        self.dbProxy.change_file_status(jobSpec.PandaID, newFileStatusData, lockedBy)
                    # wait since some files are being prepared by another job
                    if toWait:
                        # release the lock and retry later
                        jobSpec.lockedBy = None
                        self.dbProxy.update_job(jobSpec, {"lockedBy": lockedBy, "subStatus": oldSubStatus})
                        tmpLog.debug("wait since files are being prepared by another job")
                        continue
                    # trigger stage-in via the plugin
                    tmpStat, tmpStr = preparatorCore.trigger_preparation(jobSpec)
                    # check the result
                    if tmpStat is True:
                        # succeeded
                        jobSpec.lockedBy = None
                        if (maxFilesPerJob is None and jobSpec.auxInput is None) or (
                            len(jobSpec.inFiles) == 0 and jobSpec.auxInput in [None, JobSpec.AUX_inTriggered]
                        ):
                            # all files were triggered
                            allDone = True
                            jobSpec.subStatus = "preparing"
                            jobSpec.preparatorTime = None
                            if jobSpec.auxInput is not None:
                                jobSpec.auxInput = JobSpec.AUX_allTriggered
                        else:
                            # immediate next lookup since there could be more files to trigger
                            allDone = False
                            for fileSpec in jobSpec.inFiles:
                                if fileSpec.status == "to_prepare":
                                    fileSpec.status = "triggered"
                            jobSpec.trigger_preparation()
                            # move on to auxiliary inputs once regular inputs are exhausted
                            if len(jobSpec.inFiles) == 0 and jobSpec.auxInput == JobSpec.AUX_hasAuxInput:
                                jobSpec.auxInput = JobSpec.AUX_inTriggered
                        self.dbProxy.update_job(jobSpec, {"lockedBy": lockedBy, "subStatus": oldSubStatus}, update_in_file=True)
                        if allDone:
                            tmpLog.debug("triggered")
                        else:
                            tmpLog.debug("partially triggered")
                    elif tmpStat is False:
                        # permanently failed; fail the job and propagate to PanDA
                        jobSpec.status = "failed"
                        jobSpec.subStatus = "failed_to_prepare"
                        jobSpec.lockedBy = None
                        jobSpec.preparatorTime = None
                        jobSpec.stateChangeTime = core_utils.naive_utcnow()
                        errStr = f"stage-in failed with {tmpStr}"
                        jobSpec.set_pilot_error(PilotErrors.STAGEINFAILED, errStr)
                        jobSpec.trigger_propagation()
                        self.dbProxy.update_job(jobSpec, {"lockedBy": lockedBy, "subStatus": oldSubStatus})
                        tmpLog.debug(f"failed to trigger with {tmpStr}")
                    else:
                        # temporary problem (tmpStat is None); retry later
                        jobSpec.lockedBy = None
                        self.dbProxy.update_job(jobSpec, {"lockedBy": lockedBy, "subStatus": oldSubStatus})
                        tmpLog.debug(f"try to prepare later since {tmpStr}")
                except Exception:
                    core_utils.dump_error_message(tmpLog)
            mainLog.debug("done" + sw.get_elapsed_time())
            # check for termination request; sleep is done inside terminated()
            if self.terminated(harvester_config.preparator.sleepTime):
                mainLog.debug("terminated")
                return
0307 return