# File indexing completed on 2026-04-20 07:58:59
import functools
import re
import threading
import time

from pandaharvester.harvesterconfig import harvester_config
from pandaharvester.harvestercore.core_utils import SingletonWithID
from pandaharvester.harvestercore.db_interface import DBInterface
from pandaharvester.harvestercore.plugin_base import PluginBase
0008 from pandaharvester.harvestercore.plugin_base import PluginBase
0009
# ID of this harvester instance, read from the "master" section of the harvester
# configuration; compared against each queue's "harvester" field further below
harvesterID = harvester_config.master.harvester_id
# optional "resolverConfig" settings from the qconf section; falls back to an
# empty dict when the attribute is absent (only "refreshPeriod" is read in this file)
resolver_config = getattr(harvester_config.qconf, "resolverConfig", {})
0012
0013
class PandaQueuesDict(dict, PluginBase, metaclass=SingletonWithID):
    """
    Dictionary of PanDA queue info from DB by cacher
    Key is PanDA Resource name (rather than PanDA Queue name)
    Able to query with either PanDA Queue name or PanDA Resource name

    The content is re-read from the cacher at most once per refresh period;
    read accessors decorated with ``to_refresh`` trigger that check.
    """

    # attributes multiplied by corecount for queues flagged "per_core_attr" in catchall
    candidate_per_core_attrs = (
        "maxrss",
        "minrss",
        "maxwdir",
    )

    def __init__(self, **kwargs):
        """
        Set up the lock, DB interface and refresh bookkeeping, then load the cache once

        :param kwargs: forwarded to PluginBase; "cacher_key" (default
                       "panda_queues.json") selects the cache entry to read
        """
        dict.__init__(self)
        PluginBase.__init__(self, **kwargs)
        self.lock = threading.Lock()
        self.dbInterface = DBInterface()
        self.cacher_key = kwargs.get("cacher_key", "panda_queues.json")
        # seconds between two reads of the cache
        self.refresh_period = resolver_config.get("refreshPeriod", 300)
        # zero forces the very first _refresh() to read the cache
        self.last_refresh_ts = 0
        self._refresh()

    def _is_fresh(self):
        """
        Return True while the last refresh is younger than the refresh period
        """
        return self.last_refresh_ts + self.refresh_period > time.time()

    @staticmethod
    def has_value_in_catchall(panda_queues_dict, key):
        """
        Check if specific value is in catchall attributes

        catchall is treated as a comma-separated list of tokens; a token matches
        when it is exactly ``key`` or has the form ``key=...``

        :param panda_queues_dict: info dict of one queue
        :param key: token name to look for
        :return: True if a matching token exists, False otherwise (also when
                 the queue has no catchall)
        """
        catchall_str = panda_queues_dict.get("catchall")
        if catchall_str is None:
            return False
        # escape the key and require "=" or end of token, so that "foo" neither
        # matches "foobar" nor is interpreted as a regular expression
        token_pattern = re.compile(rf"^{re.escape(key)}(=|$)")
        return any(token_pattern.search(token.strip()) for token in catchall_str.split(","))

    @staticmethod
    def use_per_core_attr(panda_queues_dict):
        """
        Check if treating all attributes as per-core
        """
        return PandaQueuesDict.has_value_in_catchall(panda_queues_dict, "per_core_attr")

    @classmethod
    def _scale_per_core_attrs(cls, queue_info):
        """
        Multiply the candidate per-core attributes of queue_info by its corecount, in place

        Nothing is changed when corecount is not a positive number; attributes
        that are missing or not numeric are left untouched
        """
        core_count = queue_info.get("corecount", 1)
        if not isinstance(core_count, (int, float)) or core_count <= 0:
            return
        for attr in cls.candidate_per_core_attrs:
            per_core_value = queue_info.get(attr)
            if isinstance(per_core_value, (int, float)):
                queue_info[attr] = per_core_value * core_count

    def _refresh(self):
        """
        Reload the queue info from the cacher unless the current content is still fresh

        Entries are stored under their "panda_resource" value. Entries that are
        not dicts, lack "panda_resource"/"nickname", or whose "nickname" differs
        from their key in the cache are skipped. Existing keys that are absent
        from the new cache content are kept.
        """
        with self.lock:
            if self._is_fresh():
                return
            panda_queues_cache = self.dbInterface.get_cache(self.cacher_key)
            if not (panda_queues_cache and isinstance(panda_queues_cache.data, dict)):
                # cache not available; backdate the timestamp so that the next
                # attempt happens in about 5 sec instead of a full period
                self.last_refresh_ts = time.time() - self.refresh_period + 5
                return
            for queue_name, cached_info in panda_queues_cache.data.items():
                # explicit validation rather than assert, which is removed under -O
                if not isinstance(cached_info, dict):
                    continue
                if "panda_resource" not in cached_info or "nickname" not in cached_info:
                    continue
                if cached_info["nickname"] != queue_name:
                    continue
                # work on a shallow copy so that per-core scaling never modifies
                # the object owned by the cacher
                # NOTE(review): whether get_cache() hands out the same object on
                # every call is not visible here; copying is safe either way
                queue_info = dict(cached_info)
                if PandaQueuesDict.use_per_core_attr(queue_info):
                    self._scale_per_core_attrs(queue_info)
                self[cached_info["panda_resource"]] = queue_info
            # successfully refreshed from cache
            self.last_refresh_ts = time.time()

    def to_refresh(func):
        """
        Decorator to refresh
        """

        @functools.wraps(func)
        def wrapped_func(self, *args, **kwargs):
            self._refresh()
            return func(self, *args, **kwargs)

        return wrapped_func

    def _find_resource_by_queue_name(self, panda_queue):
        """
        Return the key (PanDA Resource name) of the entry whose "nickname" is
        panda_queue, or None if there is no such entry
        """
        if panda_queue is None:
            return None
        # iterate over a snapshot since a refresh may insert keys meanwhile
        for resource_name, queue_info in list(dict.items(self)):
            if isinstance(queue_info, dict) and queue_info.get("nickname") == panda_queue:
                return resource_name
        return None

    @to_refresh
    def __getitem__(self, panda_resource):
        """
        Return the info dict for a PanDA Resource name or a PanDA Queue name

        :raises KeyError: if neither a key nor a nickname matches
        """
        if panda_resource in self:
            return dict.__getitem__(self, panda_resource)
        resource_name = self._find_resource_by_queue_name(panda_resource)
        if resource_name is None:
            raise KeyError(panda_resource)
        return dict.__getitem__(self, resource_name)

    @to_refresh
    def get(self, panda_resource, default=None):
        """
        Return the info dict for a PanDA Resource name or a PanDA Queue name,
        or default if neither a key nor a nickname matches
        """
        if panda_resource in self:
            return dict.get(self, panda_resource, default)
        resource_name = self._find_resource_by_queue_name(panda_resource)
        if resource_name is None:
            return default
        return dict.get(self, resource_name, default)

    def get_panda_queue_name(self, panda_resource):
        """
        Return PanDA Queue name with specified PanDA Resource name

        :return: the "nickname" of the matching entry, or None when the entry
                 does not exist or the lookup fails
        """
        try:
            return self.get(panda_resource).get("nickname")
        except Exception:
            # best effort: unknown queue or failing lookup means no name
            return None

    def get_queue_status(self, panda_resource):
        """
        Return the status of the queue as seen by this harvester instance

        :return: None if the queue is unknown, "offline" if the queue is not
                 managed by Harvester or belongs to another harvester instance,
                 otherwise the "status" value of the queue
        """
        panda_queue_dict = self.get(panda_resource)
        if panda_queue_dict is None:
            return None
        managed_by_harvester = panda_queue_dict.get("pilot_manager") in ["Harvester"]
        served_by_this_instance = panda_queue_dict.get("harvester") == harvesterID
        if not (managed_by_harvester and served_by_this_instance):
            return "offline"
        return panda_queue_dict["status"]

    @to_refresh
    def get_all_queue_names(self):
        """
        Return the set of keys of all queues that are managed by Harvester and
        assigned to this harvester instance
        """
        return {
            queue_name
            for queue_name, queue_dict in list(self.items())
            if queue_dict.get("pilot_manager") in ["Harvester"] and queue_dict.get("harvester") == harvesterID
        }

    def is_ups_queue(self, panda_resource):
        """
        Return True if the queue has capability "ucore" and workflow "pull_ups"
        """
        panda_queue_dict = self.get(panda_resource)
        if panda_queue_dict is None:
            return False
        return panda_queue_dict.get("capability") == "ucore" and panda_queue_dict.get("workflow") == "pull_ups"

    def is_grandly_unified_queue(self, panda_resource):
        """
        Return True if catchall contains "grandly_unified" or the queue type is "unified"
        """
        panda_queue_dict = self.get(panda_resource)
        if panda_queue_dict is None:
            return False
        # catchall may be missing or None; treat that as empty rather than failing
        catchall_str = panda_queue_dict.get("catchall") or ""
        return "grandly_unified" in catchall_str or panda_queue_dict.get("type") == "unified"

    def get_harvester_params(self, panda_resource):
        """
        Return the "params" value of the queue; an empty dict if the queue is
        unknown or has no "params"
        """
        panda_queue_dict = self.get(panda_resource)
        if panda_queue_dict is None:
            return dict()
        return panda_queue_dict.get("params", dict())

    def get_harvester_template(self, panda_resource):
        """
        Return the "harvester_template" value of the queue; None if the queue
        is unknown, empty string if the queue has no "harvester_template"
        """
        panda_queue_dict = self.get(panda_resource)
        if panda_queue_dict is None:
            return None
        return panda_queue_dict.get("harvester_template", "")

    def get_type_workflow(self, panda_resource):
        """
        Return (type, workflow) of the queue, with type "unified" reported as
        "production"; (None, None) if the queue is unknown
        """
        panda_queue_dict = self.get(panda_resource)
        if panda_queue_dict is None:
            return None, None
        pq_type = panda_queue_dict.get("type")
        if pq_type == "unified":
            pq_type = "production"
        return pq_type, panda_queue_dict.get("workflow")

    def get_prorated_maxwdir_GiB(self, panda_resource, worker_corecount):
        """
        Return maxwdir of the queue divided by 1024; for queues with capability
        "ucore" the value is additionally scaled by worker_corecount / corecount

        :return: the prorated value, or 0 whenever it cannot be computed
                 (unknown queue, missing or non-numeric values, zero corecount)
        """
        try:
            panda_queue_dict = self.get(panda_resource)
            # presumably maxwdir is in MiB, hence the division to get GiB; verify against the queue schema
            maxwdir = panda_queue_dict.get("maxwdir") / 1024
            corecount = panda_queue_dict.get("corecount")
            if panda_queue_dict.get("capability") == "ucore":
                maxwdir_prorated = maxwdir * worker_corecount / corecount
            else:
                maxwdir_prorated = maxwdir
        except Exception:
            # deliberate best effort: any failure is reported as 0
            maxwdir_prorated = 0

        return maxwdir_prorated