Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:58:58

0001 import collections
0002 import datetime
0003 import json
0004 import os
0005 import pickle
0006 import socket
0007 import time
0008 from calendar import timegm
0009 from threading import get_ident
0010 
0011 from pandaharvester.harvesterconfig import harvester_config
0012 from pandaharvester.harvestercore import core_utils
0013 from pandaharvester.harvestercore.db_interface import DBInterface
0014 from pandaharvester.harvestercore.db_proxy_pool import DBProxyPool as DBProxy
0015 from pandaharvester.harvestercore.plugin_factory import PluginFactory
0016 
# names of the fields carried by every fifo object, in positional order
_attribute_list = ["id", "item", "score"]

# record type handed back by the fifo accessors
FifoObject = collections.namedtuple(
    typename="FifoObject",
    field_names=_attribute_list,
    rename=False,
)

# module-level logger used by the fifo classes
_logger = core_utils.setup_logger("fifos")
0025 
0026 # base class of fifo message queue
0027 
0028 
class FIFOBase(object):
    """Base class of the fifo message queues.

    The class is a thin front-end over a fifo plugin stored in ``self.fifo``.
    Items are pickled before being handed to the plugin and unpickled when
    they are dequeued; results are returned as ``FifoObject(id, item, score)``
    named tuples.
    """

    def __init__(self, **kwarg):
        """Store every keyword argument as an instance attribute and set up
        the host/process identity and the DB accessors."""
        for tmpKey, tmpVal in kwarg.items():
            setattr(self, tmpKey, tmpVal)
        self.hostname = socket.gethostname()
        self.os_pid = os.getpid()
        self.dbProxy = DBProxy()
        self.dbInterface = DBInterface()

    def get_pid(self):
        """Return ``<hostname>_<os pid>-<thread id in hex>`` for the caller."""
        thread_id = get_ident()
        if thread_id is None:
            thread_id = 0
        # use the guarded value instead of calling get_ident() a second time
        return f"{self.hostname}_{self.os_pid}-{format(thread_id, 'x')}"

    def make_logger(self, base_log, token=None, method_name=None, send_dialog=True):
        """Build a logger via ``core_utils.make_logger``.

        ``self.dbInterface`` is passed as the hook when ``send_dialog`` is true
        and the instance has that attribute; otherwise no hook is passed.
        """
        if send_dialog and hasattr(self, "dbInterface"):
            hook = self.dbInterface
        else:
            hook = None
        return core_utils.make_logger(base_log, token=token, method_name=method_name, hook=hook)

    def _method_logger(self, method_name):
        """Return the per-call logger tagged with the fifo name and caller id."""
        return self.make_logger(_logger, f"id={self.fifoName}-{self.get_pid()}", method_name=method_name)

    def _initialize_fifo(self, force_enable=False):
        """Initialize the fifo plugin from the harvester configuration.

        Sets ``self.fifoName``, ``self.config`` and ``self.enabled``.  The
        plugin is created only when the fifo is enabled (or ``force_enable``
        is true).  The plugin module/class come from the section named after
        ``self.titleName`` when it defines both ``fifoModule`` and
        ``fifoClass``, else from the ``fifo`` section of ``harvester_config``.
        When neither is available the method returns without setting
        ``self.fifo``.
        """
        self.fifoName = f"{self.titleName}_fifo"
        self.config = getattr(harvester_config, self.titleName)
        self.enabled = bool(force_enable or getattr(self.config, "fifoEnable", False))
        if not self.enabled:
            return
        pluginConf = vars(self.config).copy()
        pluginConf.update({"titleName": self.titleName})
        if hasattr(self.config, "fifoModule") and hasattr(self.config, "fifoClass"):
            source = self.config
        elif hasattr(harvester_config, "fifo"):
            source = harvester_config.fifo
        else:
            # no plugin configured anywhere; self.fifo stays undefined
            return
        pluginConf.update({"module": source.fifoModule, "name": source.fifoClass})
        pluginFactory = PluginFactory()
        self.fifo = pluginFactory.get_plugin(pluginConf)

    def encode(self, item):
        """Serialize ``item`` with pickle using the highest protocol."""
        return pickle.dumps(item, -1)

    def decode(self, item_serialized):
        """Deserialize a pickled item.

        NOTE(review): pickle.loads executes arbitrary code embedded in the
        payload; this is only safe if the fifo backend holds trusted data.
        """
        return pickle.loads(item_serialized)

    def _to_fifo_object(self, object_tuple, decode_item):
        """Wrap a dequeued ``(id, item_serialized, score)`` tuple, decoding
        the item when requested and present."""
        obj_id, item_serialized, score = object_tuple
        if item_serialized is not None and decode_item:
            item = self.decode(item_serialized)
        else:
            item = item_serialized
        return FifoObject(obj_id, item, score)

    @staticmethod
    def _to_peeked_object(obj_id, item_serialized, score):
        """Wrap a peeked tuple without decoding the item.

        An all-None FifoObject is returned when both item and score are None;
        a missing score alone is replaced by the current time.
        """
        if item_serialized is None and score is None:
            return FifoObject(None, None, None)
        if score is None:
            score = time.time()
        return FifoObject(obj_id, item_serialized, score)

    def size(self):
        """Return the size of the queue as reported by the plugin."""
        mainLog = self._method_logger("size")
        retVal = self.fifo.size()
        mainLog.debug(f"size={retVal}")
        return retVal

    def put(self, item, score=None, encode_item=True):
        """Enqueue ``item``; ``score`` defaults to the current time.

        The item is pickled first unless ``encode_item`` is false.
        Returns whatever the plugin's ``put`` returns.
        """
        mainLog = self._method_logger("put")
        item_serialized = self.encode(item) if encode_item else item
        if score is None:
            score = time.time()
        retVal = self.fifo.put(item_serialized, score)
        mainLog.debug(f"score={score}")
        return retVal

    def putbyid(self, id, item, score=None, encode_item=True):
        """Enqueue ``item`` under the unique ``id``; ``score`` defaults to the
        current time.  Returns whatever the plugin's ``putbyid`` returns."""
        mainLog = self._method_logger("putbyid")
        item_serialized = self.encode(item) if encode_item else item
        if score is None:
            score = time.time()
        retVal = self.fifo.putbyid(id, item_serialized, score)
        mainLog.debug(f"id={id} score={score}")
        return retVal

    def get(self, timeout=None, protective=False, decode_item=True):
        """Dequeue the first fifo object; return a FifoObject or None."""
        mainLog = self._method_logger("get")
        object_tuple = self.fifo.get(timeout, protective)
        retVal = None if object_tuple is None else self._to_fifo_object(object_tuple, decode_item)
        mainLog.debug(f"called. protective={protective} decode_item={decode_item}")
        return retVal

    def getlast(self, timeout=None, protective=False, decode_item=True):
        """Dequeue the last fifo object; return a FifoObject or None."""
        mainLog = self._method_logger("getlast")
        object_tuple = self.fifo.getlast(timeout, protective)
        retVal = None if object_tuple is None else self._to_fifo_object(object_tuple, decode_item)
        mainLog.debug(f"called. protective={protective} decode_item={decode_item}")
        return retVal

    def getmany(self, mode="first", minscore=None, maxscore=None, count=None, protective=False, temporary=False, decode_item=True):
        """Dequeue a list of objects matching the given conditions.

        Returns a list of FifoObject; the list is empty when the plugin
        returns nothing (including None).
        """
        mainLog = self._method_logger("getmany")
        object_tuple_list = self.fifo.getmany(mode, minscore, maxscore, count, protective, temporary)
        if not object_tuple_list:
            mainLog.debug("empty list")
        # "or []" keeps a None result from the plugin from breaking the loop
        ret_list = [self._to_fifo_object(object_tuple, decode_item) for object_tuple in object_tuple_list or []]
        mainLog.debug(
            f"mode={mode} minscore={minscore} maxscore={maxscore} count={count} protective={protective} temporary={temporary} decode_item={decode_item}"
        )
        return ret_list

    def peek(self, skip_item=False):
        """Return the first object and its score without dequeuing.

        If the item is large and unnecessary to show in peek, set
        ``skip_item=True``.  The item is returned as stored (not decoded).
        Returns None when the plugin reports nothing.
        """
        mainLog = self._method_logger("peek")
        object_tuple = self.fifo.peek(skip_item=skip_item)
        if object_tuple is None:
            mainLog.debug("fifo empty")
            return None
        retVal = self._to_peeked_object(*object_tuple)
        mainLog.debug(f"score={retVal.score}")
        return retVal

    def peeklast(self, skip_item=False):
        """Return the last object and its score without dequeuing.

        The item is returned as stored (not decoded).  Returns None when the
        plugin reports nothing.
        """
        mainLog = self._method_logger("peeklast")
        object_tuple = self.fifo.peeklast(skip_item=skip_item)
        if object_tuple is None:
            mainLog.debug("fifo empty")
            return None
        retVal = self._to_peeked_object(*object_tuple)
        mainLog.debug(f"score={retVal.score}")
        return retVal

    def peekbyid(self, id, temporary=False, skip_item=False):
        """Return the object with the given ``id`` without dequeuing.

        The item is returned as stored (not decoded).  Returns None when the
        plugin reports nothing.
        """
        mainLog = self._method_logger("peekbyid")
        object_tuple = self.fifo.peekbyid(id, temporary, skip_item=skip_item)
        if object_tuple is None:
            mainLog.debug("fifo empty")
            return None
        _id_gotten, item_serialized, score = object_tuple
        # the requested id, not the one echoed by the plugin, is reported back
        retVal = self._to_peeked_object(id, item_serialized, score)
        mainLog.debug(f"id={id} score={retVal.score} temporary={temporary}")
        return retVal

    def peekmany(self, mode="first", minscore=None, maxscore=None, count=None, skip_item=False):
        """Return a list of FifoObject without dequeuing.

        Items are returned as stored (not decoded).  The list is empty when
        the plugin returns nothing (including None).
        """
        mainLog = self._method_logger("peekmany")
        object_tuple_list = self.fifo.peekmany(mode, minscore, maxscore, count, skip_item)
        if not object_tuple_list:
            mainLog.debug("empty list")
        # each record carries the id unpacked from its own tuple; the bare
        # name "id" here would be the Python builtin function
        ret_list = [
            self._to_peeked_object(id_gotten, item_serialized, score)
            for id_gotten, item_serialized, score in object_tuple_list or []
        ]
        mainLog.debug(f"mode={mode} minscore={minscore} maxscore={maxscore} count={count}")
        return ret_list

    def delete(self, ids):
        """Delete objects by list of ids from temporary space; return the
        plugin's result (the number of objects successfully deleted)."""
        mainLog = self._method_logger("delete")
        retVal = self.fifo.delete(ids)
        mainLog.debug(f"released {retVal} objects in {ids}")
        return retVal

    def restore(self, ids=None):
        """Restore objects by list of ids from temporary space to the fifo;
        ``ids=None`` restores all objects."""
        mainLog = self._method_logger("restore")
        retVal = self.fifo.restore(ids)
        if ids is None:
            mainLog.debug("restored all objects")
        else:
            mainLog.debug(f"restored objects in {ids}")
        return retVal

    def update(self, id, item=None, score=None, temporary=None, cond_score="gt"):
        """Update an object by its id with some conditions.

        ``item`` is forwarded to the plugin as given (it is not pickled
        here).  Returns whatever the plugin's ``update`` returns.
        """
        mainLog = self._method_logger("update")
        retVal = self.fifo.update(id, item, score, temporary, cond_score)
        # report only the fields that were actually supplied
        supplied = (("item", item), ("score", score), ("temporary", temporary))
        update_report = " ".join(f"{key}={val}" for key, val in supplied if val is not None)
        mainLog.debug(f"update id={id} cond_score={cond_score}: return={retVal}, {update_report}")
        return retVal
0286 
0287 
0288 # Special fifo base for non harvester-agent
class SpecialFIFOBase(FIFOBase):
    """Fifo base whose plugin always comes from the ``fifo`` section of
    ``harvester_config``, regardless of any per-title configuration."""

    def __init__(self, **kwarg):
        """Run the FIFOBase constructor, then create the fifo plugin."""
        FIFOBase.__init__(self, **kwarg)
        self.fifoName = f"{self.titleName}_fifo"
        # plugin spec: title of this fifo plus the globally configured plugin
        plugin_conf = {
            "titleName": self.titleName,
            "module": harvester_config.fifo.fifoModule,
            "name": harvester_config.fifo.fifoClass,
        }
        self.fifo = PluginFactory().get_plugin(plugin_conf)
0304 
0305 
0306 # Management fifo class, for managing fifo database
class ManagementFIFO(SpecialFIFOBase):
    """Management fifo, used for maintenance of the fifo database."""

    titleName = "management"

    def cleanup_tables(self, age_sec=1209600):
        """Clean up inactive tables from the fifo database.

        ``age_sec`` is forwarded unchanged to the plugin's ``cleanup_tables``
        and the plugin's result is returned.
        """
        return self.fifo.cleanup_tables(age_sec)
0314 
0315 
0316 # Benchmark fifo
class BenchmarkFIFO(SpecialFIFOBase):
    """Fifo titled "benchmark"; adds nothing beyond SpecialFIFOBase."""

    titleName = "benchmark"
0319 
0320 
0321 # monitor fifo
class MonitorFIFO(FIFOBase):
    """Fifo of worker chunks configured by the ``monitor`` section."""

    titleName = "monitor"

    def __init__(self, **kwarg):
        """Run the FIFOBase constructor, then initialize the fifo from the
        harvester configuration."""
        FIFOBase.__init__(self, **kwarg)
        self._initialize_fifo()

    @staticmethod
    def _chunk_score(workspec, fallback):
        """Return the worker's modificationTime as a UTC epoch timestamp, or
        ``fallback`` when it cannot be derived."""
        try:
            return timegm(workspec.modificationTime.utctimetuple())
        except Exception:
            # deliberate best effort: keep the score that was already in use
            return fallback

    def populate(self, seconds_ago=0, clear_fifo=False):
        """
        Populate monitor fifo with all active worker chunks and timeNow as score from DB
        with modificationTime earlier than seconds_ago seconds ago
        object in fifo = [(queueName_1, [[worker_1_1], [worker_1_2], ...]), (queueName_2, ...)]

        Consecutive workers of the same computingSite are grouped into one
        chunk of at most fifoMaxWorkersPerChunk workers (default 500); at most
        fifoMaxWorkersToPopulate workers (default 2**32) are requested from
        the DB.  Each chunk is scored by the modificationTime of its first
        worker when that can be derived.
        """
        if clear_fifo:
            self.fifo.clear()
        fifoMaxWorkersToPopulate = getattr(self.config, "fifoMaxWorkersToPopulate", 2**32)
        fifoMaxWorkersPerChunk = getattr(self.config, "fifoMaxWorkersPerChunk", 500)
        workspec_iterator = self.dbProxy.get_active_workers(fifoMaxWorkersToPopulate, seconds_ago)
        last_queueName = None
        workspec_chunk = []
        timeNow_timestamp = time.time()
        score = timeNow_timestamp
        for workspec in workspec_iterator:
            workspec.set_work_params({"lastCheckAt": timeNow_timestamp})
            extends_chunk = (
                last_queueName is not None
                and workspec.computingSite == last_queueName
                and len(workspec_chunk) < fifoMaxWorkersPerChunk
            )
            if extends_chunk:
                workspec_chunk.append([workspec])
                continue
            # a new chunk starts here; flush the previous one if there was any
            if last_queueName is not None:
                self.put((last_queueName, workspec_chunk), score)
            score = self._chunk_score(workspec, score)
            workspec_chunk = [[workspec]]
            last_queueName = workspec.computingSite
        if workspec_chunk:
            self.put((last_queueName, workspec_chunk), score)

    def to_check_workers(self, check_interval=harvester_config.monitor.checkInterval):
        """
        Justify whether to check any worker by the modificationTime of the first worker in fifo
        retVal True if OK to dequeue to check;
        retVal False otherwise.
        Return retVal, overhead_time

        overhead_time is the current time minus the score of the first object;
        it is None when the fifo yields no score or when the score is negative
        (preempting).  check_interval is not used by this method.
        """
        mainLog = self.make_logger(_logger, f"id={self.fifoName}-{self.get_pid()}", method_name="to_check_worker")
        timeNow_timestamp = time.time()
        peeked_tuple = self.peek(skip_item=True)
        # peek() may hand back an all-None FifoObject; without a score there
        # is nothing to compare against, so treat it like an empty fifo
        if peeked_tuple is None or peeked_tuple.score is None:
            mainLog.debug("False. Got nothing in FIFO")
            return False, None
        score = peeked_tuple.score
        overhead_time = timeNow_timestamp - score
        if overhead_time > 0:
            retVal = True
            if score < 0:
                mainLog.debug("True. Preempting")
                overhead_time = None
            else:
                mainLog.debug("True")
                mainLog.info(f"Overhead time is {overhead_time:.3f} sec")
        else:
            retVal = False
            mainLog.debug("False. Workers too young to check")
            mainLog.debug(f"Overhead time is {overhead_time:.3f} sec")
        return retVal, overhead_time
0402 
0403 
class MonitorEventFIFO(SpecialFIFOBase):
    """Fifo titled "monitorEvent", switched on by the ``monitor`` section."""

    titleName = "monitorEvent"

    def __init__(self, **kwarg):
        """Read the monitor configuration, derive ``enabled`` from it, then
        run the SpecialFIFOBase constructor.

        ``enabled`` is true only when both ``fifoEnable`` and
        ``eventBasedEnable`` are set and truthy in the monitor section.
        Keyword arguments are applied afterwards by the base constructor.
        """
        monitor_conf = harvester_config.monitor
        self.config = monitor_conf
        self.enabled = bool(
            getattr(monitor_conf, "fifoEnable", False)
            and getattr(monitor_conf, "eventBasedEnable", False)
        )
        SpecialFIFOBase.__init__(self, **kwarg)