Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:58:57

0001 import datetime
0002 import time
0003 
0004 from pandaharvester.harvesterbody.agent_base import AgentBase
0005 from pandaharvester.harvesterconfig import harvester_config
0006 from pandaharvester.harvestercore import core_utils
0007 from pandaharvester.harvestercore.command_spec import CommandSpec
0008 from pandaharvester.harvestercore.db_proxy_pool import DBProxyPool as DBProxy
0009 from pandaharvester.harvestercore.pilot_errors import PilotErrors
0010 
0011 # logger
0012 _logger = core_utils.setup_logger("propagator")
0013 
0014 STATS_PERIOD = 300
0015 METRICS_PERIOD = 300
0016 
0017 # propagate important checkpoints to panda
0018 
0019 
0020 class Propagator(AgentBase):
0021     # constructor
0022     def __init__(self, communicator, queue_config_mapper, single_mode=False):
0023         AgentBase.__init__(self, single_mode)
0024         self.dbProxy = DBProxy()
0025         self.communicator = communicator
0026         self.queueConfigMapper = queue_config_mapper
0027         self._last_stats_update = None
0028         self._last_metrics_update = None
0029 
0030         # clean old service metrics at start up of the agent
0031         self.dbProxy.clean_service_metrics()
0032 
0033     # main loop
0034     def run(self):
0035 
0036         while True:
0037             sw_main = core_utils.get_stopwatch()
0038             mainLog = self.make_logger(_logger, f"id={self.get_pid()}", method_name="run")
0039             mainLog.debug("getting jobs to propagate")
0040             sw = core_utils.get_stopwatch()
0041             jobSpecs = self.dbProxy.get_jobs_to_propagate(
0042                 harvester_config.propagator.maxJobs, harvester_config.propagator.lockInterval, harvester_config.propagator.updateInterval, self.get_pid()
0043             )
0044             mainLog.debug(f"got {len(jobSpecs)} jobs {sw.get_elapsed_time()}")
0045             # update jobs in central database
0046             iJobs = 0
0047             nJobs = harvester_config.propagator.nJobsInBulk
0048             hbSuppressMap = dict()
0049             while iJobs < len(jobSpecs):
0050                 jobList = jobSpecs[iJobs : iJobs + nJobs]
0051                 iJobs += nJobs
0052                 # collect jobs to update or check
0053                 jobListToSkip = []
0054                 jobListToUpdate = []
0055                 jobListToCheck = []
0056                 retList = []
0057                 for tmpJobSpec in jobList:
0058                     if tmpJobSpec.computingSite not in hbSuppressMap:
0059                         queueConfig = self.queueConfigMapper.get_queue(tmpJobSpec.computingSite, tmpJobSpec.configID)
0060                         if queueConfig:
0061                             hbSuppressMap[tmpJobSpec.computingSite] = queueConfig.get_no_heartbeat_status()
0062                         else:  # assume truepilot
0063                             hbSuppressMap[tmpJobSpec.computingSite] = ["running", "transferring", "finished", "failed"]
0064                     # heartbeat is suppressed
0065                     if tmpJobSpec.get_status() in hbSuppressMap[tmpJobSpec.computingSite] and not tmpJobSpec.not_suppress_heartbeat():
0066                         # check running job to detect lost heartbeat
0067                         if tmpJobSpec.status == "running":
0068                             jobListToCheck.append(tmpJobSpec)
0069                         else:
0070                             jobListToSkip.append(tmpJobSpec)
0071                             retList.append({"StatusCode": 0, "command": None})
0072                     else:
0073                         jobListToUpdate.append(tmpJobSpec)
0074                 sw.reset()
0075                 retList += self.communicator.check_jobs(jobListToCheck)
0076                 mainLog.debug(f"check_jobs for {len(jobListToCheck)} jobs {sw.get_elapsed_time()}")
0077                 sw.reset()
0078                 retList += self.communicator.update_jobs(jobListToUpdate, self.get_pid())
0079                 mainLog.debug(f"update_jobs for {len(jobListToUpdate)} jobs took {sw.get_elapsed_time()}")
0080                 # logging
0081                 for tmpJobSpec, tmpRet in zip(jobListToSkip + jobListToCheck + jobListToUpdate, retList):
0082                     if tmpRet["StatusCode"] == 0:
0083                         if tmpJobSpec in jobListToUpdate:
0084                             mainLog.debug(f"updated PandaID={tmpJobSpec.PandaID} status={tmpJobSpec.status}")
0085                         else:
0086                             mainLog.debug(f"skip updating PandaID={tmpJobSpec.PandaID} status={tmpJobSpec.status}")
0087                         # release job
0088                         tmpJobSpec.propagatorLock = None
0089                         if tmpJobSpec.is_final_status() and tmpJobSpec.status == tmpJobSpec.get_status():
0090                             # unset to disable further updating
0091                             tmpJobSpec.propagatorTime = None
0092                             tmpJobSpec.subStatus = "done"
0093                             tmpJobSpec.modificationTime = core_utils.naive_utcnow()
0094                         elif tmpJobSpec.is_final_status() and not tmpJobSpec.all_events_done():
0095                             # trigger next propagation to update remaining events
0096                             tmpJobSpec.trigger_propagation()
0097                         else:
0098                             # check event availability
0099                             if tmpJobSpec.status == "starting" and "eventService" in tmpJobSpec.jobParams and tmpJobSpec.subStatus != "submitted":
0100                                 tmpEvStat, tmpEvRet = self.communicator.check_event_availability(tmpJobSpec)
0101                                 if tmpEvStat:
0102                                     if tmpEvRet is not None:
0103                                         tmpJobSpec.nRemainingEvents = tmpEvRet
0104                                     if tmpEvRet == 0:
0105                                         mainLog.debug(f"kill PandaID={tmpJobSpec.PandaID} due to no event")
0106                                         tmpRet["command"] = "tobekilled"
0107                             # got kill command
0108                             if "command" in tmpRet and tmpRet["command"] in ["tobekilled"]:
0109                                 nWorkers = self.dbProxy.mark_workers_to_kill_by_pandaid(tmpJobSpec.PandaID)
0110                                 if nWorkers == 0:
0111                                     # no workers
0112                                     tmpJobSpec.status = "cancelled"
0113                                     tmpJobSpec.subStatus = "killed"
0114                                     tmpJobSpec.set_pilot_error(PilotErrors.PANDAKILL, PilotErrors.pilot_error_msg[PilotErrors.PANDAKILL])
0115                                     tmpJobSpec.stateChangeTime = core_utils.naive_utcnow()
0116                                     tmpJobSpec.trigger_propagation()
0117                         self.dbProxy.update_job(tmpJobSpec, {"propagatorLock": self.get_pid()}, update_out_file=True)
0118                     else:
0119                         mainLog.error(f"failed to update PandaID={tmpJobSpec.PandaID} status={tmpJobSpec.status}")
0120             mainLog.debug("getting workers to propagate")
0121             sw.reset()
0122             workSpecs = self.dbProxy.get_workers_to_propagate(harvester_config.propagator.maxWorkers, harvester_config.propagator.updateInterval)
0123             mainLog.debug(f"got {len(workSpecs)} workers {sw.get_elapsed_time()}")
0124             # update workers in central database
0125             sw.reset()
0126             iWorkers = 0
0127             nWorkers = harvester_config.propagator.nWorkersInBulk
0128             while iWorkers < len(workSpecs):
0129                 workList = workSpecs[iWorkers : iWorkers + nWorkers]
0130                 iWorkers += nWorkers
0131                 retList, tmpErrStr = self.communicator.update_workers(workList)
0132                 # logging
0133                 if retList is None:
0134                     mainLog.error(f"failed to update workers with {tmpErrStr}")
0135                 else:
0136                     for tmpWorkSpec, tmpRet in zip(workList, retList):
0137                         if tmpRet:
0138                             mainLog.debug(f"updated workerID={tmpWorkSpec.workerID} status={tmpWorkSpec.status}")
0139                             # update logs
0140                             for logFilePath, logOffset, logSize, logRemoteName in tmpWorkSpec.get_log_files_to_upload():
0141                                 with open(logFilePath, "rb") as logFileObj:
0142                                     tmpStat, tmpErr = self.communicator.upload_file(logRemoteName, logFileObj, logOffset, logSize)
0143                                     if tmpStat:
0144                                         tmpWorkSpec.update_log_files_to_upload(logFilePath, logOffset + logSize)
0145                             # disable further update
0146                             if tmpWorkSpec.is_final_status():
0147                                 tmpWorkSpec.disable_propagation()
0148                             self.dbProxy.update_worker(tmpWorkSpec, {"workerID": tmpWorkSpec.workerID})
0149                         else:
0150                             mainLog.error(f"failed to update workerID={tmpWorkSpec.workerID} status={tmpWorkSpec.status}")
0151             mainLog.debug(f"update_workers for {iWorkers} workers took {sw.get_elapsed_time()}")
0152             mainLog.debug("getting commands")
0153             commandSpecs = self.dbProxy.get_commands_for_receiver("propagator")
0154             mainLog.debug(f"got {len(commandSpecs)} commands")
0155             for commandSpec in commandSpecs:
0156                 if commandSpec.command.startswith(CommandSpec.COM_reportWorkerStats):
0157                     # get worker stats
0158                     siteName = commandSpec.command.split(":")[-1]
0159                     workerStats = self.dbProxy.get_worker_stats(siteName)
0160                     if len(workerStats) == 0:
0161                         mainLog.error(f"failed to get worker stats for {siteName}")
0162                     else:
0163                         # report worker stats
0164                         tmpRet, tmpStr = self.communicator.update_worker_stats(siteName, workerStats)
0165                         if tmpRet:
0166                             mainLog.debug(f"updated worker stats (command) for {siteName}")
0167                         else:
0168                             mainLog.error(f"failed to update worker stats (command) for {siteName} err={tmpStr}")
0169 
0170             if not self._last_stats_update or time.time() - self._last_stats_update > STATS_PERIOD:
0171                 # get active UPS queues. PanDA server needs to know about them and which harvester instance is taking
0172                 # care of them
0173                 active_ups_queues = self.queueConfigMapper.get_active_ups_queues()
0174 
0175                 # update worker stats for all sites
0176                 worker_stats_bulk = self.dbProxy.get_worker_stats_bulk(active_ups_queues)
0177                 if not worker_stats_bulk:
0178                     mainLog.error("failed to get worker stats in bulk")
0179                 else:
0180                     for site_name in worker_stats_bulk:
0181                         tmp_ret, tmp_str = self.communicator.update_worker_stats(site_name, worker_stats_bulk[site_name])
0182                         if tmp_ret:
0183                             mainLog.debug(f"update of worker stats (bulk) for {site_name}")
0184                             self._last_stats_update = time.time()
0185                         else:
0186                             mainLog.error(f"failed to update worker stats (bulk) for {site_name} err={tmp_str}")
0187 
0188             if not self._last_metrics_update or core_utils.naive_utcnow() - self._last_metrics_update > datetime.timedelta(seconds=METRICS_PERIOD):
0189                 # get latest metrics from DB
0190                 service_metrics_list = self.dbProxy.get_service_metrics(self._last_metrics_update)
0191                 if not service_metrics_list:
0192                     if self._last_metrics_update:
0193                         mainLog.error("failed to get service metrics")
0194                     self._last_metrics_update = core_utils.naive_utcnow()
0195                 else:
0196                     tmp_ret, tmp_str = self.communicator.update_service_metrics(service_metrics_list)
0197                     if tmp_ret:
0198                         mainLog.debug("update of service metrics OK")
0199                         self._last_metrics_update = core_utils.naive_utcnow()
0200                     else:
0201                         mainLog.error(f"failed to update service metrics err={tmp_str}")
0202 
0203             # send dialog messages
0204             mainLog.debug("getting dialog messages to propagate")
0205             try:
0206                 maxDialogs = harvester_config.propagator.maxDialogs
0207             except Exception:
0208                 maxDialogs = 50
0209             diagSpecs = self.dbProxy.get_dialog_messages_to_send(maxDialogs, harvester_config.propagator.lockInterval)
0210             mainLog.debug(f"got {len(diagSpecs)} dialogs")
0211             if len(diagSpecs) > 0:
0212                 tmpStat, tmpStr = self.communicator.send_dialog_messages(diagSpecs)
0213                 if tmpStat:
0214                     diagIDs = [diagSpec.diagID for diagSpec in diagSpecs]
0215                     self.dbProxy.delete_dialog_messages(diagIDs)
0216                     mainLog.debug(f"sent {len(diagSpecs)} dialogs")
0217 
0218                 else:
0219                     mainLog.error(f"failed to send dialogs err={tmpStr}")
0220             if sw_main.get_elapsed_time_in_sec() > harvester_config.propagator.lockInterval:
0221                 mainLog.warning("a single cycle was longer than lockInterval. done" + sw_main.get_elapsed_time())
0222             else:
0223                 mainLog.debug("done" + sw_main.get_elapsed_time())
0224             # check if being terminated
0225             if self.terminated(harvester_config.propagator.sleepTime):
0226                 mainLog.debug("terminated")
0227                 return