File indexing completed on 2026-04-20 07:58:58
0001 import collections
0002 import datetime
0003 import json
0004 import os
0005 import pickle
0006 import socket
0007 import time
0008 from calendar import timegm
0009 from threading import get_ident
0010
0011 from pandaharvester.harvesterconfig import harvester_config
0012 from pandaharvester.harvestercore import core_utils
0013 from pandaharvester.harvestercore.db_interface import DBInterface
0014 from pandaharvester.harvestercore.db_proxy_pool import DBProxyPool as DBProxy
0015 from pandaharvester.harvestercore.plugin_factory import PluginFactory
0016
0017
# Field names carried by every FIFO record returned by get/peek operations
_attribute_list = ["id", "item", "score"]


# Lightweight (id, item, score) record type; rename=False keeps field names strict
FifoObject = collections.namedtuple("FifoObject", _attribute_list, rename=False)


# module-wide logger shared by all FIFO classes in this module
_logger = core_utils.setup_logger("fifos")
0025
0026
0027
0028
class FIFOBase(object):
    """Base class for harvester FIFO wrappers.

    Wraps a concrete fifo plugin (self.fifo, set up by _initialize_fifo or by a
    subclass) and layers pickle (de)serialization, logging, and process/thread
    identification on top of the plugin's raw queue operations. Items are
    stored with a numeric score (by default the enqueue timestamp) that
    determines ordering.
    """

    def __init__(self, **kwarg):
        # attach every keyword argument as an instance attribute (e.g. titleName)
        for tmpKey, tmpVal in kwarg.items():
            setattr(self, tmpKey, tmpVal)
        self.hostname = socket.gethostname()
        self.os_pid = os.getpid()
        self.dbProxy = DBProxy()
        self.dbInterface = DBInterface()

    def get_pid(self):
        """Return a unique "<host>_<pid>-<thread_id_hex>" identifier string."""
        thread_id = get_ident()
        if thread_id is None:
            thread_id = 0
        # FIX: format the normalized thread_id; the original called get_ident()
        # again inside the f-string, which made the None -> 0 fallback dead code
        return f"{self.hostname}_{self.os_pid}-{format(thread_id, 'x')}"

    def make_logger(self, base_log, token=None, method_name=None, send_dialog=True):
        """Make a logger; attach the DB interface as dialog hook when requested."""
        if send_dialog and hasattr(self, "dbInterface"):
            hook = self.dbInterface
        else:
            hook = None
        return core_utils.make_logger(base_log, token=token, method_name=method_name, hook=hook)

    def _initialize_fifo(self, force_enable=False):
        """Instantiate the fifo plugin according to harvester configuration.

        The plugin module/class come from the agent's own config section when
        both fifoModule and fifoClass are present there, otherwise from the
        global [fifo] section. Sets self.enabled and returns early (without a
        plugin) when the fifo is disabled or no global [fifo] section exists.

        :param force_enable: enable the fifo regardless of fifoEnable config
        """
        self.fifoName = f"{self.titleName}_fifo"
        self.config = getattr(harvester_config, self.titleName)
        if force_enable:
            self.enabled = True
        elif hasattr(self.config, "fifoEnable") and self.config.fifoEnable:
            self.enabled = True
        else:
            self.enabled = False
            return
        pluginConf = vars(self.config).copy()
        pluginConf.update({"titleName": self.titleName})
        if hasattr(self.config, "fifoModule") and hasattr(self.config, "fifoClass"):
            # plugin specified in the agent's own config section
            pluginConf.update(
                {
                    "module": self.config.fifoModule,
                    "name": self.config.fifoClass,
                }
            )
        else:
            # fall back to the global [fifo] section
            if not hasattr(harvester_config, "fifo"):
                return
            pluginConf.update(
                {
                    "module": harvester_config.fifo.fifoModule,
                    "name": harvester_config.fifo.fifoClass,
                }
            )
        pluginFactory = PluginFactory()
        self.fifo = pluginFactory.get_plugin(pluginConf)

    def encode(self, item):
        """Serialize an item with pickle at the highest available protocol."""
        item_serialized = pickle.dumps(item, -1)
        return item_serialized

    def decode(self, item_serialized):
        """Deserialize a pickled item (inverse of encode)."""
        item = pickle.loads(item_serialized)
        return item

    def size(self):
        """Return the number of objects currently in the fifo."""
        mainLog = self.make_logger(_logger, f"id={self.fifoName}-{self.get_pid()}", method_name="size")
        retVal = self.fifo.size()
        mainLog.debug(f"size={retVal}")
        return retVal

    def put(self, item, score=None, encode_item=True):
        """Enqueue an object; score defaults to the current timestamp.

        :param item: the object to enqueue
        :param score: ordering score; current time if None
        :param encode_item: pickle the item before storing when True
        """
        mainLog = self.make_logger(_logger, f"id={self.fifoName}-{self.get_pid()}", method_name="put")
        if encode_item:
            item_serialized = self.encode(item)
        else:
            item_serialized = item
        if score is None:
            score = time.time()
        retVal = self.fifo.put(item_serialized, score)
        mainLog.debug(f"score={score}")
        return retVal

    def putbyid(self, id, item, score=None, encode_item=True):
        """Enqueue an object with an explicit id; score defaults to now."""
        mainLog = self.make_logger(_logger, f"id={self.fifoName}-{self.get_pid()}", method_name="putbyid")
        if encode_item:
            item_serialized = self.encode(item)
        else:
            item_serialized = item
        if score is None:
            score = time.time()
        retVal = self.fifo.putbyid(id, item_serialized, score)
        mainLog.debug(f"id={id} score={score}")
        return retVal

    def get(self, timeout=None, protective=False, decode_item=True):
        """Dequeue the first object; return a FifoObject or None when empty.

        :param timeout: seconds to wait for an object
        :param protective: keep the object in a temporary area until deleted
        :param decode_item: unpickle the stored item when True
        """
        mainLog = self.make_logger(_logger, f"id={self.fifoName}-{self.get_pid()}", method_name="get")
        object_tuple = self.fifo.get(timeout, protective)
        if object_tuple is None:
            retVal = None
        else:
            id, item_serialized, score = object_tuple
            if item_serialized is not None and decode_item:
                item = self.decode(item_serialized)
            else:
                item = item_serialized
            retVal = FifoObject(id, item, score)
        mainLog.debug(f"called. protective={protective} decode_item={decode_item}")
        return retVal

    def getlast(self, timeout=None, protective=False, decode_item=True):
        """Dequeue the last object; same semantics as get()."""
        mainLog = self.make_logger(_logger, f"id={self.fifoName}-{self.get_pid()}", method_name="getlast")
        object_tuple = self.fifo.getlast(timeout, protective)
        if object_tuple is None:
            retVal = None
        else:
            id, item_serialized, score = object_tuple
            if item_serialized is not None and decode_item:
                item = self.decode(item_serialized)
            else:
                item = item_serialized
            retVal = FifoObject(id, item, score)
        mainLog.debug(f"called. protective={protective} decode_item={decode_item}")
        return retVal

    def getmany(self, mode="first", minscore=None, maxscore=None, count=None, protective=False, temporary=False, decode_item=True):
        """Dequeue multiple objects within a score range; return a list of FifoObject.

        :param mode: "first" or "last" end of the queue to take from
        :param minscore: lower score bound
        :param maxscore: upper score bound
        :param count: maximum number of objects to take
        :param protective: keep objects in a temporary area until deleted
        :param temporary: take from the temporary area instead
        :param decode_item: unpickle the stored items when True
        """
        mainLog = self.make_logger(_logger, f"id={self.fifoName}-{self.get_pid()}", method_name="getmany")
        object_tuple_list = self.fifo.getmany(mode, minscore, maxscore, count, protective, temporary)
        if not object_tuple_list:
            mainLog.debug("empty list")
        ret_list = []
        for object_tuple in object_tuple_list:
            id, item_serialized, score = object_tuple
            if item_serialized is not None and decode_item:
                item = self.decode(item_serialized)
            else:
                item = item_serialized
            val_tuple = FifoObject(id, item, score)
            ret_list.append(val_tuple)
        mainLog.debug(
            f"mode={mode} minscore={minscore} maxscore={maxscore} count={count} protective={protective} temporary={temporary} decode_item={decode_item}"
        )
        return ret_list

    def peek(self, skip_item=False):
        """Look at the first object without dequeuing it.

        Returns None when the fifo is empty, FifoObject(None, None, None) for a
        placeholder entry, otherwise FifoObject with score defaulting to now.
        """
        mainLog = self.make_logger(_logger, f"id={self.fifoName}-{self.get_pid()}", method_name="peek")
        object_tuple = self.fifo.peek(skip_item=skip_item)
        if object_tuple is None:
            retVal = None
            mainLog.debug("fifo empty")
        else:
            id, item_serialized, score = object_tuple
            if item_serialized is None and score is None:
                retVal = FifoObject(None, None, None)
            else:
                if score is None:
                    score = time.time()
                retVal = FifoObject(id, item_serialized, score)
            mainLog.debug(f"score={score}")
        return retVal

    def peeklast(self, skip_item=False):
        """Look at the last object without dequeuing it; same semantics as peek()."""
        mainLog = self.make_logger(_logger, f"id={self.fifoName}-{self.get_pid()}", method_name="peeklast")
        object_tuple = self.fifo.peeklast(skip_item=skip_item)
        if object_tuple is None:
            retVal = None
            mainLog.debug("fifo empty")
        else:
            id, item_serialized, score = object_tuple
            if item_serialized is None and score is None:
                retVal = FifoObject(None, None, None)
            else:
                if score is None:
                    score = time.time()
                retVal = FifoObject(id, item_serialized, score)
            mainLog.debug(f"score={score}")
        return retVal

    def peekbyid(self, id, temporary=False, skip_item=False):
        """Look at the object with the given id without dequeuing it."""
        mainLog = self.make_logger(_logger, f"id={self.fifoName}-{self.get_pid()}", method_name="peekbyid")
        object_tuple = self.fifo.peekbyid(id, temporary, skip_item=skip_item)
        if object_tuple is None:
            retVal = None
            mainLog.debug("fifo empty")
        else:
            id_gotten, item_serialized, score = object_tuple
            if item_serialized is None and score is None:
                retVal = FifoObject(None, None, None)
            else:
                if score is None:
                    score = time.time()
                retVal = FifoObject(id, item_serialized, score)
            mainLog.debug(f"id={id} score={score} temporary={temporary}")
        return retVal

    def peekmany(self, mode="first", minscore=None, maxscore=None, count=None, skip_item=False):
        """Look at multiple objects within a score range without dequeuing them."""
        mainLog = self.make_logger(_logger, f"id={self.fifoName}-{self.get_pid()}", method_name="peekmany")
        object_tuple_list = self.fifo.peekmany(mode, minscore, maxscore, count, skip_item)
        if not object_tuple_list:
            mainLog.debug("empty list")
        ret_list = []
        for object_tuple in object_tuple_list:
            id_gotten, item_serialized, score = object_tuple
            if item_serialized is None and score is None:
                val_tuple = FifoObject(None, None, None)
            else:
                if score is None:
                    score = time.time()
                # FIX: use the unpacked id_gotten; the original passed the
                # builtin id function as the record's id field
                val_tuple = FifoObject(id_gotten, item_serialized, score)
            ret_list.append(val_tuple)
        mainLog.debug(f"mode={mode} minscore={minscore} maxscore={maxscore} count={count}")
        return ret_list

    def delete(self, ids):
        """Delete (release) objects with the given ids from the fifo."""
        # NOTE(review): logging deliberately keeps the historical "release"
        # wording even though the operation is a delete
        mainLog = self.make_logger(_logger, f"id={self.fifoName}-{self.get_pid()}", method_name="release")
        retVal = self.fifo.delete(ids)
        mainLog.debug(f"released {retVal} objects in {ids}")
        return retVal

    def restore(self, ids=None):
        """Move objects back from the temporary area; all of them when ids is None."""
        mainLog = self.make_logger(_logger, f"id={self.fifoName}-{self.get_pid()}", method_name="restore")
        retVal = self.fifo.restore(ids)
        if ids is None:
            mainLog.debug("restored all objects")
        else:
            mainLog.debug(f"restored objects in {ids}")
        return retVal

    def update(self, id, item=None, score=None, temporary=None, cond_score="gt"):
        """Update attributes of the object with the given id.

        :param item: new item value, unchanged when None
        :param score: new score, unchanged when None
        :param temporary: new temporary flag, unchanged when None
        :param cond_score: score comparison condition required for the update
        """
        mainLog = self.make_logger(_logger, f"id={self.fifoName}-{self.get_pid()}", method_name="update")
        retVal = self.fifo.update(id, item, score, temporary, cond_score)
        update_report_list = []
        if item is not None:
            update_report_list.append(f"item={item}")
        if score is not None:
            update_report_list.append(f"score={score}")
        if temporary is not None:
            update_report_list.append(f"temporary={temporary}")
        update_report = " ".join(update_report_list)
        mainLog.debug(f"update id={id} cond_score={cond_score}: return={retVal}, {update_report}")
        return retVal
0286
0287
0288
class SpecialFIFOBase(FIFOBase):
    """Base class of special FIFOs that always use the global [fifo] plugin settings."""

    def __init__(self, **kwarg):
        FIFOBase.__init__(self, **kwarg)
        self.fifoName = f"{self.titleName}_fifo"
        # plugin module/class always come from the global fifo configuration
        plugin_params = {
            "titleName": self.titleName,
            "module": harvester_config.fifo.fifoModule,
            "name": harvester_config.fifo.fifoClass,
        }
        self.fifo = PluginFactory().get_plugin(plugin_params)
0304
0305
0306
class ManagementFIFO(SpecialFIFOBase):
    """Special FIFO with the "management" title."""

    titleName = "management"

    def cleanup_tables(self, age_sec=1209600):
        """Delegate table cleanup to the fifo plugin.

        :param age_sec: maximum age in seconds (default 1209600, i.e. two weeks)
        """
        return self.fifo.cleanup_tables(age_sec)
0314
0315
0316
class BenchmarkFIFO(SpecialFIFOBase):
    """Special FIFO with the "benchmark" title; behavior comes entirely from the base class."""

    titleName = "benchmark"
0319
0320
0321
class MonitorFIFO(FIFOBase):
    # title used to build fifoName and to select the harvester config section
    titleName = "monitor"

    def __init__(self, **kwarg):
        FIFOBase.__init__(self, **kwarg)
        # sets self.enabled and self.fifo from the [monitor] config section
        self._initialize_fifo()

    def populate(self, seconds_ago=0, clear_fifo=False):
        """
        Populate monitor fifo with all active worker chunks and timeNow as score from DB
        with modificationTime earlier than seconds_ago seconds ago
        object in fifo = [(queueName_1, [[worker_1_1], [worker_1_2], ...]), (queueName_2, ...)]

        Workers are grouped into chunks per computing site (queue), each chunk
        capped at fifoMaxWorkersPerChunk workers; each chunk is scored with the
        modification time of its first worker when available.
        """
        if clear_fifo:
            self.fifo.clear()
        # upper bound on workers fetched from DB; effectively unlimited by default
        try:
            fifoMaxWorkersToPopulate = self.config.fifoMaxWorkersToPopulate
        except AttributeError:
            fifoMaxWorkersToPopulate = 2**32
        # max workers per fifo object (chunk)
        try:
            fifoMaxWorkersPerChunk = self.config.fifoMaxWorkersPerChunk
        except AttributeError:
            fifoMaxWorkersPerChunk = 500
        workspec_iterator = self.dbProxy.get_active_workers(fifoMaxWorkersToPopulate, seconds_ago)
        last_queueName = None
        workspec_chunk = []
        timeNow_timestamp = time.time()
        score = timeNow_timestamp
        for workspec in workspec_iterator:
            workspec.set_work_params({"lastCheckAt": timeNow_timestamp})
            if last_queueName is None:
                # first worker: start the first chunk, scored with its modificationTime
                try:
                    score = timegm(workspec.modificationTime.utctimetuple())
                except Exception:
                    # keep the previous score when modificationTime is missing/invalid
                    pass
                workspec_chunk = [[workspec]]
                last_queueName = workspec.computingSite
            elif workspec.computingSite == last_queueName and len(workspec_chunk) < fifoMaxWorkersPerChunk:
                # same queue and chunk not full: extend the current chunk
                workspec_chunk.append([workspec])
            else:
                # queue changed or chunk full: flush the current chunk and start a new one
                self.put((last_queueName, workspec_chunk), score)
                try:
                    score = timegm(workspec.modificationTime.utctimetuple())
                except Exception:
                    pass
                workspec_chunk = [[workspec]]
                last_queueName = workspec.computingSite
        # flush the trailing chunk, if any
        if len(workspec_chunk) > 0:
            self.put((last_queueName, workspec_chunk), score)

    def to_check_workers(self, check_interval=harvester_config.monitor.checkInterval):
        """
        Justify whether to check any worker by the modificationTime of the first worker in fifo
        retVal True if OK to dequeue to check;
        retVal False otherwise.
        Return retVal, overhead_time

        NOTE(review): check_interval is currently unused in this method body;
        the decision is based purely on the peeked score vs current time.
        A negative score marks a preemptive check request.
        """
        mainLog = self.make_logger(_logger, f"id={self.fifoName}-{self.get_pid()}", method_name="to_check_worker")
        retVal = False
        overhead_time = None
        timeNow_timestamp = time.time()
        # skip_item avoids decoding the (potentially large) worker chunk
        peeked_tuple = self.peek(skip_item=True)
        if peeked_tuple is not None:
            score = peeked_tuple.score
            overhead_time = timeNow_timestamp - score
            if overhead_time > 0:
                retVal = True
                if score < 0:
                    # negative score means preemptive checking was requested
                    mainLog.debug("True. Preempting")
                    overhead_time = None
                else:
                    mainLog.debug("True")
                    mainLog.info(f"Overhead time is {overhead_time:.3f} sec")
            else:
                mainLog.debug("False. Workers too young to check")
                mainLog.debug(f"Overhead time is {overhead_time:.3f} sec")
        else:
            mainLog.debug("False. Got nothing in FIFO")
        return retVal, overhead_time
0402
0403
class MonitorEventFIFO(SpecialFIFOBase):
    """Special FIFO with the "monitorEvent" title, configured from the [monitor] section."""

    titleName = "monitorEvent"

    def __init__(self, **kwarg):
        self.config = getattr(harvester_config, "monitor")
        # enabled only when both fifoEnable and eventBasedEnable are set in [monitor]
        fifo_on = getattr(self.config, "fifoEnable", False)
        event_on = getattr(self.config, "eventBasedEnable", False)
        self.enabled = bool(fifo_on and event_on)
        SpecialFIFOBase.__init__(self, **kwarg)