import collections
import datetime
import itertools
import random
import time

from pandaharvester.harvesterbody.agent_base import AgentBase
from pandaharvester.harvesterconfig import harvester_config
from pandaharvester.harvestercore import core_utils
from pandaharvester.harvestercore.db_proxy_pool import DBProxyPool as DBProxy
from pandaharvester.harvestercore.fifos import MonitorEventFIFO, MonitorFIFO
from pandaharvester.harvestercore.pilot_errors import PilotErrors
from pandaharvester.harvestercore.plugin_factory import PluginFactory
from pandaharvester.harvestercore.work_spec import WorkSpec
from pandaharvester.harvestermisc.apfmon import Apfmon

# logger
_logger = core_utils.setup_logger("monitor")


class Monitor(AgentBase):
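    """Agent to monitor workers and propagate their state to jobs.

    Workers are checked either in DB cycles, which scan the database directly,
    or in FIFO cycles, where chunks of workers are scheduled in a score-ordered
    FIFO; event-based monitor plugins can additionally report updated workers
    through a dedicated event FIFO.
    """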

    # constructor
    def __init__(self, queue_config_mapper, single_mode=False):
        tmp_log = self.make_logger(_logger, method_name="__init__")
        AgentBase.__init__(self, single_mode)
        self.queueConfigMapper = queue_config_mapper
        self.dbProxy = DBProxy()
        self.pluginFactory = PluginFactory()
        self.startTimestamp = time.time()
        try:
            self.monitor_fifo = MonitorFIFO()
        except Exception:
            tmp_log.error("failed to launch monitor-fifo")
            core_utils.dump_error_message(tmp_log)
            # re-raise: the agent cannot run without the monitor_fifo attribute set
            raise
        if self.monitor_fifo.enabled:
            try:
                self.monitor_event_fifo = MonitorEventFIFO()
            except Exception:
                tmp_log.error("failed to launch monitor-event-fifo")
                core_utils.dump_error_message(tmp_log)
                raise
        else:
            self.monitor_event_fifo = None
        self.apfmon = Apfmon(self.queueConfigMapper)
        self.eventBasedMonCoreList = []
        if getattr(harvester_config.monitor, "eventBasedEnable", False):
            for pluginConf in harvester_config.monitor.eventBasedPlugins:
                pluginFactory = PluginFactory()
                plugin_key = pluginFactory.get_plugin_key(pluginConf)
                try:
                    self.eventBasedMonCoreList.append(pluginFactory.get_plugin(pluginConf))
                except Exception:
                    tmp_log.error(f"failed to launch event-based-monitor plugin of {plugin_key}")
                    core_utils.dump_error_message(tmp_log)

    # main loop
    def run(self):
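        """Run the main loop of the monitor agent.

        Each cycle is either a DB cycle, which pulls workers to check from the
        database at most once per harvester_config.monitor.sleepTime, or, when
        the monitor FIFO is enabled, a FIFO cycle, which dequeues worker chunks
        that are due and re-enqueues the still-active ones for their next check.
        """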
        lockedBy = f"monitor-{self.get_pid()}"
        mainLog = self.make_logger(_logger, f"id={lockedBy}", method_name="run")
        # init messengers
        for queueName, queueConfig in self.queueConfigMapper.get_all_queues().items():
            # just instantiate the plugin here for initialization
            try:
                self.pluginFactory.get_plugin(queueConfig.messenger)
            except Exception:
                mainLog.error(f"failed to launch messenger plugin for {queueName}")
                core_utils.dump_error_message(mainLog)
        # main
        fifoSleepTimeMilli = getattr(harvester_config.monitor, "fifoSleepTimeMilli", 5000)
        fifoCheckDuration = getattr(harvester_config.monitor, "fifoCheckDuration", 30)
        fifoMaxWorkersPerChunk = getattr(harvester_config.monitor, "fifoMaxWorkersPerChunk", 500)
        fifoProtectiveDequeue = getattr(harvester_config.monitor, "fifoProtectiveDequeue", True)
        eventBasedCheckInterval = getattr(harvester_config.monitor, "eventBasedCheckInterval", 300)
        eventBasedTimeWindow = getattr(harvester_config.monitor, "eventBasedTimeWindow", 450)
        eventBasedCheckMaxEvents = getattr(harvester_config.monitor, "eventBasedCheckMaxEvents", 500)
        eventBasedEventLifetime = getattr(harvester_config.monitor, "eventBasedEventLifetime", 1800)
        eventBasedRemoveMaxEvents = getattr(harvester_config.monitor, "eventBasedRemoveMaxEvents", 2000)
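        # the monitor FIFO holds (queueName, workSpecsList) items ordered by a
        # timestamp score (the next check time); items enqueued "to head" get
        # their score shifted by -2**32 so they are always dequeued first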
        last_DB_cycle_timestamp = 0
        last_event_delivery_timestamp = 0
        last_event_digest_timestamp = 0
        last_event_dispose_timestamp = 0
        monitor_fifo = self.monitor_fifo
        sleepTime = (fifoSleepTimeMilli / 1000.0) if monitor_fifo.enabled else harvester_config.monitor.sleepTime
        adjusted_sleepTime = sleepTime
        if monitor_fifo.enabled:
            monitor_fifo.restore()
        while True:
            sw_main = core_utils.get_stopwatch()
            mainLog.debug("start a monitor cycle")
            if time.time() >= last_DB_cycle_timestamp + harvester_config.monitor.sleepTime and not (monitor_fifo.enabled and self.singleMode):
                # run with workers from DB
                sw_db = core_utils.get_stopwatch()
                mainLog.debug("starting run with DB")
                mainLog.debug("getting workers to monitor")
                workSpecsPerQueue = self.dbProxy.get_workers_to_update(
                    harvester_config.monitor.maxWorkers, harvester_config.monitor.checkInterval, harvester_config.monitor.lockInterval, lockedBy
                )
                mainLog.debug(f"got {len(workSpecsPerQueue)} queues")
                # loop over all workers
                for queueName, configIdWorkSpecs in workSpecsPerQueue.items():
                    for configID, workSpecsList in configIdWorkSpecs.items():
                        try:
                            retVal = self.monitor_agent_core(lockedBy, queueName, workSpecsList, config_id=configID, check_source="DB")
                        except Exception as e:
                            mainLog.error(f"monitor_agent_core excepted with {e}")
                            retVal = None
                        # put active workers to FIFO for the next check
                        if monitor_fifo.enabled and retVal is not None:
                            workSpecsToEnqueue, workSpecsToEnqueueToHead, timeNow_timestamp, fifoCheckInterval = retVal
                            if workSpecsToEnqueue:
                                mainLog.debug("putting workers to FIFO")
                                try:
                                    score = fifoCheckInterval + timeNow_timestamp
                                    monitor_fifo.put((queueName, workSpecsToEnqueue), score)
                                    mainLog.info(f"put workers of {queueName} to FIFO with score {score}")
                                except Exception as errStr:
                                    mainLog.error(f"failed to put object to FIFO: {errStr}")
                            if workSpecsToEnqueueToHead:
                                mainLog.debug("putting workers to FIFO head")
                                try:
                                    # shift the score by -2**32 to send the chunk to the FIFO head
                                    score = fifoCheckInterval + timeNow_timestamp - 2**32
                                    monitor_fifo.put((queueName, workSpecsToEnqueueToHead), score)
                                    mainLog.info(f"put workers of {queueName} to FIFO head with score {score}")
                                except Exception as errStr:
                                    mainLog.error(f"failed to put object to FIFO head: {errStr}")
                last_DB_cycle_timestamp = time.time()
                if sw_db.get_elapsed_time_in_sec() > harvester_config.monitor.lockInterval:
                    mainLog.warning("a single DB cycle was longer than lockInterval " + sw_db.get_elapsed_time())
                else:
                    mainLog.debug("done a DB cycle" + sw_db.get_elapsed_time())
                mainLog.debug("ended run with DB")
            elif monitor_fifo.enabled:
                # run with workers from FIFO
                sw = core_utils.get_stopwatch()
                to_run_fifo_check = True
                n_loops = 0
                n_loops_hit = 0
                last_fifo_cycle_timestamp = time.time()
                to_break = False
                obj_dequeued_id_list = []
                obj_to_enqueue_dict = collections.defaultdict(lambda: [[], 0, 0])
                obj_to_enqueue_to_head_dict = collections.defaultdict(lambda: [[], 0, 0])
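                # the enqueue dicts aggregate, per (queueName, configID) key,
                # [accumulated workSpecs, newest timestamp, largest fifoCheckInterval]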
                remaining_obj_to_enqueue_dict = {}
                remaining_obj_to_enqueue_to_head_dict = {}
                n_chunk_peeked_stat, sum_overhead_time_stat = 0, 0.0
                # event-based checking
                if self.monitor_event_fifo.enabled:
                    # decide which event tasks are due in this cycle
                    to_deliver = time.time() >= last_event_delivery_timestamp + eventBasedCheckInterval
                    to_digest = time.time() >= last_event_digest_timestamp + eventBasedCheckInterval / 4
                    to_dispose = time.time() >= last_event_dispose_timestamp + eventBasedCheckInterval / 2
                    if to_deliver:
                        # deliver events of worker update
                        got_lock = self.dbProxy.get_process_lock("monitor_event_deliverer", lockedBy, eventBasedCheckInterval)
                        if got_lock:
                            self.monitor_event_deliverer(time_window=eventBasedTimeWindow)
                        else:
                            mainLog.debug("did not get lock. Skip monitor_event_deliverer")
                        last_event_delivery_timestamp = time.time()
                    if to_digest:
                        # digest events of worker update
                        to_run_fifo_check = False
                        retMap = self.monitor_event_digester(locked_by=lockedBy, max_events=eventBasedCheckMaxEvents)
                        for qc_key, retVal in retMap.items():
                            workSpecsToEnqueue, workSpecsToEnqueueToHead, timeNow_timestamp, fifoCheckInterval = retVal
                            # only enqueue post-processing workers to the FIFO head
                            obj_to_enqueue_to_head_dict[qc_key][0].extend(workSpecsToEnqueueToHead)
                            obj_to_enqueue_to_head_dict[qc_key][1] = max(obj_to_enqueue_to_head_dict[qc_key][1], timeNow_timestamp)
                            obj_to_enqueue_to_head_dict[qc_key][2] = max(obj_to_enqueue_to_head_dict[qc_key][2], fifoCheckInterval)
                        last_event_digest_timestamp = time.time()
                    if to_dispose:
                        # dispose of outdated events of worker update
                        self.monitor_event_disposer(event_lifetime=eventBasedEventLifetime, max_events=eventBasedRemoveMaxEvents)
                        last_event_dispose_timestamp = time.time()
                if to_run_fifo_check:
                    # check workers from FIFO
                    while time.time() < last_fifo_cycle_timestamp + fifoCheckDuration:
                        sw.reset()
                        n_loops += 1
                        try:
                            retVal, overhead_time = monitor_fifo.to_check_workers()
                        except Exception as e:
                            # retry; without this, retVal and overhead_time would be unbound below
                            mainLog.error(f"failed to check workers from FIFO: {e}")
                            time.sleep(2)
                            continue
                        if overhead_time is not None:
                            n_chunk_peeked_stat += 1
                            sum_overhead_time_stat += overhead_time
                        if retVal:
                            # check fifo size
                            try:
                                fifo_size = monitor_fifo.size()
                                mainLog.debug(f"FIFO size is {fifo_size}")
                            except Exception as e:
                                mainLog.error(f"failed to get size of FIFO: {e}")
                                time.sleep(2)
                                continue
                            mainLog.debug("starting run with FIFO")
                            try:
                                obj_gotten = monitor_fifo.get(timeout=1, protective=fifoProtectiveDequeue)
                            except Exception as errStr:
                                mainLog.error(f"failed to get object from FIFO: {errStr}")
                                time.sleep(2)
                                continue
                            else:
                                if obj_gotten is not None:
                                    sw_fifo = core_utils.get_stopwatch()
                                    if fifoProtectiveDequeue:
                                        obj_dequeued_id_list.append(obj_gotten.id)
                                    queueName, workSpecsList = obj_gotten.item
                                    mainLog.debug(f"got a chunk of {len(workSpecsList)} workers of {queueName} from FIFO" + sw.get_elapsed_time())
                                    sw.reset()
                                    configID = None
                                    for workSpecs in workSpecsList:
                                        if configID is None and len(workSpecs) > 0:
                                            configID = workSpecs[0].configID
                                        for workSpec in workSpecs:
                                            if workSpec.pandaid_list is None:
                                                _jobspec_list = workSpec.get_jobspec_list()
                                                if _jobspec_list is not None:
                                                    workSpec.pandaid_list = [j.PandaID for j in _jobspec_list]
                                                else:
                                                    workSpec.pandaid_list = []
                                                workSpec.force_update("pandaid_list")
                                    try:
                                        retVal = self.monitor_agent_core(
                                            lockedBy, queueName, workSpecsList, from_fifo=True, config_id=configID, check_source="FIFO"
                                        )
                                    except Exception as e:
                                        mainLog.error(f"monitor_agent_core excepted with {e}")
                                        retVal = None
                                    # aggregate workers to enqueue
                                    if retVal is not None:
                                        workSpecsToEnqueue, workSpecsToEnqueueToHead, timeNow_timestamp, fifoCheckInterval = retVal
                                        qc_key = (queueName, configID)
                                        try:
                                            if len(obj_to_enqueue_dict[qc_key][0]) + len(workSpecsToEnqueue) <= fifoMaxWorkersPerChunk:
                                                obj_to_enqueue_dict[qc_key][0].extend(workSpecsToEnqueue)
                                                obj_to_enqueue_dict[qc_key][1] = max(obj_to_enqueue_dict[qc_key][1], timeNow_timestamp)
                                                obj_to_enqueue_dict[qc_key][2] = max(obj_to_enqueue_dict[qc_key][2], fifoCheckInterval)
                                            else:
                                                to_break = True
                                                remaining_obj_to_enqueue_dict[qc_key] = [workSpecsToEnqueue, timeNow_timestamp, fifoCheckInterval]
                                        except Exception as errStr:
                                            mainLog.error(f"failed to gather workers for FIFO: {errStr}")
                                            to_break = True
                                        try:
                                            if len(obj_to_enqueue_to_head_dict[qc_key][0]) + len(workSpecsToEnqueueToHead) <= fifoMaxWorkersPerChunk:
                                                obj_to_enqueue_to_head_dict[qc_key][0].extend(workSpecsToEnqueueToHead)
                                                obj_to_enqueue_to_head_dict[qc_key][1] = max(obj_to_enqueue_to_head_dict[qc_key][1], timeNow_timestamp)
                                                obj_to_enqueue_to_head_dict[qc_key][2] = max(obj_to_enqueue_to_head_dict[qc_key][2], fifoCheckInterval)
                                            else:
                                                to_break = True
                                                remaining_obj_to_enqueue_to_head_dict[qc_key] = [workSpecsToEnqueueToHead, timeNow_timestamp, fifoCheckInterval]
                                        except Exception as errStr:
                                            mainLog.error(f"failed to gather workers for FIFO head: {errStr}")
                                            to_break = True
                                        mainLog.debug(f"checked {len(workSpecsList)} workers from FIFO" + sw.get_elapsed_time())
                                    else:
                                        mainLog.debug("monitor_agent_core returned None. Skipped putting to FIFO")
                                    if sw_fifo.get_elapsed_time_in_sec() > harvester_config.monitor.lockInterval:
                                        mainLog.warning("a single FIFO cycle was longer than lockInterval " + sw_fifo.get_elapsed_time())
                                    else:
                                        mainLog.debug("done a FIFO cycle" + sw_fifo.get_elapsed_time())
                                    n_loops_hit += 1
                                    if to_break:
                                        break
                                else:
                                    mainLog.debug("got nothing in FIFO")
                        else:
                            mainLog.debug("workers in FIFO too young to check. Skipped")
                            if self.singleMode:
                                break
                            if overhead_time is not None:
                                time.sleep(max(-overhead_time * random.uniform(0.1, 1), adjusted_sleepTime))
                            else:
                                time.sleep(max(fifoCheckDuration * random.uniform(0.1, 1), adjusted_sleepTime))
                    mainLog.debug(f"run {n_loops} loops, including {n_loops_hit} FIFO cycles")

                # enqueue gathered workers to FIFO
                sw.reset()
                n_chunk_put = 0
                mainLog.debug("putting worker chunks to FIFO")
                for _dct in (obj_to_enqueue_dict, remaining_obj_to_enqueue_dict):
                    for (queueName, configID), obj_to_enqueue in _dct.items():
                        try:
                            workSpecsToEnqueue, timeNow_timestamp, fifoCheckInterval = obj_to_enqueue
                            if workSpecsToEnqueue:
                                score = fifoCheckInterval + timeNow_timestamp
                                monitor_fifo.put((queueName, workSpecsToEnqueue), score)
                                n_chunk_put += 1
                                mainLog.info(f"put a chunk of {len(workSpecsToEnqueue)} workers of {queueName} to FIFO with score {score}")
                        except Exception as errStr:
                            mainLog.error(f"failed to put object to FIFO: {errStr}")
                mainLog.debug("putting worker chunks to FIFO head")
                for _dct in (obj_to_enqueue_to_head_dict, remaining_obj_to_enqueue_to_head_dict):
                    for (queueName, configID), obj_to_enqueue_to_head in _dct.items():
                        try:
                            workSpecsToEnqueueToHead, timeNow_timestamp, fifoCheckInterval = obj_to_enqueue_to_head
                            if workSpecsToEnqueueToHead:
                                # shift the score by -2**32 so these chunks are dequeued first
                                score = fifoCheckInterval + timeNow_timestamp - 2**32
                                monitor_fifo.put((queueName, workSpecsToEnqueueToHead), score)
                                n_chunk_put += 1
                                mainLog.info(f"put a chunk of {len(workSpecsToEnqueueToHead)} workers of {queueName} to FIFO head with score {score}")
                        except Exception as errStr:
                            mainLog.error(f"failed to put object to FIFO head: {errStr}")
                # delete protectively dequeued objects
                if fifoProtectiveDequeue and len(obj_dequeued_id_list) > 0:
                    try:
                        monitor_fifo.delete(ids=obj_dequeued_id_list)
                    except Exception as e:
                        mainLog.error(f"failed to delete object from FIFO: {e}")
                mainLog.debug(f"put {n_chunk_put} worker chunks into FIFO" + sw.get_elapsed_time())

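                # adapt the sleep between FIFO cycles: accumulated peek overhead means
                # checks are running late, so shorten the sleep; otherwise relax it
                # back toward the configured sleepTime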
                if n_chunk_peeked_stat > 0 and sum_overhead_time_stat > sleepTime:
                    speedup_factor = (sum_overhead_time_stat - sleepTime) / (n_chunk_peeked_stat * harvester_config.monitor.checkInterval)
                    speedup_factor = max(speedup_factor, 0)
                    adjusted_sleepTime = adjusted_sleepTime / (1.0 + speedup_factor)
                elif n_chunk_peeked_stat == 0 or sum_overhead_time_stat < 0:
                    adjusted_sleepTime = (sleepTime + adjusted_sleepTime) / 2
                mainLog.debug(f"adjusted_sleepTime becomes {adjusted_sleepTime:.3f} sec")
                # end run with FIFO
                mainLog.debug("ended run with FIFO")
            # time the cycle
            mainLog.debug("done a monitor cycle" + sw_main.get_elapsed_time())
            # check if being terminated
            if self.terminated(adjusted_sleepTime):
                mainLog.debug("terminated")
                return

    # core of the monitor agent to check workers in workSpecsList of queueName
    def monitor_agent_core(self, lockedBy, queueName, workSpecsList, from_fifo=False, config_id=None, check_source=None):
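        """Check the workers in workSpecsList of one queue and update jobs/workers.

        Returns (workSpecsToEnqueue, workSpecsToEnqueueToHead, timeNow_timestamp,
        fifoCheckInterval) to be used for FIFO scheduling, or None when the queue
        configuration is not found.
        """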
        tmpQueLog = self.make_logger(_logger, f"id={lockedBy} queue={queueName}", method_name="run")
        # check queue
        if not self.queueConfigMapper.has_queue(queueName, config_id):
            tmpQueLog.error("config not found")
            return None
        # get queue configuration
        queueConfig = self.queueConfigMapper.get_queue(queueName, config_id)
        # get plugins
        monCore = self.pluginFactory.get_plugin(queueConfig.monitor)
        messenger = self.pluginFactory.get_plugin(queueConfig.messenger)
        # workspec chunks of active workers
        workSpecsToEnqueue_dict = {}
        workSpecsToEnqueueToHead_dict = {}
        timeNow_timestamp = time.time()
        # get fifoCheckInterval for the queue and other FIFO attributes
        try:
            fifoCheckInterval = monCore.fifoCheckInterval
        except Exception:
            if hasattr(harvester_config.monitor, "fifoCheckInterval"):
                fifoCheckInterval = harvester_config.monitor.fifoCheckInterval
            else:
                fifoCheckInterval = harvester_config.monitor.checkInterval
        try:
            forceEnqueueInterval = harvester_config.monitor.fifoForceEnqueueInterval
        except AttributeError:
            forceEnqueueInterval = 3600
        try:
            fifoMaxPreemptInterval = harvester_config.monitor.fifoMaxPreemptInterval
        except AttributeError:
            fifoMaxPreemptInterval = 60
        # check workers
        allWorkers = [item for sublist in workSpecsList for item in sublist]
        tmpQueLog.debug(f"checking {len(allWorkers)} workers")
        tmpStat, tmpRetMap = self.check_workers(monCore, messenger, allWorkers, queueConfig, tmpQueLog, from_fifo)
        if tmpStat:
            # loop over all worker chunks
            tmpQueLog.debug("update jobs and workers")
            iWorker = 0
            for workSpecs in workSpecsList:
                jobSpecs = None
                pandaIDsList = []
                eventsToUpdateList = []
                filesToStageOutList = dict()
                isCheckedList = []
                mapType = workSpecs[0].mapType
                # loop over workSpecs
                for workSpec in workSpecs:
                    tmpLog = self.make_logger(_logger, f"id={lockedBy} workerID={workSpec.workerID} from={check_source}", method_name="run")
                    tmpOut = tmpRetMap[workSpec.workerID]
                    oldStatus = tmpOut["oldStatus"]
                    newStatus = tmpOut["newStatus"]
                    monStatus = tmpOut["monStatus"]
                    diagMessage = tmpOut["diagMessage"]
                    workAttributes = tmpOut["workAttributes"]
                    eventsToUpdate = tmpOut["eventsToUpdate"]
                    filesToStageOut = tmpOut["filesToStageOut"]
                    eventsRequestParams = tmpOut["eventsRequestParams"]
                    nJobsToReFill = tmpOut["nJobsToReFill"]
                    pandaIDs = tmpOut["pandaIDs"]
                    isChecked = tmpOut["isChecked"]
                    tmpLog.debug(
                        f"newStatus={newStatus} monitoredStatus={monStatus} diag={diagMessage} "
                        f"postProcessed={workSpec.is_post_processed()} files={str(filesToStageOut)}"
                    )
                    iWorker += 1
                    # check status
                    if newStatus not in WorkSpec.ST_LIST:
                        tmpLog.error(f"unknown status={newStatus}")
                        return None
                    # update worker
                    workSpec.set_status(newStatus)
                    workSpec.set_work_attributes(workAttributes)
                    workSpec.set_dialog_message(diagMessage)
                    if isChecked:
                        workSpec.checkTime = core_utils.naive_utcnow()
                    isCheckedList.append(isChecked)
                    if monStatus == WorkSpec.ST_failed:
                        if not workSpec.has_pilot_error() and workSpec.errorCode is None:
                            workSpec.set_pilot_error(PilotErrors.GENERALERROR, diagMessage)
                    elif monStatus == WorkSpec.ST_cancelled:
                        if not workSpec.has_pilot_error() and workSpec.errorCode is None:
                            workSpec.set_pilot_error(PilotErrors.PANDAKILL, diagMessage)
                    if monStatus in [WorkSpec.ST_finished, WorkSpec.ST_failed, WorkSpec.ST_cancelled]:
                        workSpec.set_work_params({"finalMonStatus": monStatus})
                    # request events
                    if eventsRequestParams != {}:
                        workSpec.eventsRequest = WorkSpec.EV_requestEvents
                        workSpec.eventsRequestParams = eventsRequestParams
                    # request jobs
                    if nJobsToReFill is not None:
                        workSpec.nJobsToReFill = nJobsToReFill
                    # get associated jobs for the worker chunk
                    if workSpec.hasJob == 1 and jobSpecs is None:
                        jobSpecs = self.dbProxy.get_jobs_with_worker_id(workSpec.workerID, None, only_running=True, slim=True)
                    # pandaIDs for push
                    pandaIDsList.append(pandaIDs)
                    if len(eventsToUpdate) > 0:
                        eventsToUpdateList.append(eventsToUpdate)
                    if len(filesToStageOut) > 0:
                        filesToStageOutList[workSpec.workerID] = filesToStageOut
                    # apfmon status update
                    if newStatus != oldStatus:
                        tmpQueLog.debug(f"newStatus: {newStatus} monStatus: {monStatus} oldStatus: {oldStatus} workSpecStatus: {workSpec.status}")
                        self.apfmon.update_worker(workSpec, monStatus)

                # lock workers for FIFO
                if from_fifo:
                    # collect some attributes to be updated when workers are locked
                    worker_id_list = dict()
                    for workSpec, isChecked in zip(workSpecs, isCheckedList):
                        attrs = dict()
                        if isChecked:
                            attrs["checkTime"] = workSpec.checkTime
                            workSpec.force_not_update("checkTime")
                        if workSpec.has_updated_attributes():
                            attrs["lockedBy"] = lockedBy
                            workSpec.lockedBy = lockedBy
                            workSpec.force_not_update("lockedBy")
                        else:
                            attrs["lockedBy"] = None
                        worker_id_list[workSpec.workerID] = attrs
                    temRetLockWorker = self.dbProxy.lock_workers(worker_id_list, harvester_config.monitor.lockInterval)
                    # skip if not locked
                    if not temRetLockWorker:
                        continue
                # update jobs with workers
                if jobSpecs is not None and len(jobSpecs) > 0:
                    tmpQueLog.debug(f"updating {len(jobSpecs)} jobs with {len(workSpecs)} workers")
                    core_utils.update_job_attributes_with_workers(mapType, jobSpecs, workSpecs, filesToStageOutList, eventsToUpdateList)
                # update jobs and workers in the local database
                tmpRet = self.dbProxy.update_jobs_workers(jobSpecs, workSpecs, lockedBy, pandaIDsList)
                if not tmpRet:
                    for workSpec in workSpecs:
                        tmpLog = self.make_logger(_logger, f"id={lockedBy} workerID={workSpec.workerID}", method_name="run")
                        if from_fifo:
                            tmpLog.info("failed to update the DB. Maybe locked by other thread running with DB")
                        else:
                            if workSpec.status in [WorkSpec.ST_finished, WorkSpec.ST_failed, WorkSpec.ST_cancelled, WorkSpec.ST_missed]:
                                tmpLog.info("worker already in final status. Skipped")
                            else:
                                tmpLog.error("failed to update the DB. lockInterval may be too short")
                else:
                    if jobSpecs is not None:
                        for jobSpec in jobSpecs:
                            tmpLog = self.make_logger(_logger, f"id={lockedBy} PandaID={jobSpec.PandaID}", method_name="run")
                            tmpLog.debug(
                                f"new status={jobSpec.status} subStatus={jobSpec.subStatus} status_in_metadata={jobSpec.get_job_status_from_attributes()}"
                            )
                # send ACK to workers for events and files
                if len(eventsToUpdateList) > 0 or len(filesToStageOutList) > 0:
                    for workSpec in workSpecs:
                        try:
                            messenger.acknowledge_events_files(workSpec)
                        except Exception:
                            core_utils.dump_error_message(tmpQueLog)
                            tmpQueLog.error(f"failed to send ACK to workerID={workSpec.workerID}")

                # keep active workers scheduled in the FIFO
                if self.monitor_fifo.enabled and workSpecs:
                    workSpec = workSpecs[0]
                    tmpOut = tmpRetMap[workSpec.workerID]
                    newStatus = tmpOut["newStatus"]
                    monStatus = tmpOut["monStatus"]
                    if (
                        newStatus in [WorkSpec.ST_submitted, WorkSpec.ST_running, WorkSpec.ST_idle]
                        and workSpec.mapType != WorkSpec.MT_MultiWorkers
                        and workSpec.workAttributes is not None
                    ):
                        timeNow = core_utils.naive_utcnow()
                        timeNow_timestamp = time.time()
                        # get lastCheckAt
                        _bool, lastCheckAt = workSpec.get_work_params("lastCheckAt")
                        try:
                            last_check_period = timeNow_timestamp - lastCheckAt
                        except TypeError:
                            last_check_period = forceEnqueueInterval + 1.0
                        # get lastForceEnqueueAt
                        _bool, lastForceEnqueueAt = workSpec.get_work_params("lastForceEnqueueAt")
                        if not (_bool and lastForceEnqueueAt is not None):
                            lastForceEnqueueAt = 0
                        intolerable_delay = max(forceEnqueueInterval * 2, harvester_config.monitor.checkInterval * 4)
                        if (
                            _bool
                            and lastCheckAt is not None
                            and last_check_period > harvester_config.monitor.checkInterval
                            and timeNow_timestamp - harvester_config.monitor.checkInterval > self.startTimestamp
                        ):
                            if last_check_period > intolerable_delay:
                                tmpQueLog.error(
                                    f"last check period of workerID={workSpec.workerID} is {last_check_period} sec, "
                                    "intolerably longer than monitor checkInterval. Will NOT enqueue the worker by force. "
                                    "Please check why the monitor checks workers slowly"
                                )
                            else:
                                tmpQueLog.warning(
                                    f"last check period of workerID={workSpec.workerID} is {last_check_period} sec, longer than monitor checkInterval"
                                )
                        # preparation to enqueue to FIFO
                        if (from_fifo) or (
                            not from_fifo
                            and timeNow_timestamp - harvester_config.monitor.sleepTime > self.startTimestamp
                            and last_check_period > forceEnqueueInterval
                            and last_check_period < intolerable_delay
                            and timeNow_timestamp - lastForceEnqueueAt > 86400 + forceEnqueueInterval
                        ):
                            if not from_fifo:
                                # in DB cycle: the worker missed its FIFO check, so enqueue it by force
                                tmpQueLog.warning(
                                    f"last check period of workerID={workSpec.workerID} is {last_check_period} sec, "
                                    "longer than monitor forceEnqueueInterval. Enqueue the worker by force"
                                )
                                workSpec.set_work_params({"lastForceEnqueueAt": timeNow_timestamp})
                            workSpec.set_work_params({"lastCheckAt": timeNow_timestamp})
                            workSpec.lockedBy = None
                            workSpec.force_update("lockedBy")
                            if monStatus in [WorkSpec.ST_finished, WorkSpec.ST_failed, WorkSpec.ST_cancelled]:
                                # worker in post-processing: enqueue to the FIFO head for preemptive checks
                                _bool, startFifoPreemptAt = workSpec.get_work_params("startFifoPreemptAt")
                                if not _bool or startFifoPreemptAt is None:
                                    startFifoPreemptAt = timeNow_timestamp
                                    workSpec.set_work_params({"startFifoPreemptAt": startFifoPreemptAt})
                                tmpQueLog.debug(f"workerID={workSpec.workerID} , startFifoPreemptAt: {startFifoPreemptAt}")
                                if timeNow_timestamp - startFifoPreemptAt < fifoMaxPreemptInterval:
                                    workSpecsToEnqueueToHead_dict[workSpec.workerID] = workSpecs
                                else:
                                    workSpec.set_work_params({"startFifoPreemptAt": timeNow_timestamp})
                                    workSpec.modificationTime = timeNow
                                    workSpec.force_update("modificationTime")
                                    workSpecsToEnqueue_dict[workSpec.workerID] = workSpecs
                            else:
                                workSpec.modificationTime = timeNow
                                workSpec.force_update("modificationTime")
                                workSpecsToEnqueue_dict[workSpec.workerID] = workSpecs
        else:
            tmpQueLog.error("failed to check workers")
        workSpecsToEnqueue = list(workSpecsToEnqueue_dict.values())
        workSpecsToEnqueueToHead = list(workSpecsToEnqueueToHead_dict.values())
        retVal = workSpecsToEnqueue, workSpecsToEnqueueToHead, timeNow_timestamp, fifoCheckInterval
        tmpQueLog.debug("done")
        return retVal

    # wrapper for checking workers
    def check_workers(self, mon_core, messenger, all_workers, queue_config, tmp_log, from_fifo):
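        """Check worker status through the monitor plugin and the messenger.

        Workers already carrying a finalMonStatus work param skip the plugin and
        go straight to post-processing. Returns (True, retMap) with one entry per
        workerID, or (False, None) when checking failed with an exception.
        """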
        # check timeout value
        try:
            checkTimeout = mon_core.checkTimeout
        except Exception:
            try:
                checkTimeout = harvester_config.monitor.checkTimeout
            except Exception:
                checkTimeout = None
        # time limit for workers stuck in submitted status
        try:
            workerQueueTimeLimit = harvester_config.monitor.workerQueueTimeLimit
        except AttributeError:
            workerQueueTimeLimit = 172800
        workersToCheck = []
        thingsToPostProcess = []
        retMap = dict()
        for workSpec in all_workers:
            eventsRequestParams = {}
            eventsToUpdate = []
            pandaIDs = []
            workStatus = None
            workAttributes = None
            filesToStageOut = []
            nJobsToReFill = None
            if workSpec.has_work_params("finalMonStatus"):
                # to post-process
                _bool, finalMonStatus = workSpec.get_work_params("finalMonStatus")
                _thing = (workSpec, (finalMonStatus, ""))
                thingsToPostProcess.append(_thing)
            else:
                # job-level late binding
                if workSpec.hasJob == 0 and workSpec.mapType != WorkSpec.MT_NoJob:
                    # check if a job is requested
                    jobRequested = messenger.job_requested(workSpec)
                    if jobRequested:
                        # set ready when a job is requested
                        workStatus = WorkSpec.ST_ready
                    else:
                        workStatus = workSpec.status
                elif workSpec.nJobsToReFill in [0, None]:
                    # check if jobs are requested to refill free slots
                    jobRequested = messenger.job_requested(workSpec)
                    if jobRequested:
                        nJobsToReFill = jobRequested
                    workersToCheck.append(workSpec)
                else:
                    workersToCheck.append(workSpec)
            # initialize the return map entry
            retMap[workSpec.workerID] = {
                "oldStatus": workSpec.status,
                "newStatus": workStatus,
                "monStatus": workStatus,
                "workAttributes": workAttributes,
                "filesToStageOut": filesToStageOut,
                "eventsRequestParams": eventsRequestParams,
                "eventsToUpdate": eventsToUpdate,
                "diagMessage": "",
                "pandaIDs": pandaIDs,
                "nJobsToReFill": nJobsToReFill,
                "isChecked": True,
            }
        # check workers with the monitor plugin
        tmp_log.debug("checking workers with plugin")
        try:
            if workersToCheck:
                tmpStat, tmpOut = mon_core.check_workers(workersToCheck)
                if not tmpStat:
                    tmp_log.error(f"failed to check workers with: {tmpOut}")
                    workersToCheck = []
                    tmpOut = []
                else:
                    tmp_log.debug("checked")
            else:
                tmp_log.debug("Nothing to be checked with plugin")
                tmpOut = []
            timeNow = core_utils.naive_utcnow()
            for workSpec, (newStatus, diagMessage) in itertools.chain(zip(workersToCheck, tmpOut), thingsToPostProcess):
                workerID = workSpec.workerID
                tmp_log.debug(f"Going to check workerID={workerID}")
                pandaIDs = []
                if workerID in retMap:
                    # failed to check status
                    if newStatus is None:
                        tmp_log.warning(f"Failed to check workerID={workerID} with {diagMessage}")
                        retMap[workerID]["isChecked"] = False
                        # kill the worker when checks keep failing beyond checkTimeout
                        if (
                            workSpec.checkTime is not None
                            and checkTimeout is not None
                            and timeNow - workSpec.checkTime > datetime.timedelta(seconds=checkTimeout)
                        ):
                            tmp_log.debug(f"kill workerID={workerID} due to consecutive check failures")
                            self.dbProxy.mark_workers_to_kill_by_workerids([workSpec.workerID])
                            newStatus = WorkSpec.ST_cancelled
                            diagMessage = "Killed by Harvester due to consecutive worker check failures. " + diagMessage
                            workSpec.set_pilot_error(PilotErrors.FAILEDBYSERVER, diagMessage)
                        else:
                            # keep the original status
                            newStatus = workSpec.status
                    # request kill
                    if messenger.kill_requested(workSpec):
                        tmp_log.debug(f"kill workerID={workerID} as requested")
                        self.dbProxy.mark_workers_to_kill_by_workerids([workSpec.workerID])
                    # stuck queuing for too long
                    if workSpec.status == WorkSpec.ST_submitted and timeNow > workSpec.submitTime + datetime.timedelta(seconds=workerQueueTimeLimit):
                        tmp_log.debug(f"kill workerID={workerID} due to queuing longer than {workerQueueTimeLimit} seconds")
                        self.dbProxy.mark_workers_to_kill_by_workerids([workSpec.workerID])
                        diagMessage = "Killed by Harvester due to worker queuing too long. " + diagMessage
                        workSpec.set_pilot_error(PilotErrors.FAILEDBYSERVER, diagMessage)
                        # set closed
                        workSpec.set_pilot_closed()
                    # expired heartbeat, only when requested in the configuration
                    try:
                        # check if the queue configuration requires checking of worker heartbeats
                        worker_heartbeat_limit = int(queue_config.messenger["worker_heartbeat"])
                    except (AttributeError, KeyError):
                        worker_heartbeat_limit = None
                    tmp_log.debug(f"workerID={workerID} heartbeat limit is configured to {worker_heartbeat_limit}")
                    if worker_heartbeat_limit:
                        if messenger.is_alive(workSpec, worker_heartbeat_limit):
                            tmp_log.debug(f"heartbeat for workerID={workerID} is valid")
                        else:
                            tmp_log.debug(f"heartbeat for workerID={workerID} expired: sending kill request")
                            self.dbProxy.mark_workers_to_kill_by_workerids([workSpec.workerID])
                            diagMessage = "Killed by Harvester due to worker heartbeat expired. " + diagMessage
                            workSpec.set_pilot_error(PilotErrors.FAILEDBYSERVER, diagMessage)
                    # get work attributes
                    workAttributes = messenger.get_work_attributes(workSpec)
                    retMap[workerID]["workAttributes"] = workAttributes
                    # get output files
                    filesToStageOut = messenger.get_files_to_stage_out(workSpec)
                    retMap[workerID]["filesToStageOut"] = filesToStageOut
                    # get events to update
                    if workSpec.eventsRequest in [WorkSpec.EV_useEvents, WorkSpec.EV_requestEvents]:
                        eventsToUpdate = messenger.events_to_update(workSpec)
                        retMap[workerID]["eventsToUpdate"] = eventsToUpdate
                    # request events
                    if workSpec.eventsRequest == WorkSpec.EV_useEvents:
                        eventsRequestParams = messenger.events_requested(workSpec)
                        retMap[workerID]["eventsRequestParams"] = eventsRequestParams
                    # get PandaIDs for pull model
                    if workSpec.mapType == WorkSpec.MT_NoJob:
                        pandaIDs = messenger.get_panda_ids(workSpec)
                    retMap[workerID]["pandaIDs"] = pandaIDs
                    # keep the status reported by the monitor
                    retMap[workerID]["monStatus"] = newStatus
                    # final status handling
                    if newStatus in [WorkSpec.ST_finished, WorkSpec.ST_failed, WorkSpec.ST_cancelled]:
                        isOK = True
                        # do not move the worker to a final status while staging out or updating events
                        if len(retMap[workerID]["filesToStageOut"]) > 0 or len(retMap[workerID]["eventsToUpdate"]) > 0:
                            if workSpec.status == WorkSpec.ST_running:
                                newStatus = WorkSpec.ST_running
                            else:
                                newStatus = WorkSpec.ST_idle
                        elif not workSpec.is_post_processed():
                            if (not queue_config.is_no_heartbeat_status(newStatus) and not queue_config.truePilot) or (
                                hasattr(messenger, "forcePostProcessing") and messenger.forcePostProcessing
                            ):
                                # get jobs for post-processing, unless heartbeat is suppressed
                                jobSpecs = self.dbProxy.get_jobs_with_worker_id(workSpec.workerID, None, True, only_running=True, slim=True)
                                # post-processing
                                tmpStat = messenger.post_processing(workSpec, jobSpecs, workSpec.mapType)
                                if tmpStat is None:
                                    # retry post-processing until the timeout
                                    ppTimeOut = getattr(harvester_config.monitor, "postProcessTimeout", 0)
                                    if ppTimeOut > 0:
                                        timeLimit = core_utils.naive_utcnow() - datetime.timedelta(minutes=ppTimeOut)
                                        if workSpec.endTime is None or workSpec.endTime > timeLimit:
                                            isOK = False
                                            # set the end time just in case, for the timeout
                                            workSpec.set_end_time()
                            if isOK:
                                workSpec.post_processed()
                                if workSpec.status == WorkSpec.ST_running:
                                    newStatus = WorkSpec.ST_running
                                else:
                                    newStatus = WorkSpec.ST_idle
                        # trigger the next lookup
                        if isOK and not self.monitor_fifo.enabled:
                            workSpec.trigger_next_lookup()
                    retMap[workerID]["newStatus"] = newStatus
                    retMap[workerID]["diagMessage"] = diagMessage
                else:
                    tmp_log.debug(f"workerID={workerID} not in retMap")
            return True, retMap
        except Exception:
            core_utils.dump_error_message(tmp_log)
            return False, None

    def monitor_event_deliverer(self, time_window):
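        """Deliver worker-update events: ask each event-based monitor plugin for
        workers updated within time_window and put/update them in the event FIFO."""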
        tmpLog = self.make_logger(_logger, f"id=monitor-{self.get_pid()}", method_name="monitor_event_deliverer")
        tmpLog.debug("start")
        for mon_core in self.eventBasedMonCoreList:
            tmpLog.debug(f"run with {mon_core.__class__.__name__}")
            worker_update_list = mon_core.report_updated_workers(time_window=time_window)
            for workerID, updateTimestamp in worker_update_list:
                retVal = self.monitor_event_fifo.putbyid(id=workerID, item=True, score=updateTimestamp)
                if not retVal:
                    retVal = self.monitor_event_fifo.update(id=workerID, score=updateTimestamp, temporary=0, cond_score="gt")
                    if retVal:
                        tmpLog.debug(f"updated event with workerID={workerID}")
                    else:
                        tmpLog.debug(f"event with workerID={workerID} is already up-to-date. Skipped")
                else:
                    tmpLog.debug(f"put event with workerID={workerID}")
        tmpLog.debug("done")

    def monitor_event_digester(self, locked_by, max_events):
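        """Digest queued worker-update events: dequeue up to max_events worker IDs,
        fetch their workspecs, and run monitor_agent_core on them per queue.

        Returns a map of (queueName, configID) to the monitor_agent_core result.
        """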
        tmpLog = self.make_logger(_logger, f"id=monitor-{self.get_pid()}", method_name="monitor_event_digester")
        tmpLog.debug("start")
        retMap = {}
        try:
            obj_gotten_list = self.monitor_event_fifo.getmany(mode="first", count=max_events, protective=True)
        except Exception as e:
            obj_gotten_list = []
            tmpLog.error(f"monitor_event_fifo excepted with {e}")
        workerID_list = [obj_gotten.id for obj_gotten in obj_gotten_list]
        tmpLog.debug(f"got {len(workerID_list)} worker events")
        if len(workerID_list) > 0:
            updated_workers_dict = self.dbProxy.get_workers_from_ids(workerID_list)
            tmpLog.debug("got workspecs for worker events")
            for queueName, _val in updated_workers_dict.items():
                for configID, workSpecsList in _val.items():
                    qc_key = (queueName, configID)
                    tmpLog.debug("checking workers of queueName={0} configID={1}".format(*qc_key))
                    try:
                        retVal = self.monitor_agent_core(locked_by, queueName, workSpecsList, from_fifo=True, config_id=configID, check_source="Event")
                    except Exception as e:
                        tmpLog.error(f"monitor_agent_core excepted with {e}")
                        retVal = None
                    if retVal:
                        retMap[qc_key] = retVal
        tmpLog.debug("done")
        return retMap

    def monitor_event_disposer(self, event_lifetime, max_events):
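        """Dispose of stale worker-update events: drop up to max_events entries
        whose score (update timestamp) is older than event_lifetime seconds."""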
        tmpLog = self.make_logger(_logger, f"id=monitor-{self.get_pid()}", method_name="monitor_event_disposer")
        tmpLog.debug("start")
        timeNow_timestamp = time.time()
        try:
            obj_gotten_list = self.monitor_event_fifo.getmany(mode="first", maxscore=(timeNow_timestamp - event_lifetime), count=max_events, temporary=True)
        except Exception as e:
            obj_gotten_list = []
            tmpLog.error(f"monitor_event_fifo excepted with {e}")
        tmpLog.debug(f"removed {len(obj_gotten_list)} events")
        try:
            n_events = self.monitor_event_fifo.size()
            tmpLog.debug(f"now {n_events} events in monitor-event fifo")
        except Exception as e:
            tmpLog.error(f"failed to get size of monitor-event fifo: {e}")
        tmpLog.debug("done")
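

# Minimal usage sketch (illustrative only, not part of the agent): the harvester
# master process normally instantiates and runs this agent. Assuming a working
# harvester configuration, a single monitor pass could be driven roughly as:
#
#   from pandaharvester.harvestercore.queue_config_mapper import QueueConfigMapper
#
#   agent = Monitor(QueueConfigMapper(), single_mode=True)
#   agent.run()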