Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:58:56

0001 import collections
0002 import datetime
0003 import itertools
0004 import random
0005 import time
0006 
0007 from pandaharvester.harvesterbody.agent_base import AgentBase
0008 from pandaharvester.harvesterconfig import harvester_config
0009 from pandaharvester.harvestercore import core_utils
0010 from pandaharvester.harvestercore.db_proxy_pool import DBProxyPool as DBProxy
0011 from pandaharvester.harvestercore.fifos import MonitorEventFIFO, MonitorFIFO
0012 from pandaharvester.harvestercore.pilot_errors import PilotErrors
0013 from pandaharvester.harvestercore.plugin_factory import PluginFactory
0014 from pandaharvester.harvestercore.work_spec import WorkSpec
0015 from pandaharvester.harvestermisc.apfmon import Apfmon
0016 
0017 # logger
0018 _logger = core_utils.setup_logger("monitor")
0019 
0020 
0021 # propagate important checkpoints to panda
0022 class Monitor(AgentBase):
0023     # constructor
0024     def __init__(self, queue_config_mapper, single_mode=False):
0025         tmp_log = self.make_logger(_logger, method_name="__init__")
0026         AgentBase.__init__(self, single_mode)
0027         self.queueConfigMapper = queue_config_mapper
0028         self.dbProxy = DBProxy()
0029         self.pluginFactory = PluginFactory()
0030         self.startTimestamp = time.time()
0031         try:
0032             self.monitor_fifo = MonitorFIFO()
0033         except Exception:
0034             tmp_log.error("failed to launch monitor-fifo")
0035             core_utils.dump_error_message(tmp_log)
0036         if self.monitor_fifo.enabled:
0037             try:
0038                 self.monitor_event_fifo = MonitorEventFIFO()
0039             except Exception:
0040                 tmp_log.error("failed to launch monitor-event-fifo")
0041                 core_utils.dump_error_message(tmp_log)
0042         else:
0043             self.monitor_event_fifo = None
0044         self.apfmon = Apfmon(self.queueConfigMapper)
0045         self.eventBasedMonCoreList = []
0046         if getattr(harvester_config.monitor, "eventBasedEnable", False):
0047             for pluginConf in harvester_config.monitor.eventBasedPlugins:
0048                 pluginFactory = PluginFactory()
0049                 plugin_key = pluginFactory.get_plugin_key(pluginConf)
0050                 try:
0051                     self.eventBasedMonCoreList.append(pluginFactory.get_plugin(pluginConf))
0052                 except Exception:
0053                     tmp_log.error(f"failed to launch event-based-monitor plugin of {plugin_key}")
0054                     core_utils.dump_error_message(tmp_log)
0055 
0056     # main loop
0057     def run(self):
0058         lockedBy = f"monitor-{self.get_pid()}"
0059         mainLog = self.make_logger(_logger, f"id={lockedBy}", method_name="run")
0060         # init messengers
0061         for queueName, queueConfig in self.queueConfigMapper.get_all_queues().items():
0062             # just import for module initialization
0063             try:
0064                 self.pluginFactory.get_plugin(queueConfig.messenger)
0065             except Exception:
0066                 mainLog.error(f"failed to launch messenger plugin for {queueName}")
0067                 core_utils.dump_error_message(mainLog)
0068         # main
0069         fifoSleepTimeMilli = getattr(harvester_config.monitor, "fifoSleepTimeMilli", 5000)
0070         fifoCheckDuration = getattr(harvester_config.monitor, "fifoCheckDuration", 30)
0071         fifoMaxWorkersPerChunk = getattr(harvester_config.monitor, "fifoMaxWorkersPerChunk", 500)
0072         fifoProtectiveDequeue = getattr(harvester_config.monitor, "fifoProtectiveDequeue", True)
0073         eventBasedCheckInterval = getattr(harvester_config.monitor, "eventBasedCheckInterval", 300)
0074         eventBasedTimeWindow = getattr(harvester_config.monitor, "eventBasedTimeWindow", 450)
0075         eventBasedCheckMaxEvents = getattr(harvester_config.monitor, "eventBasedCheckMaxEvents", 500)
0076         eventBasedEventLifetime = getattr(harvester_config.monitor, "eventBasedEventLifetime", 1800)
0077         eventBasedRemoveMaxEvents = getattr(harvester_config.monitor, "eventBasedRemoveMaxEvents", 2000)
0078         last_DB_cycle_timestamp = 0
0079         last_event_delivery_timestamp = 0
0080         last_event_digest_timestamp = 0
0081         last_event_dispose_timestamp = 0
0082         monitor_fifo = self.monitor_fifo
0083         sleepTime = (fifoSleepTimeMilli / 1000.0) if monitor_fifo.enabled else harvester_config.monitor.sleepTime
0084         adjusted_sleepTime = sleepTime
0085         if monitor_fifo.enabled:
0086             monitor_fifo.restore()
0087         while True:
0088             sw_main = core_utils.get_stopwatch()
0089             mainLog.debug("start a monitor cycle")
0090             if time.time() >= last_DB_cycle_timestamp + harvester_config.monitor.sleepTime and not (monitor_fifo.enabled and self.singleMode):
0091                 # run with workers from DB
0092                 sw_db = core_utils.get_stopwatch()
0093                 mainLog.debug("starting run with DB")
0094                 mainLog.debug("getting workers to monitor")
0095                 workSpecsPerQueue = self.dbProxy.get_workers_to_update(
0096                     harvester_config.monitor.maxWorkers, harvester_config.monitor.checkInterval, harvester_config.monitor.lockInterval, lockedBy
0097                 )
0098                 mainLog.debug(f"got {len(workSpecsPerQueue)} queues")
0099                 # loop over all workers
0100                 for queueName, configIdWorkSpecs in workSpecsPerQueue.items():
0101                     for configID, workSpecsList in configIdWorkSpecs.items():
0102                         try:
0103                             retVal = self.monitor_agent_core(lockedBy, queueName, workSpecsList, config_id=configID, check_source="DB")
0104                         except Exception as e:
0105                             mainLog.error(f"monitor_agent_core excepted with {e}")
0106                             retVal = None  # skip the loop
0107 
0108                         if monitor_fifo.enabled and retVal is not None:
0109                             workSpecsToEnqueue, workSpecsToEnqueueToHead, timeNow_timestamp, fifoCheckInterval = retVal
0110                             if workSpecsToEnqueue:
0111                                 mainLog.debug("putting workers to FIFO")
0112                                 try:
0113                                     score = fifoCheckInterval + timeNow_timestamp
0114                                     monitor_fifo.put((queueName, workSpecsToEnqueue), score)
0115                                     mainLog.info(f"put workers of {queueName} to FIFO with score {score}")
0116                                 except Exception as errStr:
0117                                     mainLog.error(f"failed to put object from FIFO: {errStr}")
0118                             if workSpecsToEnqueueToHead:
0119                                 mainLog.debug("putting workers to FIFO head")
0120                                 try:
0121                                     score = fifoCheckInterval - timeNow_timestamp
0122                                     monitor_fifo.put((queueName, workSpecsToEnqueueToHead), score)
0123                                     mainLog.info(f"put workers of {queueName} to FIFO with score {score}")
0124                                 except Exception as errStr:
0125                                     mainLog.error(f"failed to put object from FIFO head: {errStr}")
0126                 last_DB_cycle_timestamp = time.time()
0127                 if sw_db.get_elapsed_time_in_sec() > harvester_config.monitor.lockInterval:
0128                     mainLog.warning("a single DB cycle was longer than lockInterval " + sw_db.get_elapsed_time())
0129                 else:
0130                     mainLog.debug("done a DB cycle" + sw_db.get_elapsed_time())
0131                 mainLog.debug("ended run with DB")
0132             elif monitor_fifo.enabled:
0133                 # with FIFO
0134                 sw = core_utils.get_stopwatch()
0135                 to_run_fifo_check = True
0136                 n_loops = 0
0137                 n_loops_hit = 0
0138                 last_fifo_cycle_timestamp = time.time()
0139                 to_break = False
0140                 obj_dequeued_id_list = []
0141                 obj_to_enqueue_dict = collections.defaultdict(lambda: [[], 0, 0])
0142                 obj_to_enqueue_to_head_dict = collections.defaultdict(lambda: [[], 0, 0])
0143                 remaining_obj_to_enqueue_dict = {}
0144                 remaining_obj_to_enqueue_to_head_dict = {}
0145                 n_chunk_peeked_stat, sum_overhead_time_stat = 0, 0.0
0146                 # go get workers
0147                 if self.monitor_event_fifo.enabled:
0148                     # run with workers reported from plugin (event-based check)
0149                     to_deliver = time.time() >= last_event_delivery_timestamp + eventBasedCheckInterval
0150                     to_digest = time.time() >= last_event_digest_timestamp + eventBasedCheckInterval / 4
0151                     to_dispose = time.time() >= last_event_dispose_timestamp + eventBasedCheckInterval / 2
0152                     if to_deliver:
0153                         # deliver events of worker update
0154                         got_lock = self.dbProxy.get_process_lock("monitor_event_deliverer", lockedBy, eventBasedCheckInterval)
0155                         if got_lock:
0156                             self.monitor_event_deliverer(time_window=eventBasedTimeWindow)
0157                         else:
0158                             mainLog.debug("did not get lock. Skip monitor_event_deliverer")
0159                         last_event_delivery_timestamp = time.time()
0160                     if to_digest:
0161                         # digest events of worker update
0162                         to_run_fifo_check = False
0163                         retMap = self.monitor_event_digester(locked_by=lockedBy, max_events=eventBasedCheckMaxEvents)
0164                         for qc_key, retVal in retMap.items():
0165                             workSpecsToEnqueue, workSpecsToEnqueueToHead, timeNow_timestamp, fifoCheckInterval = retVal
0166                             # only enqueue postprocessing workers to FIFO
0167                             obj_to_enqueue_to_head_dict[qc_key][0].extend(workSpecsToEnqueueToHead)
0168                             obj_to_enqueue_to_head_dict[qc_key][1] = max(obj_to_enqueue_to_head_dict[qc_key][1], timeNow_timestamp)
0169                             obj_to_enqueue_to_head_dict[qc_key][2] = max(obj_to_enqueue_to_head_dict[qc_key][2], fifoCheckInterval)
0170                         last_event_digest_timestamp = time.time()
0171                     if to_dispose:
0172                         # dispose of outdated events of worker update
0173                         self.monitor_event_disposer(event_lifetime=eventBasedEventLifetime, max_events=eventBasedRemoveMaxEvents)
0174                         last_event_dispose_timestamp = time.time()
0175                 if to_run_fifo_check:
0176                     # run with workers from FIFO
0177                     while time.time() < last_fifo_cycle_timestamp + fifoCheckDuration:
0178                         sw.reset()
0179                         n_loops += 1
0180                         try:
0181                             retVal, overhead_time = monitor_fifo.to_check_workers()
0182                         except Exception as e:
0183                             mainLog.error(f"failed to check workers from FIFO: {e}")
0184                         if overhead_time is not None:
0185                             n_chunk_peeked_stat += 1
0186                             sum_overhead_time_stat += overhead_time
0187                         if retVal:
0188                             # check fifo size
0189                             try:
0190                                 fifo_size = monitor_fifo.size()
0191                                 mainLog.debug(f"FIFO size is {fifo_size}")
0192                             except Exception as e:
0193                                 mainLog.error(f"failed to get size of FIFO: {e}")
0194                                 time.sleep(2)
0195                                 continue
0196                             mainLog.debug("starting run with FIFO")
0197                             try:
0198                                 obj_gotten = monitor_fifo.get(timeout=1, protective=fifoProtectiveDequeue)
0199                             except Exception as errStr:
0200                                 mainLog.error(f"failed to get object from FIFO: {errStr}")
0201                                 time.sleep(2)
0202                                 continue
0203                             else:
0204                                 if obj_gotten is not None:
0205                                     sw_fifo = core_utils.get_stopwatch()
0206                                     if fifoProtectiveDequeue:
0207                                         obj_dequeued_id_list.append(obj_gotten.id)
0208                                     queueName, workSpecsList = obj_gotten.item
0209                                     mainLog.debug(f"got a chunk of {len(workSpecsList)} workers of {queueName} from FIFO" + sw.get_elapsed_time())
0210                                     sw.reset()
0211                                     configID = None
0212                                     for workSpecs in workSpecsList:
0213                                         if configID is None and len(workSpecs) > 0:
0214                                             configID = workSpecs[0].configID
0215                                         for workSpec in workSpecs:
0216                                             if workSpec.pandaid_list is None:
0217                                                 _jobspec_list = workSpec.get_jobspec_list()
0218                                                 if _jobspec_list is not None:
0219                                                     workSpec.pandaid_list = [j.PandaID for j in workSpec.get_jobspec_list()]
0220                                                 else:
0221                                                     workSpec.pandaid_list = []
0222                                                 workSpec.force_update("pandaid_list")
0223                                     try:
0224                                         retVal = self.monitor_agent_core(
0225                                             lockedBy, queueName, workSpecsList, from_fifo=True, config_id=configID, check_source="FIFO"
0226                                         )
0227                                     except Exception as e:
0228                                         mainLog.error(f"monitor_agent_core excepted with {e}")
0229                                         retVal = None  # skip the loop
0230 
0231                                     if retVal is not None:
0232                                         workSpecsToEnqueue, workSpecsToEnqueueToHead, timeNow_timestamp, fifoCheckInterval = retVal
0233                                         qc_key = (queueName, configID)
0234                                         try:
0235                                             if len(obj_to_enqueue_dict[qc_key][0]) + len(workSpecsToEnqueue) <= fifoMaxWorkersPerChunk:
0236                                                 obj_to_enqueue_dict[qc_key][0].extend(workSpecsToEnqueue)
0237                                                 obj_to_enqueue_dict[qc_key][1] = max(obj_to_enqueue_dict[qc_key][1], timeNow_timestamp)
0238                                                 obj_to_enqueue_dict[qc_key][2] = max(obj_to_enqueue_dict[qc_key][2], fifoCheckInterval)
0239                                             else:
0240                                                 to_break = True
0241                                                 remaining_obj_to_enqueue_dict[qc_key] = [workSpecsToEnqueue, timeNow_timestamp, fifoCheckInterval]
0242                                         except Exception as errStr:
0243                                             mainLog.error(f"failed to gather workers for FIFO: {errStr}")
0244                                             to_break = True
0245                                         try:
0246                                             if len(obj_to_enqueue_to_head_dict[qc_key][0]) + len(workSpecsToEnqueueToHead) <= fifoMaxWorkersPerChunk:
0247                                                 obj_to_enqueue_to_head_dict[qc_key][0].extend(workSpecsToEnqueueToHead)
0248                                                 obj_to_enqueue_to_head_dict[qc_key][1] = max(obj_to_enqueue_to_head_dict[qc_key][1], timeNow_timestamp)
0249                                                 obj_to_enqueue_to_head_dict[qc_key][2] = max(obj_to_enqueue_to_head_dict[qc_key][2], fifoCheckInterval)
0250                                             else:
0251                                                 to_break = True
0252                                                 remaining_obj_to_enqueue_to_head_dict[qc_key] = [workSpecsToEnqueueToHead, timeNow_timestamp, fifoCheckInterval]
0253                                         except Exception as errStr:
0254                                             mainLog.error(f"failed to gather workers for FIFO head: {errStr}")
0255                                             to_break = True
0256                                         mainLog.debug(f"checked {len(workSpecsList)} workers from FIFO" + sw.get_elapsed_time())
0257                                     else:
0258                                         mainLog.debug("monitor_agent_core returned None. Skipped putting to FIFO")
0259                                     if sw_fifo.get_elapsed_time_in_sec() > harvester_config.monitor.lockInterval:
0260                                         mainLog.warning("a single FIFO cycle was longer than lockInterval " + sw_fifo.get_elapsed_time())
0261                                     else:
0262                                         mainLog.debug("done a FIFO cycle" + sw_fifo.get_elapsed_time())
0263                                         n_loops_hit += 1
0264                                     if to_break:
0265                                         break
0266                                 else:
0267                                     mainLog.debug("got nothing in FIFO")
0268                         else:
0269                             mainLog.debug("workers in FIFO too young to check. Skipped")
0270                             if self.singleMode:
0271                                 break
0272                             if overhead_time is not None:
0273                                 time.sleep(max(-overhead_time * random.uniform(0.1, 1), adjusted_sleepTime))
0274                             else:
0275                                 time.sleep(max(fifoCheckDuration * random.uniform(0.1, 1), adjusted_sleepTime))
0276                     mainLog.debug(f"run {n_loops} loops, including {n_loops_hit} FIFO cycles")
0277                 # enqueue to fifo
0278                 sw.reset()
0279                 n_chunk_put = 0
0280                 mainLog.debug("putting worker chunks to FIFO")
0281                 for _dct in (obj_to_enqueue_dict, remaining_obj_to_enqueue_dict):
0282                     for (queueName, configID), obj_to_enqueue in _dct.items():
0283                         try:
0284                             workSpecsToEnqueue, timeNow_timestamp, fifoCheckInterval = obj_to_enqueue
0285                             if workSpecsToEnqueue:
0286                                 score = fifoCheckInterval + timeNow_timestamp
0287                                 monitor_fifo.put((queueName, workSpecsToEnqueue), score)
0288                                 n_chunk_put += 1
0289                                 mainLog.info(f"put a chunk of {len(workSpecsToEnqueue)} workers of {queueName} to FIFO with score {score}")
0290                         except Exception as errStr:
0291                             mainLog.error(f"failed to put object from FIFO: {errStr}")
0292                 mainLog.debug("putting worker chunks to FIFO head")
0293                 for _dct in (obj_to_enqueue_to_head_dict, remaining_obj_to_enqueue_to_head_dict):
0294                     for (queueName, configID), obj_to_enqueue_to_head in _dct.items():
0295                         try:
0296                             workSpecsToEnqueueToHead, timeNow_timestamp, fifoCheckInterval = obj_to_enqueue_to_head
0297                             if workSpecsToEnqueueToHead:
0298                                 score = fifoCheckInterval + timeNow_timestamp - 2**32
0299                                 monitor_fifo.put((queueName, workSpecsToEnqueueToHead), score)
0300                                 n_chunk_put += 1
0301                                 mainLog.info(f"put a chunk of {len(workSpecsToEnqueueToHead)} workers of {queueName} to FIFO with score {score}")
0302                         except Exception as errStr:
0303                             mainLog.error(f"failed to put object from FIFO head: {errStr}")
0304                 # delete protective dequeued objects
0305                 if fifoProtectiveDequeue and len(obj_dequeued_id_list) > 0:
0306                     try:
0307                         monitor_fifo.delete(ids=obj_dequeued_id_list)
0308                     except Exception as e:
0309                         mainLog.error(f"failed to delete object from FIFO: {e}")
0310                 mainLog.debug(f"put {n_chunk_put} worker chunks into FIFO" + sw.get_elapsed_time())
0311                 # adjust adjusted_sleepTime
0312                 if n_chunk_peeked_stat > 0 and sum_overhead_time_stat > sleepTime:
0313                     speedup_factor = (sum_overhead_time_stat - sleepTime) / (n_chunk_peeked_stat * harvester_config.monitor.checkInterval)
0314                     speedup_factor = max(speedup_factor, 0)
0315                     adjusted_sleepTime = adjusted_sleepTime / (1.0 + speedup_factor)
0316                 elif n_chunk_peeked_stat == 0 or sum_overhead_time_stat < 0:
0317                     adjusted_sleepTime = (sleepTime + adjusted_sleepTime) / 2
0318                 mainLog.debug(f"adjusted_sleepTime becomes {adjusted_sleepTime:.3f} sec")
0319                 # end run with fifo
0320                 mainLog.debug("ended run with FIFO")
0321             # time the cycle
0322             mainLog.debug("done a monitor cycle" + sw_main.get_elapsed_time())
0323             # check if being terminated
0324             if self.terminated(adjusted_sleepTime):
0325                 mainLog.debug("terminated")
0326                 return
0327 
0328     # core of monitor agent to check workers in workSpecsList of queueName
0329     def monitor_agent_core(self, lockedBy, queueName, workSpecsList, from_fifo=False, config_id=None, check_source=None):
0330         tmpQueLog = self.make_logger(_logger, f"id={lockedBy} queue={queueName}", method_name="run")
0331         # check queue
0332         if not self.queueConfigMapper.has_queue(queueName, config_id):
0333             tmpQueLog.error("config not found")
0334             return None
0335         # get queue
0336         queueConfig = self.queueConfigMapper.get_queue(queueName, config_id)
0337         # get plugins
0338         monCore = self.pluginFactory.get_plugin(queueConfig.monitor)
0339         messenger = self.pluginFactory.get_plugin(queueConfig.messenger)
0340         # workspec chunk of active workers
0341         workSpecsToEnqueue_dict = {}
0342         workSpecsToEnqueueToHead_dict = {}
0343         timeNow_timestamp = time.time()
0344         # get fifoCheckInterval for PQ and other fifo attributes
0345         try:
0346             fifoCheckInterval = monCore.fifoCheckInterval
0347         except Exception:
0348             if hasattr(harvester_config.monitor, "fifoCheckInterval"):
0349                 fifoCheckInterval = harvester_config.monitor.fifoCheckInterval
0350             else:
0351                 fifoCheckInterval = harvester_config.monitor.checkInterval
0352         try:
0353             forceEnqueueInterval = harvester_config.monitor.fifoForceEnqueueInterval
0354         except AttributeError:
0355             forceEnqueueInterval = 3600
0356         try:
0357             fifoMaxPreemptInterval = harvester_config.monitor.fifoMaxPreemptInterval
0358         except AttributeError:
0359             fifoMaxPreemptInterval = 60
0360         # check workers
0361         allWorkers = [item for sublist in workSpecsList for item in sublist]
0362         tmpQueLog.debug(f"checking {len(allWorkers)} workers")
0363         tmpStat, tmpRetMap = self.check_workers(monCore, messenger, allWorkers, queueConfig, tmpQueLog, from_fifo)
0364         if tmpStat:
0365             # loop over all worker chunks
0366             tmpQueLog.debug("update jobs and workers")
0367             iWorker = 0
0368             for workSpecs in workSpecsList:
0369                 jobSpecs = None
0370                 pandaIDsList = []
0371                 eventsToUpdateList = []
0372                 filesToStageOutList = dict()
0373                 isCheckedList = []
0374                 mapType = workSpecs[0].mapType
0375                 # loop over workSpecs
0376                 for workSpec in workSpecs:
0377                     tmpLog = self.make_logger(_logger, f"id={lockedBy} workerID={workSpec.workerID} from={check_source}", method_name="run")
0378                     tmpOut = tmpRetMap[workSpec.workerID]
0379                     oldStatus = tmpOut["oldStatus"]
0380                     newStatus = tmpOut["newStatus"]
0381                     monStatus = tmpOut["monStatus"]
0382                     diagMessage = tmpOut["diagMessage"]
0383                     workAttributes = tmpOut["workAttributes"]
0384                     eventsToUpdate = tmpOut["eventsToUpdate"]
0385                     filesToStageOut = tmpOut["filesToStageOut"]
0386                     eventsRequestParams = tmpOut["eventsRequestParams"]
0387                     nJobsToReFill = tmpOut["nJobsToReFill"]
0388                     pandaIDs = tmpOut["pandaIDs"]
0389                     isChecked = tmpOut["isChecked"]
0390                     tmpStr = "newStatus={0} monitoredStatus={1} diag={2} "
0391                     tmpStr += "postProcessed={3} files={4}"
0392                     tmpLog.debug(tmpStr.format(newStatus, monStatus, diagMessage, workSpec.is_post_processed(), str(filesToStageOut)))
0393                     iWorker += 1
0394                     # check status
0395                     if newStatus not in WorkSpec.ST_LIST:
0396                         tmpLog.error(f"unknown status={newStatus}")
0397                         return
0398                     # update worker
0399                     workSpec.set_status(newStatus)
0400                     workSpec.set_work_attributes(workAttributes)
0401                     workSpec.set_dialog_message(diagMessage)
0402                     if isChecked:
0403                         workSpec.checkTime = core_utils.naive_utcnow()
0404                     isCheckedList.append(isChecked)
0405                     if monStatus == WorkSpec.ST_failed:
0406                         if not workSpec.has_pilot_error() and workSpec.errorCode is None:
0407                             workSpec.set_pilot_error(PilotErrors.GENERALERROR, diagMessage)
0408                     elif monStatus == WorkSpec.ST_cancelled:
0409                         if not workSpec.has_pilot_error() and workSpec.errorCode is None:
0410                             workSpec.set_pilot_error(PilotErrors.PANDAKILL, diagMessage)
0411                     if monStatus in [WorkSpec.ST_finished, WorkSpec.ST_failed, WorkSpec.ST_cancelled]:
0412                         workSpec.set_work_params({"finalMonStatus": monStatus})
0413                     # request events
0414                     if eventsRequestParams != {}:
0415                         workSpec.eventsRequest = WorkSpec.EV_requestEvents
0416                         workSpec.eventsRequestParams = eventsRequestParams
0417                     # jobs to refill
0418                     if nJobsToReFill is not None:
0419                         workSpec.nJobsToReFill = nJobsToReFill
0420                     # get associated jobs for the worker chunk
0421                     if workSpec.hasJob == 1 and jobSpecs is None:
0422                         jobSpecs = self.dbProxy.get_jobs_with_worker_id(workSpec.workerID, None, only_running=True, slim=True)
0423                     # pandaIDs for push
0424                     pandaIDsList.append(pandaIDs)
0425                     if len(eventsToUpdate) > 0:
0426                         eventsToUpdateList.append(eventsToUpdate)
0427                     if len(filesToStageOut) > 0:
0428                         filesToStageOutList[workSpec.workerID] = filesToStageOut
0429                     # apfmon status update
0430                     if newStatus != oldStatus:
0431                         tmpQueLog.debug(f"newStatus: {newStatus} monStatus: {monStatus} oldStatus: {oldStatus} workSpecStatus: {workSpec.status}")
0432                         self.apfmon.update_worker(workSpec, monStatus)
0433 
0434                 # lock workers for fifo
0435                 if from_fifo:
0436                     # collect some attributes to be updated when workers are locked
0437                     worker_id_list = dict()
0438                     for workSpec, isChecked in zip(workSpecs, isCheckedList):
0439                         attrs = dict()
0440                         if isChecked:
0441                             attrs["checkTime"] = workSpec.checkTime
0442                             workSpec.force_not_update("checkTime")
0443                         if workSpec.has_updated_attributes():
0444                             attrs["lockedBy"] = lockedBy
0445                             workSpec.lockedBy = lockedBy
0446                             workSpec.force_not_update("lockedBy")
0447                         else:
0448                             attrs["lockedBy"] = None
0449                         worker_id_list[workSpec.workerID] = attrs
0450                     temRetLockWorker = self.dbProxy.lock_workers(worker_id_list, harvester_config.monitor.lockInterval)
0451                     # skip if not locked
0452                     if not temRetLockWorker:
0453                         continue
0454                 # update jobs and workers
0455                 if jobSpecs is not None and len(jobSpecs) > 0:
0456                     tmpQueLog.debug(f"updating {len(jobSpecs)} jobs with {len(workSpecs)} workers")
0457                     core_utils.update_job_attributes_with_workers(mapType, jobSpecs, workSpecs, filesToStageOutList, eventsToUpdateList)
0458                 # update local database
0459                 tmpRet = self.dbProxy.update_jobs_workers(jobSpecs, workSpecs, lockedBy, pandaIDsList)
0460                 if not tmpRet:
0461                     for workSpec in workSpecs:
0462                         tmpLog = self.make_logger(_logger, f"id={lockedBy} workerID={workSpec.workerID}", method_name="run")
0463                         if from_fifo:
0464                             tmpLog.info("failed to update the DB. Maybe locked by other thread running with DB")
0465                         else:
0466                             if workSpec.status in [WorkSpec.ST_finished, WorkSpec.ST_failed, WorkSpec.ST_cancelled, WorkSpec.ST_missed]:
0467                                 tmpLog.info("worker already in final status. Skipped")
0468                             else:
0469                                 tmpLog.error("failed to update the DB. lockInterval may be too short")
0470                 else:
0471                     if jobSpecs is not None:
0472                         for jobSpec in jobSpecs:
0473                             tmpLog = self.make_logger(_logger, f"id={lockedBy} PandaID={jobSpec.PandaID}", method_name="run")
0474                             tmpLog.debug(
0475                                 f"new status={jobSpec.status} subStatus={jobSpec.subStatus} status_in_metadata={jobSpec.get_job_status_from_attributes()}"
0476                             )
0477                 # send ACK to workers for events and files
0478                 if len(eventsToUpdateList) > 0 or len(filesToStageOutList) > 0:
0479                     for workSpec in workSpecs:
0480                         try:
0481                             messenger.acknowledge_events_files(workSpec)
0482                         except Exception:
0483                             core_utils.dump_error_message(tmpQueLog)
0484                             tmpQueLog.error(f"failed to send ACK to workerID={workSpec.workerID}")
0485                 # active workers for fifo
0486                 if self.monitor_fifo.enabled and workSpecs:
0487                     workSpec = workSpecs[0]
0488                     tmpOut = tmpRetMap[workSpec.workerID]
0489                     newStatus = tmpOut["newStatus"]
0490                     monStatus = tmpOut["monStatus"]
0491                     if (
0492                         newStatus in [WorkSpec.ST_submitted, WorkSpec.ST_running, WorkSpec.ST_idle]
0493                         and workSpec.mapType != WorkSpec.MT_MultiWorkers
0494                         and workSpec.workAttributes is not None
0495                     ):
0496                         timeNow = core_utils.naive_utcnow()
0497                         timeNow_timestamp = time.time()
0498                         # get lastCheckAt
0499                         _bool, lastCheckAt = workSpec.get_work_params("lastCheckAt")
0500                         try:
0501                             last_check_period = timeNow_timestamp - lastCheckAt
0502                         except TypeError:
0503                             last_check_period = forceEnqueueInterval + 1.0
0504                         # get lastForceEnqueueAt
0505                         _bool, lastForceEnqueueAt = workSpec.get_work_params("lastForceEnqueueAt")
0506                         if not (_bool and lastForceEnqueueAt is not None):
0507                             lastForceEnqueueAt = 0
0508                         # notification
0509                         intolerable_delay = max(forceEnqueueInterval * 2, harvester_config.monitor.checkInterval * 4)
0510                         if (
0511                             _bool
0512                             and lastCheckAt is not None
0513                             and last_check_period > harvester_config.monitor.checkInterval
0514                             and timeNow_timestamp - harvester_config.monitor.checkInterval > self.startTimestamp
0515                         ):
0516                             if last_check_period > intolerable_delay:
0517                                 tmpQueLog.error(
0518                                     "last check period of workerID={0} is {1} sec, intolerably longer than monitor checkInterval. Will NOT enquque worker by force. Please check why monitor checks worker slowly".format(
0519                                         workSpec.workerID, last_check_period
0520                                     )
0521                                 )
0522                             else:
0523                                 tmpQueLog.warning(
0524                                     f"last check period of workerID={workSpec.workerID} is {last_check_period} sec, longer than monitor checkInterval"
0525                                 )
0526                         # preparation to enqueue fifo
0527                         if (from_fifo) or (
0528                             not from_fifo
0529                             and timeNow_timestamp - harvester_config.monitor.sleepTime > self.startTimestamp
0530                             and last_check_period > forceEnqueueInterval
0531                             and last_check_period < intolerable_delay
0532                             and timeNow_timestamp - lastForceEnqueueAt > 86400 + forceEnqueueInterval
0533                         ):
0534                             if not from_fifo:
0535                                 # in DB cycle
0536                                 tmpQueLog.warning(
0537                                     "last check period of workerID={0} is {1} sec, longer than monitor forceEnqueueInterval. Enqueue the worker by force".format(
0538                                         workSpec.workerID, last_check_period
0539                                     )
0540                                 )
0541                                 workSpec.set_work_params({"lastForceEnqueueAt": timeNow_timestamp})
0542                             workSpec.set_work_params({"lastCheckAt": timeNow_timestamp})
0543                             workSpec.lockedBy = None
0544                             workSpec.force_update("lockedBy")
0545                             if monStatus in [WorkSpec.ST_finished, WorkSpec.ST_failed, WorkSpec.ST_cancelled]:
0546                                 # for post-processing
0547                                 _bool, startFifoPreemptAt = workSpec.get_work_params("startFifoPreemptAt")
0548                                 if not _bool or startFifoPreemptAt is None:
0549                                     startFifoPreemptAt = timeNow_timestamp
0550                                     workSpec.set_work_params({"startFifoPreemptAt": startFifoPreemptAt})
0551                                 tmpQueLog.debug(f"workerID={workSpec.workerID} , startFifoPreemptAt: {startFifoPreemptAt}")
0552                                 if timeNow_timestamp - startFifoPreemptAt < fifoMaxPreemptInterval:
0553                                     workSpecsToEnqueueToHead_dict[workSpec.workerID] = workSpecs
0554                                 else:
0555                                     workSpec.set_work_params({"startFifoPreemptAt": timeNow_timestamp})
0556                                     workSpec.modificationTime = timeNow
0557                                     workSpec.force_update("modificationTime")
0558                                     workSpecsToEnqueue_dict[workSpec.workerID] = workSpecs
0559                             else:
0560                                 workSpec.modificationTime = timeNow
0561                                 workSpec.force_update("modificationTime")
0562                                 workSpecsToEnqueue_dict[workSpec.workerID] = workSpecs
0563         else:
0564             tmpQueLog.error("failed to check workers")
0565         workSpecsToEnqueue = list(workSpecsToEnqueue_dict.values())
0566         workSpecsToEnqueueToHead = list(workSpecsToEnqueueToHead_dict.values())
0567         retVal = workSpecsToEnqueue, workSpecsToEnqueueToHead, timeNow_timestamp, fifoCheckInterval
0568         tmpQueLog.debug("done")
0569         return retVal
0570 
0571     # wrapper for checkWorkers
    def check_workers(self, mon_core, messenger, all_workers, queue_config, tmp_log, from_fifo):
        """Check workers with the monitor plugin and collect per-worker results via the messenger.

        Workers are split into three groups before the plugin is called:

        * workers carrying a "finalMonStatus" work parameter are not re-checked; the stored
          status is fed into the same post-check handling as a plugin result,
        * workers with hasJob == 0 and a map type other than MT_NoJob are not sent to the
          plugin; they only get ST_ready (or keep their status) depending on whether a job
          was requested,
        * all remaining workers are checked through mon_core.check_workers().

        :param mon_core: monitor plugin; must provide check_workers(), may provide checkTimeout
        :param messenger: messenger plugin used to query each worker
        :param all_workers: iterable of work specs to examine
        :param queue_config: queue configuration; messenger["worker_heartbeat"],
            is_no_heartbeat_status() and truePilot are read from it
        :param tmp_log: logger receiving progress and error messages
        :param from_fifo: not used inside this method
        :return: (True, retMap) where retMap maps workerID to a dict with the keys
            "oldStatus", "newStatus", "monStatus", "workAttributes", "filesToStageOut",
            "eventsRequestParams", "eventsToUpdate", "diagMessage", "pandaIDs",
            "nJobsToReFill" and "isChecked"; (False, None) if any exception is raised
            while checking. A plugin-level failure (falsy status from
            mon_core.check_workers) is logged but still returns (True, retMap).
        """
        # check timeout value
        # the plugin's own checkTimeout wins; otherwise fall back to the monitor config, then None
        try:
            checkTimeout = mon_core.checkTimeout
        except Exception:
            try:
                checkTimeout = harvester_config.monitor.checkTimeout
            except Exception:
                checkTimeout = None
        # maximum time in seconds a worker may stay submitted; 172800 s (48 h) when not configured
        try:
            workerQueueTimeLimit = harvester_config.monitor.workerQueueTimeLimit
        except AttributeError:
            workerQueueTimeLimit = 172800
        # workers to hand to the plugin
        workersToCheck = []
        # (workSpec, (status, diagMessage)) pairs that bypass the plugin
        thingsToPostProcess = []
        retMap = dict()
        for workSpec in all_workers:
            # defaults for the result entry of this worker
            eventsRequestParams = {}
            eventsToUpdate = []
            pandaIDs = []
            workStatus = None
            workAttributes = None
            filesToStageOut = []
            nJobsToReFill = None
            if workSpec.has_work_params("finalMonStatus"):
                # to post-process
                # reuse the stored status with an empty diag message instead of asking the plugin again
                _bool, finalMonStatus = workSpec.get_work_params("finalMonStatus")
                _thing = (workSpec, (finalMonStatus, ""))
                thingsToPostProcess.append(_thing)
            else:
                # job-level late binding
                if workSpec.hasJob == 0 and workSpec.mapType != WorkSpec.MT_NoJob:
                    # check if job is requested
                    jobRequested = messenger.job_requested(workSpec)
                    if jobRequested:
                        # set ready when job is requested
                        workStatus = WorkSpec.ST_ready
                    else:
                        workStatus = workSpec.status
                elif workSpec.nJobsToReFill in [0, None]:
                    # check if job is requested to refill free slots
                    jobRequested = messenger.job_requested(workSpec)
                    if jobRequested:
                        # the value returned by the messenger is stored as-is as the number to refill
                        nJobsToReFill = jobRequested
                    workersToCheck.append(workSpec)
                else:
                    workersToCheck.append(workSpec)
            # add
            # every worker gets an entry; "newStatus"/"monStatus" stay at workStatus unless the
            # worker is processed in the loop further below
            retMap[workSpec.workerID] = {
                "oldStatus": workSpec.status,
                "newStatus": workStatus,
                "monStatus": workStatus,
                "workAttributes": workAttributes,
                "filesToStageOut": filesToStageOut,
                "eventsRequestParams": eventsRequestParams,
                "eventsToUpdate": eventsToUpdate,
                "diagMessage": "",
                "pandaIDs": pandaIDs,
                "nJobsToReFill": nJobsToReFill,
                "isChecked": True,
            }
        # check workers
        tmp_log.debug("checking workers with plugin")
        try:
            if workersToCheck:
                tmpStat, tmpOut = mon_core.check_workers(workersToCheck)
                if not tmpStat:
                    tmp_log.error(f"failed to check workers with: {tmpOut}")
                    # drop the plugin results; only the post-process entries are handled below
                    workersToCheck = []
                    tmpOut = []
                else:
                    tmp_log.debug("checked")
            else:
                tmp_log.debug("Nothing to be checked with plugin")
                tmpOut = []
            timeNow = core_utils.naive_utcnow()
            # plugin results are paired with their workers by position, followed by the stored final statuses
            for workSpec, (newStatus, diagMessage) in itertools.chain(zip(workersToCheck, tmpOut), thingsToPostProcess):
                workerID = workSpec.workerID
                tmp_log.debug(f"Going to check workerID={workerID}")
                pandaIDs = []
                if workerID in retMap:
                    # failed to check status
                    if newStatus is None:
                        tmp_log.warning(f"Failed to check workerID={workerID} with {diagMessage}")
                        retMap[workerID]["isChecked"] = False
                        # set status
                        if (
                            workSpec.checkTime is not None
                            and checkTimeout is not None
                            and timeNow - workSpec.checkTime > datetime.timedelta(seconds=checkTimeout)
                        ):
                            # kill due to timeout
                            tmp_log.debug(f"kill workerID={workerID} due to consecutive check failures")
                            self.dbProxy.mark_workers_to_kill_by_workerids([workSpec.workerID])
                            newStatus = WorkSpec.ST_cancelled
                            diagMessage = "Killed by Harvester due to consecutive worker check failures. " + diagMessage
                            workSpec.set_pilot_error(PilotErrors.FAILEDBYSERVER, diagMessage)
                        else:
                            # use original status
                            newStatus = workSpec.status
                    # request kill
                    if messenger.kill_requested(workSpec):
                        tmp_log.debug(f"kill workerID={workerID} as requested")
                        self.dbProxy.mark_workers_to_kill_by_workerids([workSpec.workerID])
                    # stuck queuing for too long
                    if workSpec.status == WorkSpec.ST_submitted and timeNow > workSpec.submitTime + datetime.timedelta(seconds=workerQueueTimeLimit):
                        tmp_log.debug(f"kill workerID={workerID} due to queuing longer than {workerQueueTimeLimit} seconds")
                        self.dbProxy.mark_workers_to_kill_by_workerids([workSpec.workerID])
                        diagMessage = "Killed by Harvester due to worker queuing too long. " + diagMessage
                        workSpec.set_pilot_error(PilotErrors.FAILEDBYSERVER, diagMessage)
                        # set closed
                        workSpec.set_pilot_closed()
                    # expired heartbeat - only when requested in the configuration
                    try:
                        # check if the queue configuration requires checking for worker heartbeat
                        worker_heartbeat_limit = int(queue_config.messenger["worker_heartbeat"])
                    except (AttributeError, KeyError):
                        worker_heartbeat_limit = None
                    tmp_log.debug(f"workerID={workerID} heartbeat limit is configured to {worker_heartbeat_limit}")
                    # a limit of 0 is falsy and therefore disables the heartbeat check as well
                    if worker_heartbeat_limit:
                        if messenger.is_alive(workSpec, worker_heartbeat_limit):
                            tmp_log.debug(f"heartbeat for workerID={workerID} is valid")
                        else:
                            tmp_log.debug(f"heartbeat for workerID={workerID} expired: sending kill request")
                            self.dbProxy.mark_workers_to_kill_by_workerids([workSpec.workerID])
                            diagMessage = "Killed by Harvester due to worker heartbeat expired. " + diagMessage
                            workSpec.set_pilot_error(PilotErrors.FAILEDBYSERVER, diagMessage)
                    # get work attributes
                    workAttributes = messenger.get_work_attributes(workSpec)
                    retMap[workerID]["workAttributes"] = workAttributes
                    # get output files
                    filesToStageOut = messenger.get_files_to_stage_out(workSpec)
                    retMap[workerID]["filesToStageOut"] = filesToStageOut
                    # get events to update
                    if workSpec.eventsRequest in [WorkSpec.EV_useEvents, WorkSpec.EV_requestEvents]:
                        eventsToUpdate = messenger.events_to_update(workSpec)
                        retMap[workerID]["eventsToUpdate"] = eventsToUpdate
                    # request events
                    if workSpec.eventsRequest == WorkSpec.EV_useEvents:
                        eventsRequestParams = messenger.events_requested(workSpec)
                        retMap[workerID]["eventsRequestParams"] = eventsRequestParams
                    # get PandaIDs for pull model
                    if workSpec.mapType == WorkSpec.MT_NoJob:
                        pandaIDs = messenger.get_panda_ids(workSpec)
                    retMap[workerID]["pandaIDs"] = pandaIDs
                    # keep original new status
                    # "monStatus" records the status before it is possibly downgraded to running/idle below
                    retMap[workerID]["monStatus"] = newStatus
                    # set running or idle while there are events to update or files to stage out
                    if newStatus in [WorkSpec.ST_finished, WorkSpec.ST_failed, WorkSpec.ST_cancelled]:
                        # isOK turns False only when post-processing has to be retried
                        isOK = True
                        if len(retMap[workerID]["filesToStageOut"]) > 0 or len(retMap[workerID]["eventsToUpdate"]) > 0:
                            if workSpec.status == WorkSpec.ST_running:
                                newStatus = WorkSpec.ST_running
                            else:
                                newStatus = WorkSpec.ST_idle
                        elif not workSpec.is_post_processed():
                            if (not queue_config.is_no_heartbeat_status(newStatus) and not queue_config.truePilot) or (
                                hasattr(messenger, "forcePostProcessing") and messenger.forcePostProcessing
                            ):
                                # post processing unless heartbeat is suppressed
                                jobSpecs = self.dbProxy.get_jobs_with_worker_id(workSpec.workerID, None, True, only_running=True, slim=True)
                                # post processing
                                tmpStat = messenger.post_processing(workSpec, jobSpecs, workSpec.mapType)
                                if tmpStat is None:
                                    # retry
                                    # postProcessTimeout is interpreted in minutes; 0 or a missing option disables retries
                                    ppTimeOut = getattr(harvester_config.monitor, "postProcessTimeout", 0)
                                    if ppTimeOut > 0:
                                        timeLimit = core_utils.naive_utcnow() - datetime.timedelta(minutes=ppTimeOut)
                                        # keep retrying until the recorded end time is older than the timeout
                                        if workSpec.endTime is None or workSpec.endTime > timeLimit:
                                            isOK = False
                                            # set end time just in case for timeout
                                            workSpec.set_end_time()
                            if isOK:
                                workSpec.post_processed()
                            # report running/idle for this cycle even when post-processing succeeded
                            if workSpec.status == WorkSpec.ST_running:
                                newStatus = WorkSpec.ST_running
                            else:
                                newStatus = WorkSpec.ST_idle
                        # reset modification time to immediately trigger subsequent lookup
                        if isOK and not self.monitor_fifo.enabled:
                            workSpec.trigger_next_lookup()
                    retMap[workerID]["newStatus"] = newStatus
                    retMap[workerID]["diagMessage"] = diagMessage
                else:
                    tmp_log.debug(f"workerID={workerID} not in retMap")
            return True, retMap
        except Exception:
            # any failure while checking discards the whole result for this batch of workers
            core_utils.dump_error_message(tmp_log)
            return False, None
0761 
0762     # ask plugin for workers to update, get workspecs, and queue the event
0763     def monitor_event_deliverer(self, time_window):
0764         tmpLog = self.make_logger(_logger, f"id=monitor-{self.get_pid()}", method_name="monitor_event_deliverer")
0765         tmpLog.debug("start")
0766         for mon_core in self.eventBasedMonCoreList:
0767             tmpLog.debug(f"run with {mon_core.__class__.__name__}")
0768             worker_update_list = mon_core.report_updated_workers(time_window=time_window)
0769             for workerID, updateTimestamp in worker_update_list:
0770                 retVal = self.monitor_event_fifo.putbyid(id=workerID, item=True, score=updateTimestamp)
0771                 if not retVal:
0772                     retVal = self.monitor_event_fifo.update(id=workerID, score=updateTimestamp, temporary=0, cond_score="gt")
0773                     if retVal:
0774                         tmpLog.debug(f"updated event with workerID={workerID}")
0775                     else:
0776                         tmpLog.debug(f"event with workerID={workerID} is updated. Skipped")
0777                 else:
0778                     tmpLog.debug(f"put event with workerID={workerID}")
0779         tmpLog.debug("done")
0780 
0781     # get events and check workers
0782     def monitor_event_digester(self, locked_by, max_events):
0783         tmpLog = self.make_logger(_logger, f"id=monitor-{self.get_pid()}", method_name="monitor_event_digester")
0784         tmpLog.debug("start")
0785         retMap = {}
0786         try:
0787             obj_gotten_list = self.monitor_event_fifo.getmany(mode="first", count=max_events, protective=True)
0788         except Exception as e:
0789             obj_gotten_list = []
0790             tmpLog.error(f"monitor_event_fifo excepted with {e}")
0791         workerID_list = [obj_gotten.id for obj_gotten in obj_gotten_list]
0792         tmpLog.debug(f"got {len(workerID_list)} worker events")
0793         if len(workerID_list) > 0:
0794             updated_workers_dict = self.dbProxy.get_workers_from_ids(workerID_list)
0795             tmpLog.debug("got workspecs for worker events")
0796             for queueName, _val in updated_workers_dict.items():
0797                 for configID, workSpecsList in _val.items():
0798                     qc_key = (queueName, configID)
0799                     tmpLog.debug("checking workers of queueName={0} configID={1}".format(*qc_key))
0800                     try:
0801                         retVal = self.monitor_agent_core(locked_by, queueName, workSpecsList, from_fifo=True, config_id=configID, check_source="Event")
0802                     except Exception as e:
0803                         tmpLog.error(f"monitor_agent_core excepted with {e}")
0804                         retVal = None  # skip the loop
0805 
0806                     if retVal:
0807                         retMap[qc_key] = retVal
0808         tmpLog.debug("done")
0809         return retMap
0810 
0811     # remove outdated events
0812     def monitor_event_disposer(self, event_lifetime, max_events):
0813         tmpLog = self.make_logger(_logger, f"id=monitor-{self.get_pid()}", method_name="monitor_event_disposer")
0814         tmpLog.debug("start")
0815         timeNow_timestamp = time.time()
0816         try:
0817             obj_gotten_list = self.monitor_event_fifo.getmany(mode="first", maxscore=(timeNow_timestamp - event_lifetime), count=max_events, temporary=True)
0818         except Exception as e:
0819             obj_gotten_list = []
0820             tmpLog.error(f"monitor_event_fifo excepted with {e}")
0821         tmpLog.debug(f"removed {len(obj_gotten_list)} events")
0822         try:
0823             n_events = self.monitor_event_fifo.size()
0824             tmpLog.debug(f"now {n_events} events in monitor-event fifo")
0825         except Exception as e:
0826             tmpLog.error(f"failed to get size of monitor-event fifo: {e}")
0827         tmpLog.debug("done")