Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:58:59

0001 import re
0002 import threading
0003 import time
0004 
0005 from pandaharvester.harvesterconfig import harvester_config
0006 from pandaharvester.harvestercore.core_utils import SingletonWithID
0007 from pandaharvester.harvestercore.db_interface import DBInterface
0008 from pandaharvester.harvestercore.plugin_base import PluginBase
0009 
# ID of this harvester instance; PandaQueuesDict treats a queue as belonging to this
# instance only when the queue's "harvester" field equals this value.
harvesterID = harvester_config.master.harvester_id
# Optional resolver settings (PandaQueuesDict reads "refreshPeriod" from it); an empty
# dict is used when the qconf section has no "resolverConfig" attribute.
resolver_config = getattr(harvester_config.qconf, "resolverConfig", dict())
0012 
0013 
class PandaQueuesDict(dict, PluginBase, metaclass=SingletonWithID):
    """
    Dictionary of PanDA queue info from DB by cacher
    Key is PanDA Resource name (rather than PanDA Queue name)
    Able to query with either PanDA Queue name or PanDA Resource name

    Content is reloaded from the DB cache entry named by ``cacher_key`` whenever a
    refreshing accessor is called and the previous load is older than
    ``refresh_period`` seconds (resolverConfig "refreshPeriod", default 300).
    """

    # Attributes multiplied by "corecount" for queues whose catchall contains "per_core_attr"
    candidate_per_core_attrs = (
        "maxrss",
        "minrss",
        "maxwdir",
    )

    def __init__(self, **kwargs):
        dict.__init__(self)
        PluginBase.__init__(self, **kwargs)
        self.lock = threading.Lock()
        self.dbInterface = DBInterface()
        self.cacher_key = kwargs.get("cacher_key", "panda_queues.json")
        self.refresh_period = resolver_config.get("refreshPeriod", 300)
        self.last_refresh_ts = 0
        # PanDA Queue name (nickname) -> PanDA Resource name, so that lookups by queue
        # name can be resolved to the resource-name keys used by this dict
        self._resource_by_queue = {}
        self._refresh()

    def _is_fresh(self):
        """
        Return True if the last refresh happened less than refresh_period seconds ago
        """
        return self.last_refresh_ts + self.refresh_period > time.time()

    @staticmethod
    def has_value_in_catchall(panda_queues_dict, key):
        """
        Check if specific value is in catchall attributes

        catchall is a comma-separated string; an item matches when it is exactly
        ``key`` or ``key=<value>`` (surrounding whitespace is ignored).
        Returns False when catchall is missing or empty.
        """
        catchall_str = panda_queues_dict.get("catchall")
        if not catchall_str:
            return False
        for item in catchall_str.split(","):
            # compare only the name part, so e.g. "per_core_attr_x" does not match "per_core_attr"
            item_name = item.split("=", 1)[0].strip()
            if item_name == key:
                return True
        return False

    @staticmethod
    def use_per_core_attr(panda_queues_dict):
        """
        Check if treating all attributes as per-core
        """
        return PandaQueuesDict.has_value_in_catchall(panda_queues_dict, "per_core_attr")

    def _scale_per_core_attrs(self, queue_info):
        """
        Multiply the numeric per-core attributes of queue_info, in place, by its "corecount"

        Nothing is scaled unless "corecount" is a positive number; non-numeric attribute
        values (e.g. None) are left untouched instead of raising or being string-repeated.
        """
        core_count = queue_info.get("corecount", 1)
        if not isinstance(core_count, (int, float)) or core_count <= 0:
            return
        for attr in self.candidate_per_core_attrs:
            value = queue_info.get(attr)
            if isinstance(value, (int, float)):
                queue_info[attr] = value * core_count

    def _refresh(self):
        """
        Reload queue info from the DB cache unless the current content is still fresh

        Only cache entries whose key equals their "nickname" and which carry a
        "panda_resource" are stored, keyed by "panda_resource". Entries are never removed
        here, so queues that disappear from the cache keep their last loaded info.
        If no usable cache is available, the next attempt is scheduled about 5 seconds later.
        """
        with self.lock:
            if self._is_fresh():
                return
            panda_queues_cache = self.dbInterface.get_cache(self.cacher_key)
            if not panda_queues_cache or not isinstance(panda_queues_cache.data, dict):
                # not getting cache; shorten next period into 5 sec
                self.last_refresh_ts = time.time() - self.refresh_period + 5
                return
            for queue_name, queue_info in panda_queues_cache.data.items():
                # explicit checks rather than assert, which is stripped under "python -O"
                if not isinstance(queue_info, dict) or "panda_resource" not in queue_info:
                    continue
                if queue_info.get("nickname") != queue_name:
                    continue
                panda_resource = queue_info["panda_resource"]
                # work on a copy so per-core scaling never mutates the cached payload
                # (and cannot compound if the same payload object is ever returned again)
                entry = dict(queue_info)
                if PandaQueuesDict.use_per_core_attr(entry):
                    self._scale_per_core_attrs(entry)
                self[panda_resource] = entry
                self._resource_by_queue[queue_name] = panda_resource
            # successfully refreshed from cache
            self.last_refresh_ts = time.time()

    def to_refresh(func):
        """
        Decorator to refresh the content (if stale) before running the wrapped accessor
        """
        import functools

        @functools.wraps(func)
        def wrapped_func(self, *args, **kwargs):
            self._refresh()
            return func(self, *args, **kwargs)

        return wrapped_func

    def _resolve_key(self, name):
        """
        Map a PanDA Resource name or PanDA Queue name to a key of this dict

        Returns name itself when it is already a key (or when it is unknown, so that a
        subsequent KeyError reports the requested name).
        """
        if name in self:
            return name
        return self._resource_by_queue.get(name, name)

    @to_refresh
    def __getitem__(self, panda_resource):
        """
        Return queue info by PanDA Resource name, falling back to PanDA Queue name

        Raises KeyError if neither matches.
        """
        return dict.__getitem__(self, self._resolve_key(panda_resource))

    @to_refresh
    def get(self, panda_resource, default=None):
        """
        Return queue info by PanDA Resource name or PanDA Queue name, else default
        """
        return dict.get(self, self._resolve_key(panda_resource), default)

    def get_panda_queue_name(self, panda_resource):
        """
        Return PanDA Queue name with specified PanDA Resource name

        Returns None if the queue is unknown or the lookup fails for any reason.
        """
        try:
            panda_queue_dict = self.get(panda_resource)
        except Exception:
            # best-effort lookup: callers expect None rather than an exception
            return None
        if panda_queue_dict is None:
            return None
        return panda_queue_dict.get("nickname")

    def get_queue_status(self, panda_resource):
        """
        Get queue status for auto blacklisting

        Returns None for an unknown queue, "offline" if the queue is not managed by
        Harvester or belongs to another harvester instance, else its "status" value.
        """
        panda_queue_dict = self.get(panda_resource)
        if panda_queue_dict is None:
            return None
        # offline if not with harvester or not of this harvester instance
        if panda_queue_dict.get("pilot_manager") not in ["Harvester"] or panda_queue_dict.get("harvester") != harvesterID:
            return "offline"
        return panda_queue_dict["status"]

    @to_refresh
    def get_all_queue_names(self):
        """
        Get all queue names of this harvester instance

        The returned names are the keys of this dict (PanDA Resource names) of queues
        managed by Harvester and assigned to this instance.
        """
        return {
            queue_name
            for queue_name, queue_dict in self.items()
            if queue_dict.get("pilot_manager") in ["Harvester"] and queue_dict.get("harvester") == harvesterID
        }

    def is_ups_queue(self, panda_resource):
        """
        Return True if the queue is a UPS queue (capability "ucore" and workflow "pull_ups")
        """
        panda_queue_dict = self.get(panda_resource)
        if panda_queue_dict is None:
            return False
        return panda_queue_dict.get("capability") == "ucore" and panda_queue_dict.get("workflow") == "pull_ups"

    def is_grandly_unified_queue(self, panda_resource):
        """
        Return True if the queue is grandly unified, i.e. runs analysis and production
        """
        panda_queue_dict = self.get(panda_resource)
        if panda_queue_dict is None:
            return False
        # initial, temporary nomenclature; catchall may be missing or None
        catchall_str = panda_queue_dict.get("catchall") or ""
        return "grandly_unified" in catchall_str or panda_queue_dict.get("type") == "unified"

    def get_harvester_params(self, panda_resource):
        """
        Get harvester params ("params" field); empty dict for an unknown queue
        """
        panda_queue_dict = self.get(panda_resource)
        if panda_queue_dict is None:
            return dict()
        return panda_queue_dict.get("params", dict())

    def get_harvester_template(self, panda_resource):
        """
        Get harvester_template; None for an unknown queue, "" if the field is absent
        """
        panda_queue_dict = self.get(panda_resource)
        if panda_queue_dict is None:
            return None
        return panda_queue_dict.get("harvester_template", "")

    def get_type_workflow(self, panda_resource):
        """
        Get a tuple of type (production, analysis, etc.) and workflow

        Type "unified" is reported as "production" so production templates are used.
        Returns (None, None) for an unknown queue.
        """
        panda_queue_dict = self.get(panda_resource)
        if panda_queue_dict is None:
            return None, None
        pq_type = panda_queue_dict.get("type")
        if pq_type == "unified":  # use production templates
            pq_type = "production"
        return pq_type, panda_queue_dict.get("workflow")

    def get_prorated_maxwdir_GiB(self, panda_resource, worker_corecount):
        """
        Return the queue's maxwdir converted to GiB (divided by 1024)

        For "ucore" queues the value is prorated by worker_corecount / corecount.
        Returns 0 on any failure (unknown queue, missing values, zero corecount).
        """
        try:
            panda_queue_dict = self.get(panda_resource)
            maxwdir = panda_queue_dict.get("maxwdir") / 1024  # convert to GiB
            corecount = panda_queue_dict.get("corecount")
            if panda_queue_dict.get("capability") == "ucore":
                maxwdir_prorated = maxwdir * worker_corecount / corecount
            else:
                maxwdir_prorated = maxwdir
        except Exception:
            maxwdir_prorated = 0
        return maxwdir_prorated