Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:58:57

0001 from pandaharvester.harvesterbody.agent_base import AgentBase
0002 from pandaharvester.harvesterconfig import harvester_config
0003 from pandaharvester.harvestercore import core_utils
0004 from pandaharvester.harvestercore.db_proxy_pool import DBProxyPool as DBProxy
0005 from pandaharvester.harvestercore.job_spec import JobSpec
0006 from pandaharvester.harvestercore.pilot_errors import PilotErrors
0007 from pandaharvester.harvestercore.plugin_factory import PluginFactory
0008 
0009 # logger
0010 _logger = core_utils.setup_logger("stager")
0011 
0012 
0013 # class for stage-out
0014 class Stager(AgentBase):
0015     # constructor
0016     def __init__(self, queue_config_mapper, single_mode=False):
0017         AgentBase.__init__(self, single_mode)
0018         self.dbProxy = DBProxy()
0019         self.queueConfigMapper = queue_config_mapper
0020         self.pluginFactory = PluginFactory()
0021 
0022     # main loop
0023     def run(self):
0024         lockedBy = f"stager-{self.get_pid()}"
0025         while True:
0026             sw = core_utils.get_stopwatch()
0027             mainLog = self.make_logger(_logger, f"id={lockedBy}", method_name="run")
0028             mainLog.debug("try to get jobs to check")
0029             # get jobs to check preparation
0030             try:
0031                 maxFilesPerJob = harvester_config.stager.maxFilesPerJobToCheck
0032             except Exception:
0033                 maxFilesPerJob = None
0034             jobsToCheck = self.dbProxy.get_jobs_for_stage_out(
0035                 harvester_config.stager.maxJobsToCheck,
0036                 harvester_config.stager.checkInterval,
0037                 harvester_config.stager.lockInterval,
0038                 lockedBy,
0039                 "transferring",
0040                 JobSpec.HO_hasTransfer,
0041                 max_files_per_job=maxFilesPerJob,
0042             )
0043             mainLog.debug(f"got {len(jobsToCheck)} jobs to check")
0044             # loop over all jobs
0045             for jobSpec in jobsToCheck:
0046                 tmpLog = self.make_logger(_logger, f"PandaID={jobSpec.PandaID}", method_name="run")
0047                 try:
0048                     tmpLog.debug("start checking")
0049                     # configID
0050                     configID = jobSpec.configID
0051                     if not core_utils.dynamic_plugin_change():
0052                         configID = None
0053                     # get queue
0054                     if not self.queueConfigMapper.has_queue(jobSpec.computingSite, configID):
0055                         tmpLog.error(f"queue config for {jobSpec.computingSite}/{configID} not found")
0056                         continue
0057                     queueConfig = self.queueConfigMapper.get_queue(jobSpec.computingSite, configID)
0058                     # get plugin
0059                     stagerCore = self.pluginFactory.get_plugin(queueConfig.stager)
0060                     if stagerCore is None:
0061                         # not found
0062                         tmpLog.error(f"plugin for {jobSpec.computingSite} not found")
0063                         continue
0064                     # lock job again
0065                     lockedAgain = self.dbProxy.lock_job_again(jobSpec.PandaID, "stagerTime", "stagerLock", lockedBy)
0066                     if not lockedAgain:
0067                         tmpLog.debug("skip since locked by another thread")
0068                         continue
0069                     tmpLog.debug(f"plugin={stagerCore.__class__.__name__}")
0070                     tmpStat, tmpStr = stagerCore.check_stage_out_status(jobSpec)
0071                     # check result
0072                     if tmpStat is True:
0073                         # succeeded
0074                         newSubStatus = self.dbProxy.update_job_for_stage_out(jobSpec, True, lockedBy)
0075                         tmpLog.debug(f"succeeded new subStatus={newSubStatus}")
0076                     elif tmpStat is False:
0077                         # fatal error
0078                         tmpLog.debug(f"fatal error when checking status with {tmpStr}")
0079                         # update job
0080                         for fileSpec in jobSpec.outFiles:
0081                             if fileSpec.status != "finished":
0082                                 fileSpec.status = "failed"
0083                         errStr = f"stage-out failed with {tmpStr}"
0084                         jobSpec.set_pilot_error(PilotErrors.STAGEOUTFAILED, errStr)
0085                         jobSpec.trigger_propagation()
0086                         newSubStatus = self.dbProxy.update_job_for_stage_out(jobSpec, True, lockedBy)
0087                         tmpLog.debug(f"updated new subStatus={newSubStatus}")
0088                     else:
0089                         # on-going
0090                         tmpLog.debug(f"try to check later since {tmpStr}")
0091                 except Exception:
0092                     core_utils.dump_error_message(tmpLog)
0093             # get jobs to trigger stage-out
0094             try:
0095                 maxFilesPerJob = harvester_config.stager.maxFilesPerJobToTrigger
0096             except Exception:
0097                 maxFilesPerJob = None
0098             jobsToTrigger = self.dbProxy.get_jobs_for_stage_out(
0099                 harvester_config.stager.maxJobsToTrigger,
0100                 harvester_config.stager.triggerInterval,
0101                 harvester_config.stager.lockInterval,
0102                 lockedBy,
0103                 "to_transfer",
0104                 JobSpec.HO_hasOutput,
0105                 [JobSpec.HO_hasZipOutput, JobSpec.HO_hasPostZipOutput],
0106                 max_files_per_job=maxFilesPerJob,
0107             )
0108             mainLog.debug(f"got {len(jobsToTrigger)} jobs to trigger")
0109             # loop over all jobs
0110             for jobSpec in jobsToTrigger:
0111                 tmpLog = self.make_logger(_logger, f"PandaID={jobSpec.PandaID}", method_name="run")
0112                 try:
0113                     tmpLog.debug("try to trigger stage-out")
0114                     # configID
0115                     configID = jobSpec.configID
0116                     if not core_utils.dynamic_plugin_change():
0117                         configID = None
0118                     # get queue
0119                     if not self.queueConfigMapper.has_queue(jobSpec.computingSite, configID):
0120                         tmpLog.error(f"queue config for {jobSpec.computingSite}/{configID} not found")
0121                         continue
0122                     queueConfig = self.queueConfigMapper.get_queue(jobSpec.computingSite, configID)
0123                     # get plugin
0124                     stagerCore = self.pluginFactory.get_plugin(queueConfig.stager)
0125                     if stagerCore is None:
0126                         # not found
0127                         tmpLog.error(f"plugin for {jobSpec.computingSite} not found")
0128                         continue
0129                     # lock job again
0130                     lockedAgain = self.dbProxy.lock_job_again(jobSpec.PandaID, "stagerTime", "stagerLock", lockedBy)
0131                     if not lockedAgain:
0132                         tmpLog.debug("skip since locked by another thread")
0133                         continue
0134                     # trigger stage-out
0135                     tmpLog.debug(f"plugin={stagerCore.__class__.__name__}")
0136                     tmpStat, tmpStr = stagerCore.trigger_stage_out(jobSpec)
0137                     # check result
0138                     if tmpStat is True:
0139                         # succeeded
0140                         jobSpec.trigger_stage_out()
0141                         jobSpec.all_files_triggered_to_stage_out()
0142                         newSubStatus = self.dbProxy.update_job_for_stage_out(jobSpec, True, lockedBy)
0143                         tmpLog.debug(f"triggered new subStatus={newSubStatus}")
0144                     elif tmpStat is False:
0145                         # fatal error
0146                         tmpLog.debug(f"fatal error to trigger with {tmpStr}")
0147                         # update job
0148                         for fileSpec in jobSpec.outFiles:
0149                             if fileSpec.status != "finished":
0150                                 fileSpec.status = "failed"
0151                         errStr = f"stage-out failed with {tmpStr}"
0152                         jobSpec.set_pilot_error(PilotErrors.STAGEOUTFAILED, errStr)
0153                         jobSpec.trigger_propagation()
0154                         newSubStatus = self.dbProxy.update_job_for_stage_out(jobSpec, True, lockedBy)
0155                         tmpLog.debug(f"updated new subStatus={newSubStatus}")
0156                     else:
0157                         # temporary error
0158                         tmpLog.debug(f"try to trigger later since {tmpStr}")
0159                 except Exception:
0160                     core_utils.dump_error_message(tmpLog)
0161             # get jobs to zip output
0162             if hasattr(harvester_config, "zipper"):
0163                 pluginConf = harvester_config.zipper
0164             else:
0165                 pluginConf = harvester_config.stager
0166             try:
0167                 maxFilesPerJob = pluginConf.maxFilesPerJobToZip
0168             except Exception:
0169                 maxFilesPerJob = None
0170             try:
0171                 zipInterval = pluginConf.zipInterval
0172             except Exception:
0173                 zipInterval = pluginConf.triggerInterval
0174             try:
0175                 usePostZipping = pluginConf.usePostZipping
0176             except Exception:
0177                 usePostZipping = False
0178             jobsToZip = self.dbProxy.get_jobs_for_stage_out(
0179                 pluginConf.maxJobsToZip,
0180                 zipInterval,
0181                 pluginConf.lockInterval,
0182                 lockedBy,
0183                 "to_transfer",
0184                 JobSpec.HO_hasZipOutput,
0185                 [JobSpec.HO_hasOutput, JobSpec.HO_hasPostZipOutput],
0186                 max_files_per_job=maxFilesPerJob,
0187             )
0188             mainLog.debug(f"got {len(jobsToZip)} jobs to zip")
0189             # loop over all jobs
0190             for jobSpec in jobsToZip:
0191                 tmpLog = self.make_logger(_logger, f"PandaID={jobSpec.PandaID}", method_name="run")
0192                 try:
0193                     tmpLog.debug("try to zip output")
0194                     # configID
0195                     configID = jobSpec.configID
0196                     if not core_utils.dynamic_plugin_change():
0197                         configID = None
0198                     # get queue
0199                     if not self.queueConfigMapper.has_queue(jobSpec.computingSite, configID):
0200                         tmpLog.error(f"queue config for {jobSpec.computingSite}/{configID} not found")
0201                         continue
0202                     queueConfig = self.queueConfigMapper.get_queue(jobSpec.computingSite, configID)
0203                     # get plugin
0204                     if hasattr(queueConfig, "zipper"):
0205                         zipperCore = self.pluginFactory.get_plugin(queueConfig.zipper)
0206                     else:
0207                         zipperCore = self.pluginFactory.get_plugin(queueConfig.stager)
0208                     if zipperCore is None:
0209                         # not found
0210                         tmpLog.error(f"plugin for {jobSpec.computingSite} not found")
0211                         continue
0212                     # lock job again
0213                     lockedAgain = self.dbProxy.lock_job_again(jobSpec.PandaID, "stagerTime", "stagerLock", lockedBy)
0214                     if not lockedAgain:
0215                         tmpLog.debug("skip since locked by another thread")
0216                         continue
0217                     # zipping
0218                     tmpLog.debug(f"plugin={zipperCore.__class__.__name__}")
0219                     if usePostZipping:
0220                         tmpStat, tmpStr = zipperCore.async_zip_output(jobSpec)
0221                     else:
0222                         tmpStat, tmpStr = zipperCore.zip_output(jobSpec)
0223                     # succeeded
0224                     if tmpStat is True:
0225                         # update job
0226                         jobSpec.trigger_stage_out()
0227                         jobSpec.all_files_zipped(usePostZipping)
0228                         newSubStatus = self.dbProxy.update_job_for_stage_out(jobSpec, False, lockedBy)
0229                         if usePostZipping:
0230                             tmpLog.debug(f"async zipped new subStatus={newSubStatus}")
0231                         else:
0232                             tmpLog.debug(f"zipped new subStatus={newSubStatus}")
0233                     elif tmpStat is None:
0234                         tmpLog.debug(f"try later since {tmpStr}")
0235                     else:
0236                         # failed
0237                         tmpLog.debug(f"fatal error to zip with {tmpStr}")
0238                         # update job
0239                         for fileSpec in jobSpec.outFiles:
0240                             if fileSpec.status == "zipping":
0241                                 fileSpec.status = "failed"
0242                         errStr = f"zip-output failed with {tmpStr}"
0243                         jobSpec.set_pilot_error(PilotErrors.STAGEOUTFAILED, errStr)
0244                         jobSpec.trigger_propagation()
0245                         newSubStatus = self.dbProxy.update_job_for_stage_out(jobSpec, True, lockedBy)
0246                         tmpLog.debug(f"updated new subStatus={newSubStatus}")
0247                 except Exception:
0248                     core_utils.dump_error_message(tmpLog)
0249             if usePostZipping:
0250                 jobsToPostZip = self.dbProxy.get_jobs_for_stage_out(
0251                     pluginConf.maxJobsToZip,
0252                     zipInterval,
0253                     pluginConf.lockInterval,
0254                     lockedBy,
0255                     "to_transfer",
0256                     JobSpec.HO_hasPostZipOutput,
0257                     [JobSpec.HO_hasOutput, JobSpec.HO_hasZipOutput],
0258                     max_files_per_job=maxFilesPerJob,
0259                 )
0260                 mainLog.debug(f"got {len(jobsToPostZip)} jobs to post-zip")
0261                 # loop over all jobs
0262                 for jobSpec in jobsToPostZip:
0263                     tmpLog = self.make_logger(_logger, f"PandaID={jobSpec.PandaID}", method_name="run")
0264                     try:
0265                         tmpLog.debug("try to post-zip output")
0266                         # configID
0267                         configID = jobSpec.configID
0268                         if not core_utils.dynamic_plugin_change():
0269                             configID = None
0270                         # get queue
0271                         if not self.queueConfigMapper.has_queue(jobSpec.computingSite, configID):
0272                             tmpLog.error(f"queue config for {jobSpec.computingSite}/{configID} not found")
0273                             continue
0274                         queueConfig = self.queueConfigMapper.get_queue(jobSpec.computingSite, configID)
0275                         # get plugin
0276                         if hasattr(queueConfig, "zipper"):
0277                             zipperCore = self.pluginFactory.get_plugin(queueConfig.zipper)
0278                         else:
0279                             zipperCore = self.pluginFactory.get_plugin(queueConfig.stager)
0280                         if zipperCore is None:
0281                             # not found
0282                             tmpLog.error(f"plugin for {jobSpec.computingSite} not found")
0283                             continue
0284                         # lock job again
0285                         lockedAgain = self.dbProxy.lock_job_again(jobSpec.PandaID, "stagerTime", "stagerLock", lockedBy)
0286                         if not lockedAgain:
0287                             tmpLog.debug("skip since locked by another thread")
0288                             continue
0289                         # post-zipping
0290                         tmpLog.debug(f"plugin={zipperCore.__class__.__name__}")
0291                         tmpStat, tmpStr = zipperCore.post_zip_output(jobSpec)
0292                         # succeeded
0293                         if tmpStat is True:
0294                             # update job
0295                             jobSpec.trigger_stage_out()
0296                             jobSpec.all_files_zipped()
0297                             newSubStatus = self.dbProxy.update_job_for_stage_out(jobSpec, False, lockedBy)
0298                             tmpLog.debug(f"post-zipped new subStatus={newSubStatus}")
0299                         elif tmpStat is None:
0300                             # pending
0301                             tmpLog.debug(f"try to post-zip later since {tmpStr}")
0302                         else:
0303                             # fatal error
0304                             tmpLog.debug(f"fatal error to post-zip since {tmpStr}")
0305                             # update job
0306                             for fileSpec in jobSpec.outFiles:
0307                                 if fileSpec.status == "post_zipping":
0308                                     fileSpec.status = "failed"
0309                             errStr = f"post-zipping failed with {tmpStr}"
0310                             jobSpec.set_pilot_error(PilotErrors.STAGEOUTFAILED, errStr)
0311                             jobSpec.trigger_propagation()
0312                             newSubStatus = self.dbProxy.update_job_for_stage_out(jobSpec, True, lockedBy)
0313                             tmpLog.debug(f"updated new subStatus={newSubStatus}")
0314                     except Exception:
0315                         core_utils.dump_error_message(tmpLog)
0316 
0317             mainLog.debug("done" + sw.get_elapsed_time())
0318             # check if being terminated
0319             if self.terminated(harvester_config.stager.sleepTime):
0320                 mainLog.debug("terminated")
0321                 return