# File indexing completed on 2026-04-20 07:58:57
0001 import datetime
0002 import time
0003
0004 from pandaharvester.harvesterbody.agent_base import AgentBase
0005 from pandaharvester.harvesterconfig import harvester_config
0006 from pandaharvester.harvestercore import core_utils
0007 from pandaharvester.harvestercore.command_spec import CommandSpec
0008 from pandaharvester.harvestercore.db_proxy_pool import DBProxyPool as DBProxy
0009 from pandaharvester.harvestercore.pilot_errors import PilotErrors
0010
0011
# module-level logger shared by all Propagator instances
_logger = core_utils.setup_logger("propagator")

# minimum seconds between bulk worker-stats reports (compared against a time.time() delta)
STATS_PERIOD = 300
# minimum seconds between service-metrics reports (used as timedelta(seconds=...))
METRICS_PERIOD = 300
0016
0017
0018
0019
class Propagator(AgentBase):
    """Agent that propagates harvester-side state back to the remote service.

    Each cycle of :meth:`run` pushes, via ``self.communicator``:
    job status updates, worker status updates, worker statistics
    (on command and periodically in bulk), service metrics, and dialog
    messages — all read from the harvester database through ``self.dbProxy``.
    """

    def __init__(self, communicator, queue_config_mapper, single_mode=False):
        """Set up DB access and remember collaborators.

        :param communicator: client used to send updates to the remote
            service (presumably the PanDA server — its methods
            ``update_jobs``, ``update_workers`` etc. are called below)
        :param queue_config_mapper: provides per-site queue configuration
        :param single_mode: passed through to AgentBase; when true the
            agent is expected to run a single cycle (NOTE(review):
            behavior defined in AgentBase, not visible here)
        """
        AgentBase.__init__(self, single_mode)
        self.dbProxy = DBProxy()
        self.communicator = communicator
        self.queueConfigMapper = queue_config_mapper
        # timestamps of the last periodic reports; None forces a report
        # on the first cycle (see the STATS_PERIOD / METRICS_PERIOD checks in run)
        self._last_stats_update = None
        self._last_metrics_update = None

        # drop stale service-metrics rows left over from previous runs
        self.dbProxy.clean_service_metrics()

    def run(self):
        """Main loop: propagate jobs, workers, stats, metrics and dialogs.

        Runs until :meth:`AgentBase.terminated` (inherited) reports that the
        agent should stop; sleeps ``harvester_config.propagator.sleepTime``
        between cycles via that same call.
        """
        while True:
            # stopwatch for the whole cycle (used for the lockInterval warning at the end)
            sw_main = core_utils.get_stopwatch()
            mainLog = self.make_logger(_logger, f"id={self.get_pid()}", method_name="run")

            # ---------------- jobs ----------------
            mainLog.debug("getting jobs to propagate")
            sw = core_utils.get_stopwatch()
            jobSpecs = self.dbProxy.get_jobs_to_propagate(
                harvester_config.propagator.maxJobs, harvester_config.propagator.lockInterval, harvester_config.propagator.updateInterval, self.get_pid()
            )
            mainLog.debug(f"got {len(jobSpecs)} jobs {sw.get_elapsed_time()}")

            # process jobs in chunks of nJobsInBulk
            iJobs = 0
            nJobs = harvester_config.propagator.nJobsInBulk
            # cache per-site list of statuses for which heartbeats are suppressed
            hbSuppressMap = dict()
            while iJobs < len(jobSpecs):
                jobList = jobSpecs[iJobs : iJobs + nJobs]
                iJobs += nJobs

                # split the chunk into: jobs to skip (heartbeat suppressed),
                # jobs to only check remotely, and jobs to fully update
                jobListToSkip = []
                jobListToUpdate = []
                jobListToCheck = []
                retList = []
                for tmpJobSpec in jobList:
                    # lazily fill the per-site suppression cache
                    if tmpJobSpec.computingSite not in hbSuppressMap:
                        queueConfig = self.queueConfigMapper.get_queue(tmpJobSpec.computingSite, tmpJobSpec.configID)
                        if queueConfig:
                            hbSuppressMap[tmpJobSpec.computingSite] = queueConfig.get_no_heartbeat_status()
                        else:
                            # no queue config found: fall back to a default suppression list
                            hbSuppressMap[tmpJobSpec.computingSite] = ["running", "transferring", "finished", "failed"]

                    if tmpJobSpec.get_status() in hbSuppressMap[tmpJobSpec.computingSite] and not tmpJobSpec.not_suppress_heartbeat():
                        # heartbeat suppressed: running jobs are still checked remotely,
                        # others are skipped with a synthetic OK result appended now so
                        # that retList stays aligned with jobListToSkip (see zip below)
                        if tmpJobSpec.status == "running":
                            jobListToCheck.append(tmpJobSpec)
                        else:
                            jobListToSkip.append(tmpJobSpec)
                            retList.append({"StatusCode": 0, "command": None})
                    else:
                        jobListToUpdate.append(tmpJobSpec)
                sw.reset()
                retList += self.communicator.check_jobs(jobListToCheck)
                mainLog.debug(f"check_jobs for {len(jobListToCheck)} jobs {sw.get_elapsed_time()}")
                sw.reset()
                retList += self.communicator.update_jobs(jobListToUpdate, self.get_pid())
                mainLog.debug(f"update_jobs for {len(jobListToUpdate)} jobs took {sw.get_elapsed_time()}")

                # NOTE: the concatenation order (skip + check + update) must match the
                # order in which results were appended to retList above — the pairing
                # with zip is purely positional
                for tmpJobSpec, tmpRet in zip(jobListToSkip + jobListToCheck + jobListToUpdate, retList):
                    if tmpRet["StatusCode"] == 0:
                        if tmpJobSpec in jobListToUpdate:
                            mainLog.debug(f"updated PandaID={tmpJobSpec.PandaID} status={tmpJobSpec.status}")
                        else:
                            mainLog.debug(f"skip updating PandaID={tmpJobSpec.PandaID} status={tmpJobSpec.status}")

                        # release the propagator lock on successful propagation
                        tmpJobSpec.propagatorLock = None
                        if tmpJobSpec.is_final_status() and tmpJobSpec.status == tmpJobSpec.get_status():
                            # fully final and already reported: stop further propagation
                            tmpJobSpec.propagatorTime = None
                            tmpJobSpec.subStatus = "done"
                            tmpJobSpec.modificationTime = core_utils.naive_utcnow()
                        elif tmpJobSpec.is_final_status() and not tmpJobSpec.all_events_done():
                            # final but with pending events: schedule another propagation
                            tmpJobSpec.trigger_propagation()
                        else:
                            # for starting event-service jobs, query remaining events and
                            # kill the job when none are left
                            if tmpJobSpec.status == "starting" and "eventService" in tmpJobSpec.jobParams and tmpJobSpec.subStatus != "submitted":
                                tmpEvStat, tmpEvRet = self.communicator.check_event_availability(tmpJobSpec)
                                if tmpEvStat:
                                    if tmpEvRet is not None:
                                        tmpJobSpec.nRemainingEvents = tmpEvRet
                                    if tmpEvRet == 0:
                                        mainLog.debug(f"kill PandaID={tmpJobSpec.PandaID} due to no event")
                                        tmpRet["command"] = "tobekilled"

                        # honor a kill command coming either from the remote side or
                        # from the no-events check just above
                        if "command" in tmpRet and tmpRet["command"] in ["tobekilled"]:
                            nWorkers = self.dbProxy.mark_workers_to_kill_by_pandaid(tmpJobSpec.PandaID)
                            if nWorkers == 0:
                                # no workers to kill: finalize the job directly
                                tmpJobSpec.status = "cancelled"
                                tmpJobSpec.subStatus = "killed"
                                tmpJobSpec.set_pilot_error(PilotErrors.PANDAKILL, PilotErrors.pilot_error_msg[PilotErrors.PANDAKILL])
                                tmpJobSpec.stateChangeTime = core_utils.naive_utcnow()
                                tmpJobSpec.trigger_propagation()
                        # persist only if we still hold the lock (criteria on propagatorLock)
                        self.dbProxy.update_job(tmpJobSpec, {"propagatorLock": self.get_pid()}, update_out_file=True)
                    else:
                        mainLog.error(f"failed to update PandaID={tmpJobSpec.PandaID} status={tmpJobSpec.status}")

            # ---------------- workers ----------------
            mainLog.debug("getting workers to propagate")
            sw.reset()
            workSpecs = self.dbProxy.get_workers_to_propagate(harvester_config.propagator.maxWorkers, harvester_config.propagator.updateInterval)
            mainLog.debug(f"got {len(workSpecs)} workers {sw.get_elapsed_time()}")

            # process workers in chunks of nWorkersInBulk
            sw.reset()
            iWorkers = 0
            nWorkers = harvester_config.propagator.nWorkersInBulk
            while iWorkers < len(workSpecs):
                workList = workSpecs[iWorkers : iWorkers + nWorkers]
                iWorkers += nWorkers
                retList, tmpErrStr = self.communicator.update_workers(workList)

                # retList is None when the bulk call itself failed
                if retList is None:
                    mainLog.error(f"failed to update workers with {tmpErrStr}")
                else:
                    for tmpWorkSpec, tmpRet in zip(workList, retList):
                        if tmpRet:
                            mainLog.debug(f"updated workerID={tmpWorkSpec.workerID} status={tmpWorkSpec.status}")

                            # upload pending log-file chunks; on success advance the
                            # per-file upload offset so the next cycle continues from there
                            for logFilePath, logOffset, logSize, logRemoteName in tmpWorkSpec.get_log_files_to_upload():
                                with open(logFilePath, "rb") as logFileObj:
                                    tmpStat, tmpErr = self.communicator.upload_file(logRemoteName, logFileObj, logOffset, logSize)
                                    if tmpStat:
                                        tmpWorkSpec.update_log_files_to_upload(logFilePath, logOffset + logSize)

                            # final workers need no further propagation
                            if tmpWorkSpec.is_final_status():
                                tmpWorkSpec.disable_propagation()
                            self.dbProxy.update_worker(tmpWorkSpec, {"workerID": tmpWorkSpec.workerID})
                        else:
                            mainLog.error(f"failed to update workerID={tmpWorkSpec.workerID} status={tmpWorkSpec.status}")
            mainLog.debug(f"update_workers for {iWorkers} workers took {sw.get_elapsed_time()}")

            # ---------------- commands ----------------
            mainLog.debug("getting commands")
            commandSpecs = self.dbProxy.get_commands_for_receiver("propagator")
            mainLog.debug(f"got {len(commandSpecs)} commands")
            for commandSpec in commandSpecs:
                if commandSpec.command.startswith(CommandSpec.COM_reportWorkerStats):
                    # command format embeds the site name after the last ':'
                    siteName = commandSpec.command.split(":")[-1]
                    workerStats = self.dbProxy.get_worker_stats(siteName)
                    if len(workerStats) == 0:
                        mainLog.error(f"failed to get worker stats for {siteName}")
                    else:
                        # send on-demand worker stats for the requested site
                        tmpRet, tmpStr = self.communicator.update_worker_stats(siteName, workerStats)
                        if tmpRet:
                            mainLog.debug(f"updated worker stats (command) for {siteName}")
                        else:
                            mainLog.error(f"failed to update worker stats (command) for {siteName} err={tmpStr}")

            # ---------------- periodic bulk worker stats ----------------
            if not self._last_stats_update or time.time() - self._last_stats_update > STATS_PERIOD:
                # limit the bulk report to active unified-pilot-streaming queues
                active_ups_queues = self.queueConfigMapper.get_active_ups_queues()

                worker_stats_bulk = self.dbProxy.get_worker_stats_bulk(active_ups_queues)
                if not worker_stats_bulk:
                    mainLog.error("failed to get worker stats in bulk")
                else:
                    for site_name in worker_stats_bulk:
                        tmp_ret, tmp_str = self.communicator.update_worker_stats(site_name, worker_stats_bulk[site_name])
                        if tmp_ret:
                            mainLog.debug(f"update of worker stats (bulk) for {site_name}")
                            # note: timestamp advances on any per-site success, so a
                            # partially failed bulk report is not retried early
                            self._last_stats_update = time.time()
                        else:
                            mainLog.error(f"failed to update worker stats (bulk) for {site_name} err={tmp_str}")

            # ---------------- periodic service metrics ----------------
            if not self._last_metrics_update or core_utils.naive_utcnow() - self._last_metrics_update > datetime.timedelta(seconds=METRICS_PERIOD):
                # metrics since the previous successful report (None on first cycle)
                service_metrics_list = self.dbProxy.get_service_metrics(self._last_metrics_update)
                if not service_metrics_list:
                    # an empty result on the very first cycle is expected, so only
                    # log an error when we had a previous reference time
                    if self._last_metrics_update:
                        mainLog.error("failed to get service metrics")
                    self._last_metrics_update = core_utils.naive_utcnow()
                else:
                    tmp_ret, tmp_str = self.communicator.update_service_metrics(service_metrics_list)
                    if tmp_ret:
                        mainLog.debug("update of service metrics OK")
                        self._last_metrics_update = core_utils.naive_utcnow()
                    else:
                        mainLog.error(f"failed to update service metrics err={tmp_str}")

            # ---------------- dialog messages ----------------
            mainLog.debug("getting dialog messages to propagate")
            try:
                maxDialogs = harvester_config.propagator.maxDialogs
            except Exception:
                # config attribute may be absent; fall back to a default cap
                maxDialogs = 50
            diagSpecs = self.dbProxy.get_dialog_messages_to_send(maxDialogs, harvester_config.propagator.lockInterval)
            mainLog.debug(f"got {len(diagSpecs)} dialogs")
            if len(diagSpecs) > 0:
                tmpStat, tmpStr = self.communicator.send_dialog_messages(diagSpecs)
                if tmpStat:
                    # delete only after a confirmed send
                    diagIDs = [diagSpec.diagID for diagSpec in diagSpecs]
                    self.dbProxy.delete_dialog_messages(diagIDs)
                    mainLog.debug(f"sent {len(diagSpecs)} dialogs")

                else:
                    mainLog.error(f"failed to send dialogs err={tmpStr}")

            # warn when a single cycle outlived the DB lock interval, since the
            # locks taken at the top of the cycle may have expired meanwhile
            if sw_main.get_elapsed_time_in_sec() > harvester_config.propagator.lockInterval:
                mainLog.warning("a single cycle was longer than lockInterval. done" + sw_main.get_elapsed_time())
            else:
                mainLog.debug("done" + sw_main.get_elapsed_time())

            # sleep and check for termination (behavior defined in AgentBase)
            if self.terminated(harvester_config.propagator.sleepTime):
                mainLog.debug("terminated")
                return