# File indexing completed on 2026-04-20 07:58:57
0001 from pandaharvester.harvesterbody.agent_base import AgentBase
0002 from pandaharvester.harvesterconfig import harvester_config
0003 from pandaharvester.harvestercore import core_utils
0004 from pandaharvester.harvestercore.db_proxy_pool import DBProxyPool as DBProxy
0005 from pandaharvester.harvestercore.job_spec import JobSpec
0006 from pandaharvester.harvestercore.pilot_errors import PilotErrors
0007 from pandaharvester.harvestercore.plugin_factory import PluginFactory
0008
0009
# module-level logger shared by all Stager instances
_logger = core_utils.setup_logger("stager")
0011
0012
0013
class Stager(AgentBase):
    """Agent to stage out job output files.

    Each cycle of the main loop drives three phases through the
    site-specific stager/zipper plugins:

      1. check the status of ongoing transfers ("transferring" jobs)
      2. trigger stage-out for "to_transfer" jobs with output ready
      3. zip output files, and optionally post-zip them when asynchronous
         (post-)zipping is enabled in the configuration
    """

    def __init__(self, queue_config_mapper, single_mode=False):
        """Set up the DB proxy, queue-config mapper, and plugin factory.

        :param queue_config_mapper: mapper giving per-queue configuration
        :param single_mode: passed through to AgentBase (run once vs. loop)
        """
        AgentBase.__init__(self, single_mode)
        self.dbProxy = DBProxy()
        self.queueConfigMapper = queue_config_mapper
        self.pluginFactory = PluginFactory()

    def _get_plugin(self, jobSpec, tmpLog, use_zipper=False):
        """Resolve the stager (or zipper) plugin for a job's queue.

        Returns the plugin instance, or None (with an error logged) when the
        queue configuration or the plugin cannot be found.
        """
        configID = jobSpec.configID
        # with static plugin configuration the configID is irrelevant
        if not core_utils.dynamic_plugin_change():
            configID = None
        if not self.queueConfigMapper.has_queue(jobSpec.computingSite, configID):
            tmpLog.error(f"queue config for {jobSpec.computingSite}/{configID} not found")
            return None
        queueConfig = self.queueConfigMapper.get_queue(jobSpec.computingSite, configID)
        # fall back to the stager plugin when no dedicated zipper is configured
        if use_zipper and hasattr(queueConfig, "zipper"):
            pluginCore = self.pluginFactory.get_plugin(queueConfig.zipper)
        else:
            pluginCore = self.pluginFactory.get_plugin(queueConfig.stager)
        if pluginCore is None:
            tmpLog.error(f"plugin for {jobSpec.computingSite} not found")
            return None
        return pluginCore

    def _lock_job(self, jobSpec, lockedBy, tmpLog):
        """Re-lock the job; return a falsy value when another thread holds it."""
        lockedAgain = self.dbProxy.lock_job_again(jobSpec.PandaID, "stagerTime", "stagerLock", lockedBy)
        if not lockedAgain:
            tmpLog.debug("skip since locked by another thread")
        return lockedAgain

    def _fail_job(self, jobSpec, lockedBy, tmpLog, errStr, bad_status=None):
        """Handle a fatal plugin error: fail files, set pilot error, update DB.

        When bad_status is None, all output files not yet "finished" are
        failed (stage-out phases); otherwise only files currently in
        bad_status are failed (zip phases).
        """
        for fileSpec in jobSpec.outFiles:
            if bad_status is None:
                if fileSpec.status != "finished":
                    fileSpec.status = "failed"
            elif fileSpec.status == bad_status:
                fileSpec.status = "failed"
        jobSpec.set_pilot_error(PilotErrors.STAGEOUTFAILED, errStr)
        # let the propagator report the failure upstream
        jobSpec.trigger_propagation()
        newSubStatus = self.dbProxy.update_job_for_stage_out(jobSpec, True, lockedBy)
        tmpLog.debug(f"updated new subStatus={newSubStatus}")

    def _check_transfers(self, lockedBy, mainLog):
        """Check the status of ongoing transfers for 'transferring' jobs."""
        try:
            maxFilesPerJob = harvester_config.stager.maxFilesPerJobToCheck
        except Exception:
            maxFilesPerJob = None
        jobsToCheck = self.dbProxy.get_jobs_for_stage_out(
            harvester_config.stager.maxJobsToCheck,
            harvester_config.stager.checkInterval,
            harvester_config.stager.lockInterval,
            lockedBy,
            "transferring",
            JobSpec.HO_hasTransfer,
            max_files_per_job=maxFilesPerJob,
        )
        mainLog.debug(f"got {len(jobsToCheck)} jobs to check")
        for jobSpec in jobsToCheck:
            tmpLog = self.make_logger(_logger, f"PandaID={jobSpec.PandaID}", method_name="run")
            try:
                tmpLog.debug("start checking")
                stagerCore = self._get_plugin(jobSpec, tmpLog)
                if stagerCore is None:
                    continue
                if not self._lock_job(jobSpec, lockedBy, tmpLog):
                    continue
                tmpLog.debug(f"plugin={stagerCore.__class__.__name__}")
                tmpStat, tmpStr = stagerCore.check_stage_out_status(jobSpec)
                if tmpStat is True:
                    # all transfers done
                    newSubStatus = self.dbProxy.update_job_for_stage_out(jobSpec, True, lockedBy)
                    tmpLog.debug(f"succeeded new subStatus={newSubStatus}")
                elif tmpStat is False:
                    # fatal error
                    tmpLog.debug(f"fatal error when checking status with {tmpStr}")
                    self._fail_job(jobSpec, lockedBy, tmpLog, f"stage-out failed with {tmpStr}")
                else:
                    # temporary error; retry on a later cycle
                    tmpLog.debug(f"try to check later since {tmpStr}")
            except Exception:
                core_utils.dump_error_message(tmpLog)

    def _trigger_stage_out(self, lockedBy, mainLog):
        """Trigger stage-out for 'to_transfer' jobs whose outputs are ready."""
        try:
            maxFilesPerJob = harvester_config.stager.maxFilesPerJobToTrigger
        except Exception:
            maxFilesPerJob = None
        jobsToTrigger = self.dbProxy.get_jobs_for_stage_out(
            harvester_config.stager.maxJobsToTrigger,
            harvester_config.stager.triggerInterval,
            harvester_config.stager.lockInterval,
            lockedBy,
            "to_transfer",
            JobSpec.HO_hasOutput,
            [JobSpec.HO_hasZipOutput, JobSpec.HO_hasPostZipOutput],
            max_files_per_job=maxFilesPerJob,
        )
        mainLog.debug(f"got {len(jobsToTrigger)} jobs to trigger")
        for jobSpec in jobsToTrigger:
            tmpLog = self.make_logger(_logger, f"PandaID={jobSpec.PandaID}", method_name="run")
            try:
                tmpLog.debug("try to trigger stage-out")
                stagerCore = self._get_plugin(jobSpec, tmpLog)
                if stagerCore is None:
                    continue
                if not self._lock_job(jobSpec, lockedBy, tmpLog):
                    continue
                tmpLog.debug(f"plugin={stagerCore.__class__.__name__}")
                tmpStat, tmpStr = stagerCore.trigger_stage_out(jobSpec)
                if tmpStat is True:
                    # triggered for all files
                    jobSpec.trigger_stage_out()
                    jobSpec.all_files_triggered_to_stage_out()
                    newSubStatus = self.dbProxy.update_job_for_stage_out(jobSpec, True, lockedBy)
                    tmpLog.debug(f"triggered new subStatus={newSubStatus}")
                elif tmpStat is False:
                    # fatal error
                    tmpLog.debug(f"fatal error to trigger with {tmpStr}")
                    self._fail_job(jobSpec, lockedBy, tmpLog, f"stage-out failed with {tmpStr}")
                else:
                    # temporary error; retry on a later cycle
                    tmpLog.debug(f"try to trigger later since {tmpStr}")
            except Exception:
                core_utils.dump_error_message(tmpLog)

    def _zip_outputs(self, lockedBy, mainLog):
        """Zip output files; chain into post-zipping when it is enabled."""
        # a dedicated [zipper] config section takes precedence over [stager]
        if hasattr(harvester_config, "zipper"):
            pluginConf = harvester_config.zipper
        else:
            pluginConf = harvester_config.stager
        try:
            maxFilesPerJob = pluginConf.maxFilesPerJobToZip
        except Exception:
            maxFilesPerJob = None
        try:
            zipInterval = pluginConf.zipInterval
        except Exception:
            zipInterval = pluginConf.triggerInterval
        try:
            usePostZipping = pluginConf.usePostZipping
        except Exception:
            usePostZipping = False
        jobsToZip = self.dbProxy.get_jobs_for_stage_out(
            pluginConf.maxJobsToZip,
            zipInterval,
            pluginConf.lockInterval,
            lockedBy,
            "to_transfer",
            JobSpec.HO_hasZipOutput,
            [JobSpec.HO_hasOutput, JobSpec.HO_hasPostZipOutput],
            max_files_per_job=maxFilesPerJob,
        )
        mainLog.debug(f"got {len(jobsToZip)} jobs to zip")
        for jobSpec in jobsToZip:
            tmpLog = self.make_logger(_logger, f"PandaID={jobSpec.PandaID}", method_name="run")
            try:
                tmpLog.debug("try to zip output")
                zipperCore = self._get_plugin(jobSpec, tmpLog, use_zipper=True)
                if zipperCore is None:
                    continue
                if not self._lock_job(jobSpec, lockedBy, tmpLog):
                    continue
                tmpLog.debug(f"plugin={zipperCore.__class__.__name__}")
                if usePostZipping:
                    tmpStat, tmpStr = zipperCore.async_zip_output(jobSpec)
                else:
                    tmpStat, tmpStr = zipperCore.zip_output(jobSpec)
                if tmpStat is True:
                    # zipped (or async zipping started)
                    jobSpec.trigger_stage_out()
                    jobSpec.all_files_zipped(usePostZipping)
                    newSubStatus = self.dbProxy.update_job_for_stage_out(jobSpec, False, lockedBy)
                    if usePostZipping:
                        tmpLog.debug(f"async zipped new subStatus={newSubStatus}")
                    else:
                        tmpLog.debug(f"zipped new subStatus={newSubStatus}")
                elif tmpStat is None:
                    # temporary error; retry on a later cycle
                    tmpLog.debug(f"try later since {tmpStr}")
                else:
                    # fatal error
                    tmpLog.debug(f"fatal error to zip with {tmpStr}")
                    self._fail_job(jobSpec, lockedBy, tmpLog, f"zip-output failed with {tmpStr}", bad_status="zipping")
            except Exception:
                core_utils.dump_error_message(tmpLog)
        if usePostZipping:
            self._post_zip_outputs(lockedBy, mainLog, pluginConf, zipInterval, maxFilesPerJob)

    def _post_zip_outputs(self, lockedBy, mainLog, pluginConf, zipInterval, maxFilesPerJob):
        """Finalize asynchronous zipping for jobs with post-zip output."""
        jobsToPostZip = self.dbProxy.get_jobs_for_stage_out(
            pluginConf.maxJobsToZip,
            zipInterval,
            pluginConf.lockInterval,
            lockedBy,
            "to_transfer",
            JobSpec.HO_hasPostZipOutput,
            [JobSpec.HO_hasOutput, JobSpec.HO_hasZipOutput],
            max_files_per_job=maxFilesPerJob,
        )
        mainLog.debug(f"got {len(jobsToPostZip)} jobs to post-zip")
        for jobSpec in jobsToPostZip:
            tmpLog = self.make_logger(_logger, f"PandaID={jobSpec.PandaID}", method_name="run")
            try:
                tmpLog.debug("try to post-zip output")
                zipperCore = self._get_plugin(jobSpec, tmpLog, use_zipper=True)
                if zipperCore is None:
                    continue
                if not self._lock_job(jobSpec, lockedBy, tmpLog):
                    continue
                tmpLog.debug(f"plugin={zipperCore.__class__.__name__}")
                tmpStat, tmpStr = zipperCore.post_zip_output(jobSpec)
                if tmpStat is True:
                    # post-zipping finished
                    jobSpec.trigger_stage_out()
                    jobSpec.all_files_zipped()
                    newSubStatus = self.dbProxy.update_job_for_stage_out(jobSpec, False, lockedBy)
                    tmpLog.debug(f"post-zipped new subStatus={newSubStatus}")
                elif tmpStat is None:
                    # temporary error; retry on a later cycle
                    tmpLog.debug(f"try to post-zip later since {tmpStr}")
                else:
                    # fatal error
                    tmpLog.debug(f"fatal error to post-zip since {tmpStr}")
                    self._fail_job(jobSpec, lockedBy, tmpLog, f"post-zipping failed with {tmpStr}", bad_status="post_zipping")
            except Exception:
                core_utils.dump_error_message(tmpLog)

    def run(self):
        """Main loop: check transfers, trigger stage-out, zip, then sleep."""
        lockedBy = f"stager-{self.get_pid()}"
        while True:
            sw = core_utils.get_stopwatch()
            mainLog = self.make_logger(_logger, f"id={lockedBy}", method_name="run")
            mainLog.debug("try to get jobs to check")
            self._check_transfers(lockedBy, mainLog)
            self._trigger_stage_out(lockedBy, mainLog)
            self._zip_outputs(lockedBy, mainLog)
            mainLog.debug("done" + sw.get_elapsed_time())
            if self.terminated(harvester_config.stager.sleepTime):
                mainLog.debug("terminated")
                return