Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-19 08:00:02

0001 import copy
0002 import datetime
0003 import importlib
0004 import json
0005 import os
0006 import threading
0007 from json.decoder import JSONDecodeError
0008 
0009 from pandaharvester.harvesterconfig import harvester_config
0010 from pandaharvester.harvestermisc.info_utils import PandaQueuesDict
0011 
0012 from . import core_utils
0013 from .core_utils import SingletonWithID
0014 from .db_interface import DBInterface
0015 from .db_proxy_pool import DBProxyPool as DBProxy
0016 from .panda_queue_spec import PandaQueueSpec
0017 from .plugin_factory import PluginFactory
0018 from .queue_config_dump_spec import QueueConfigDumpSpec
0019 from .work_spec import WorkSpec
0020 
0021 # logger
0022 _logger = core_utils.setup_logger("queue_config_mapper")
0023 _dbInterface = DBInterface()
0024 
0025 
0026 # make logger
0027 def _make_logger(base_log=_logger, token=None, method_name=None, send_dialog=True):
0028     if send_dialog and _dbInterface:
0029         hook = _dbInterface
0030     else:
0031         hook = None
0032     return core_utils.make_logger(base_log, token=token, method_name=method_name, hook=hook)
0033 
0034 
0035 # class for queue config
0036 class QueueConfig(object):
0037     def __init__(self, queue_name):
0038         self.queueName = queue_name
0039         self.pandaQueueName = None
0040         self.prodSourceLabel = "managed"
0041         # default parameters
0042         self.mapType = WorkSpec.MT_OneToOne
0043         self.useJobLateBinding = False
0044         self.zipPerMB = None
0045         self.siteName = ""
0046         self.siteFamily = ""
0047         self.maxWorkers = 0
0048         self.nNewWorkers = 0
0049         self.maxNewWorkersPerCycle = 0
0050         self.noHeartbeat = ""
0051         self.runMode = "self"
0052         self.resourceType = PandaQueueSpec.RT_catchall
0053         self.jobType = PandaQueueSpec.JT_catchall
0054         self.getJobCriteria = None
0055         self.ddmEndpointIn = None
0056         self.allowJobMixture = False
0057         self.maxSubmissionAttempts = 3
0058         self.truePilot = False
0059         self.queueStatus = None
0060         self.prefetchEvents = True
0061         self.uniqueName = None
0062         self.configID = None
0063         self.initEventsMultipler = 2
0064 
0065     # get list of status without heartbeat
0066     def get_no_heartbeat_status(self):
0067         return self.noHeartbeat.split(",")
0068 
0069     # check if status without heartbeat
0070     def is_no_heartbeat_status(self, status):
0071         return status in self.get_no_heartbeat_status()
0072 
0073     # get prodSourceLabel
0074     def get_source_label(self, job_type=None, is_gu=None):
0075         # if queue is in test status, only submit workers for HC jobs
0076         # No longer need this as panda only dispatches test jobs when PQ in test
0077         # if self.queueStatus == 'test':
0078         #     return 'test'
0079 
0080         # grandly unified queues: prodsourcelabel in job has precedence over queue prodsourcelabel
0081         if job_type in ("user", "panda"):
0082             return "user"
0083 
0084         # grandly unified queues: call to getJobs should not request for a particular prodSourceLabel
0085         if is_gu:
0086             return "unified"
0087 
0088         return self.prodSourceLabel
0089 
0090     # set unique name
0091     def set_unique_name(self):
0092         self.uniqueName = core_utils.get_unique_queue_name(self.queueName, self.resourceType, self.prodSourceLabel)
0093 
0094     # update attributes
0095     def update_attributes(self, data):
0096         for k, v in data.items():
0097             setattr(self, k, v)
0098 
0099     # get synchronization level between job and worker
0100     def get_synchronization_level(self):
0101         if self.mapType == WorkSpec.MT_NoJob or self.truePilot or self.is_no_heartbeat_status("finished"):
0102             return 1
0103         return None
0104 
0105     # str
0106     def __str__(self):
0107         header = self.queueName + "\n" + "-" * len(self.queueName) + "\n"
0108         tmpStr = ""
0109         pluginStr = ""
0110         keys = sorted(self.__dict__.keys())
0111         for key in keys:
0112             val = self.__dict__[key]
0113             if isinstance(val, dict):
0114                 pluginStr += f" {key} :\n"
0115                 pKeys = sorted(val.keys())
0116                 for pKey in pKeys:
0117                     pVal = val[pKey]
0118                     pluginStr += f"  {pKey} = {pVal}\n"
0119             else:
0120                 tmpStr += f" {key} = {val}\n"
0121         return header + tmpStr + pluginStr
0122 
0123 
0124 # mapper
0125 class QueueConfigMapper(metaclass=SingletonWithID):
0126     """
0127     Using some acronyms here:
0128     LT = local template, written in local queueconfig file
0129     RT = remote template, on http source fetched by cacher
0130     FT = final template of a queue
0131     LQ = local queue configuration, written in local queueconfig file
0132     RQ = remote queue configuration, on http source fetched by cacher, static
0133     DQ = dynamic queue configuration, configured with information from resolver
0134 
0135     Relations:
0136     FT = LT if existing else RT
0137     FQ = FT updated with RQ, then updated with DQ, then updated with LQ
0138     """
0139 
0140     mandatory_attrs = set(
0141         [
0142             "messenger",
0143             "monitor",
0144             "preparator",
0145             "stager",
0146             "submitter",
0147             "sweeper",
0148             "workerMaker",
0149         ]
0150     )
0151     queue_limit_attrs = set(
0152         [
0153             "maxWorkers",
0154             "maxNewWorkersPerCycle",
0155             "nQueueLimitWorker",
0156             "nQueueLimitWorkerMax",
0157             "nQueueLimitWorkerRatio",
0158             "nQueueLimitWorkerMin",
0159             "nQueueLimitWorkerCores",
0160             "nQueueLimitWorkerCoresRatio",
0161             "nQueueLimitWorkerCoresMin",
0162             "nQueueLimitWorkerMemory",
0163             "nQueueLimitWorkerMemoryRatio",
0164             "nQueueLimitWorkerMemoryMin",
0165             "nQueueLimitJob",
0166             "nQueueLimitJobMax",
0167             "nQueueLimitJobRatio",
0168             "nQueueLimitJobMin",
0169             "nQueueLimitJobCores",
0170             "nQueueLimitJobCoresRatio",
0171             "nQueueLimitJobCoresMin",
0172         ]
0173     )
0174     updatable_plugin_attrs = set(
0175         ["common", "messenger", "monitor", "preparator", "stager", "submitter", "sweeper", "workerMaker", "throttler", "zipper", "aux_preparator", "extractor"]
0176     )
0177 
0178     # constructor
0179     def __init__(self, update_db=True):
0180         self.lock = threading.Lock()
0181         self.lastUpdate = None
0182         self.lastReload = None
0183         self.lastCheck = None
0184         self.last_cache_ts = None
0185         self.dbProxy = DBProxy()
0186         self.toUpdateDB = update_db
0187         try:
0188             self.configFromCacher = harvester_config.qconf.configFromCacher
0189         except AttributeError:
0190             self.configFromCacher = False
0191         try:
0192             self.updateInterval = harvester_config.qconf.updateInterval
0193         except AttributeError:
0194             self.updateInterval = 600
0195         try:
0196             self.checkInterval = harvester_config.qconf.checkInterval
0197         except AttributeError:
0198             self.checkInterval = 5
0199         finally:
0200             self.checkInterval = min(self.checkInterval, self.updateInterval)
0201 
0202     # load config from DB cache of URL with validation
0203     def _load_config_from_cache(self):
0204         mainLog = _make_logger(token=f"id={core_utils.get_pid()}", method_name="_load_config_from_cache")
0205         # load config json on URL
0206         if self.configFromCacher:
0207             queueConfig_cacheSpec = self.dbProxy.get_cache("queues_config_file", from_local_cache=False)
0208             if queueConfig_cacheSpec is not None:
0209                 queueConfigJson = queueConfig_cacheSpec.data
0210                 if isinstance(queueConfigJson, dict):
0211                     return queueConfigJson, queueConfig_cacheSpec.lastUpdate
0212                 else:
0213                     mainLog.error("Invalid JSON in cache queues_config_file. Skipped")
0214             else:
0215                 mainLog.debug("queues config not fount in cache. Skipped")
0216         else:
0217             mainLog.debug("queues config URL not set. Skipped")
0218         return None, None
0219 
0220     # load config from local json file with syntax validation
0221     @staticmethod
0222     def _load_config_from_file():
0223         mainLog = _make_logger(token=f"id={core_utils.get_pid()}", method_name="_load_config_from_file")
0224         # define config file path
0225         if os.path.isabs(harvester_config.qconf.configFile):
0226             confFilePath = harvester_config.qconf.configFile
0227         else:
0228             # check if in PANDA_HOME
0229             confFilePath = None
0230             if "PANDA_HOME" in os.environ:
0231                 confFilePath = os.path.join(os.environ["PANDA_HOME"], "etc/panda", harvester_config.qconf.configFile)
0232                 if not os.path.exists(confFilePath):
0233                     confFilePath = None
0234             # look into /etc/panda
0235             if confFilePath is None:
0236                 confFilePath = os.path.join("/etc/panda", harvester_config.qconf.configFile)
0237         # load from config file
0238         try:
0239             with open(confFilePath) as f:
0240                 queueConfigJson = json.load(f)
0241         except OSError as e:
0242             mainLog.error(f"Cannot read file: {confFilePath} ; {e}")
0243             return None
0244         except JSONDecodeError as e:
0245             mainLog.error(f"Invalid JSON in file: {confFilePath} ; {e}")
0246             return None
0247         return queueConfigJson
0248 
0249     # get resolver module
0250     @staticmethod
0251     def _get_resolver():
0252         if hasattr(harvester_config.qconf, "resolverModule") and hasattr(harvester_config.qconf, "resolverClass"):
0253             pluginConf = {"module": harvester_config.qconf.resolverModule, "name": harvester_config.qconf.resolverClass}
0254             pluginFactory = PluginFactory()
0255             resolver = pluginFactory.get_plugin(pluginConf)
0256         else:
0257             resolver = None
0258         return resolver
0259 
0260     # update last reload time
0261     def _update_last_reload_time(self, the_time=None):
0262         # update timestamp of last reload, lock with check interval
0263         got_update_lock = self.dbProxy.get_process_lock("qconf_reload", "qconf_universal", self.updateInterval)
0264         if got_update_lock:
0265             if the_time is None:
0266                 the_time = core_utils.naive_utcnow()
0267             ts = the_time.timestamp()
0268             new_ts_info = f"{ts:.3f}"
0269             ret_val = self.dbProxy.refresh_cache("_qconf_last_reload", "_universal", new_ts_info)
0270             if ret_val:
0271                 return True
0272             else:
0273                 return False
0274         else:
0275             return None
0276 
0277     # get last reload time
0278     def _get_last_reload_time(self):
0279         cacheSpec = self.dbProxy.get_cache("_qconf_last_reload", "_universal", from_local_cache=False)
0280         if cacheSpec is None:
0281             return None
0282         timestamp = float(cacheSpec.data)
0283         return core_utils.naive_utcfromtimestamp(timestamp)
0284 
0285     # update last pq_table fill time
0286     def _update_pq_table(self, cache_time=None, refill_table=False):
0287         # update timestamp of last reload, lock with check interval
0288         got_update_lock = self.dbProxy.get_process_lock("pq_table_fill", "qconf_universal", 120)
0289         if got_update_lock:
0290             fill_ret_val = self.dbProxy.fill_panda_queue_table(self.activeQueues.keys(), self, refill_table=refill_table)
0291             now_time = core_utils.naive_utcnow()
0292             if fill_ret_val:
0293                 now_ts = now_time.timestamp()
0294                 now_ts_info = f"{now_ts:.3f}"
0295                 self.dbProxy.refresh_cache("_pq_table_last_fill", "_universal", now_ts_info)
0296                 if cache_time:
0297                     cache_ts = cache_time.timestamp()
0298                     cache_ts_info = f"{cache_ts:.3f}"
0299                     self.dbProxy.refresh_cache("_cache_to_fill_pq_table", "_universal", cache_ts_info)
0300             self.dbProxy.release_process_lock("pq_table_fill", "qconf_universal")
0301             if fill_ret_val:
0302                 return True
0303             else:
0304                 return False
0305         else:
0306             return None
0307 
0308     # get last pq_table fill time
0309     def _get_last_pq_table_fill_time(self):
0310         cacheSpec = self.dbProxy.get_cache("_pq_table_last_fill", "_universal", from_local_cache=False)
0311         if cacheSpec is None:
0312             return None
0313         timestamp = float(cacheSpec.data)
0314         return core_utils.naive_utcfromtimestamp(timestamp)
0315 
0316     # get time of last cache used to fill pq_table
0317     def _get_cache_to_fill_pq_table_time(self):
0318         cacheSpec = self.dbProxy.get_cache("_cache_to_fill_pq_table", "_universal", from_local_cache=False)
0319         if cacheSpec is None:
0320             return None
0321         timestamp = float(cacheSpec.data)
0322         return core_utils.naive_utcfromtimestamp(timestamp)
0323 
0324     # load data
0325     def load_data(self, refill_table=False):
0326         mainLog = _make_logger(token=f"id={core_utils.get_pid()}", method_name="load_data")
0327         # check if to update
0328         with self.lock:
0329             now_time = core_utils.naive_utcnow()
0330             updateInterval_td = datetime.timedelta(seconds=self.updateInterval)
0331             checkInterval_td = datetime.timedelta(seconds=self.checkInterval)
0332             # skip if lastCheck is fresh (within checkInterval)
0333             if self.lastCheck is not None and now_time - self.lastCheck < checkInterval_td:
0334                 return
0335             self.lastCheck = now_time
0336             # get last_reload_timestamp from DB
0337             self.lastReload = self._get_last_reload_time()
0338             # get last_pq_table_fill_time from DB
0339             self.last_pq_table_fill_time = self._get_last_pq_table_fill_time()
0340             # get time of last cache used to fill pq_table from DB
0341             self.cache_to_fill_pq_table_time = self._get_cache_to_fill_pq_table_time()
0342             # skip if min_last_reload is fresh and lastUpdate fresher than lastReload (within updateInterval)
0343             if self.lastReload and self.lastUpdate:
0344                 min_last_reload = self.lastReload
0345                 if self.last_cache_ts:
0346                     min_last_reload = min(min_last_reload, self.last_cache_ts + updateInterval_td * 0.25)
0347                 if self.lastReload < self.lastUpdate and now_time - min_last_reload < updateInterval_td:
0348                     return
0349         # start
0350         with self.lock:
0351             # update timestamp of last reload, lock with check interval
0352             now_time = core_utils.naive_utcnow()
0353             update_last_reload_ret_val = self._update_last_reload_time(now_time)
0354             if update_last_reload_ret_val:
0355                 self.lastReload = now_time
0356                 mainLog.debug("updated last reload timestamp")
0357             elif update_last_reload_ret_val is None:
0358                 mainLog.debug("did not get qconf_reload timestamp lock. Skipped to update last reload timestamp")
0359             else:
0360                 mainLog.warning("failed to update last reload timestamp. Skipped")
0361             # init
0362             newQueueConfig = dict()
0363             localTemplatesDict = dict()
0364             remoteTemplatesDict = dict()
0365             finalTemplatesDict = dict()
0366             localQueuesDict = dict()
0367             remoteQueuesDict = dict()
0368             dynamicQueuesDict = dict()
0369             allQueuesNameList = set()
0370             getQueuesDynamic = False
0371             invalidQueueList = set()
0372             pandaQueueDict = PandaQueuesDict()
0373             # get resolver
0374             resolver = self._get_resolver()
0375             if resolver is None:
0376                 mainLog.debug("No resolver is configured")
0377             # load config json from cacher (RT & RQ)
0378             queueConfigJson_cacher, self.last_cache_ts = self._load_config_from_cache()
0379             if self.last_cache_ts and self.cache_to_fill_pq_table_time and (self.last_cache_ts < self.cache_to_fill_pq_table_time):
0380                 # cacher data outdated compared with pq_table fill time; warn
0381                 mainLog.warning(f"Found cacher data outdated ({str(self.last_cache_ts)} < {str(self.cache_to_fill_pq_table_time)})")
0382             if queueConfigJson_cacher is not None:
0383                 mainLog.debug("Applying cacher data")
0384                 for queueName, queueDict in queueConfigJson_cacher.items():
0385                     if queueDict.get("isTemplateQueue") is True or queueName.endswith("_TEMPLATE"):
0386                         # is RT
0387                         queueDict["isTemplateQueue"] = True
0388                         queueDict.pop("templateQueueName", None)
0389                         remoteTemplatesDict[queueName] = queueDict
0390                     else:
0391                         # is RQ
0392                         queueDict["isTemplateQueue"] = False
0393                         remoteQueuesDict[queueName] = queueDict
0394             # load config from local json file (LT & LQ)
0395             queueConfigJson_local = self._load_config_from_file()
0396             if queueConfigJson_local is not None:
0397                 mainLog.debug("Applying local config")
0398                 for queueName, queueDict in queueConfigJson_local.items():
0399                     if queueDict.get("isTemplateQueue") is True or queueName.endswith("_TEMPLATE"):
0400                         # is LT
0401                         queueDict["isTemplateQueue"] = True
0402                         queueDict.pop("templateQueueName", None)
0403                         localTemplatesDict[queueName] = queueDict
0404                     else:
0405                         # is LQ
0406                         queueDict["isTemplateQueue"] = False
0407                         localQueuesDict[queueName] = queueDict
0408             else:
0409                 mainLog.warning("Failed to load config from local json file. Skipped")
0410             # fill in final template (FT)
0411             finalTemplatesDict.update(remoteTemplatesDict)
0412             finalTemplatesDict.update(localTemplatesDict)
0413             finalTemplatesDict.pop(None, None)
0414             # remove queues with invalid templateQueueName
0415             for acr, queuesDict in [("RQ", remoteQueuesDict), ("LQ", localQueuesDict)]:
0416                 for queueName, queueDict in queuesDict.copy().items():
0417                     templateQueueName = queueDict.get("templateQueueName")
0418                     if templateQueueName is not None and templateQueueName not in finalTemplatesDict:
0419                         del queuesDict[queueName]
0420                         mainLog.warning(f'Invalid templateQueueName "{templateQueueName}" for {queueName} ({acr}). Skipped')
0421             # get queue names from resolver and fill in dynamic queue (DQ)
0422             if resolver is not None and "DYNAMIC" in harvester_config.qconf.queueList:
0423                 getQueuesDynamic = True
0424                 dynamicQueuesNameList = resolver.get_all_queue_names()
0425                 for queueName in dynamicQueuesNameList.copy():
0426                     queueDict = dict()
0427                     # template and default template via workflow
0428                     templateQueueName = None
0429                     resolver_harvester_template = None
0430                     if resolver is not None:
0431                         resolver_harvester_template = resolver.get_harvester_template(queueName)
0432                         resolver_type, resolver_workflow = resolver.get_type_workflow(queueName)
0433                     if resolver_harvester_template:
0434                         templateQueueName = resolver_harvester_template
0435                     elif not (resolver_type is None or resolver_workflow is None):
0436                         templateQueueName = f"{resolver_type}.{resolver_workflow}"
0437                     else:
0438                         templateQueueName = harvester_config.qconf.defaultTemplateQueueName
0439                     if templateQueueName not in finalTemplatesDict:
0440                         # remove queues with invalid templateQueueName
0441                         dynamicQueuesNameList.discard(queueName)
0442                         mainLog.warning(f'Invalid templateQueueName "{templateQueueName}" for {queueName} (DQ). Skipped')
0443                         continue
0444                     # parameters
0445                     resolver_harvester_params = resolver.get_harvester_params(queueName)
0446                     for key, val in resolver_harvester_params.items():
0447                         if key in self.queue_limit_attrs:
0448                             queueDict[key] = val
0449                     # fill in dynamic queue configs
0450                     queueDict["templateQueueName"] = templateQueueName
0451                     queueDict["isTemplateQueue"] = False
0452                     dynamicQueuesDict[queueName] = queueDict
0453             # fill in all queue name list (names of RQ + DQ + LQ)
0454             allQueuesNameList |= set(remoteQueuesDict)
0455             allQueuesNameList |= set(dynamicQueuesDict)
0456             allQueuesNameList |= set(localQueuesDict)
0457             allQueuesNameList.discard(None)
0458             # set attributes
0459             for queueName in allQueuesNameList:
0460                 # sources or queues and templates
0461                 queueSourceList = []
0462                 templateSourceList = []
0463                 # prepare templateQueueName
0464                 templateQueueName = None
0465                 for queuesDict in [remoteQueuesDict, dynamicQueuesDict, localQueuesDict]:
0466                     if queueName not in queuesDict:
0467                         continue
0468                     tmp_queueDict = queuesDict[queueName]
0469                     tmp_templateQueueName = tmp_queueDict.get("templateQueueName")
0470                     if tmp_templateQueueName is not None:
0471                         templateQueueName = tmp_templateQueueName
0472                 # prepare queueDict
0473                 queueDict = dict()
0474                 if templateQueueName in finalTemplatesDict:
0475                     queueDict.update(copy.deepcopy(finalTemplatesDict[templateQueueName]))
0476                 for acr, templatesDict in [("RT", remoteTemplatesDict), ("LT", localTemplatesDict)]:
0477                     if templateQueueName in templatesDict:
0478                         templateSourceList.append(acr)
0479                 # update queueDict
0480                 for acr, queuesDict in [("RQ", remoteQueuesDict), ("DQ", dynamicQueuesDict), ("LQ", localQueuesDict)]:
0481                     if queueName not in queuesDict:
0482                         continue
0483                     queueSourceList.append(acr)
0484                     tmp_queueDict = queuesDict[queueName]
0485                     for key, val in tmp_queueDict.items():
0486                         val = copy.deepcopy(val)
0487                         if key in self.updatable_plugin_attrs and isinstance(queueDict.get(key), dict) and isinstance(val, dict):
0488                             # update plugin parameters instead of overwriting whole plugin section
0489                             queueDict[key].update(val)
0490                         else:
0491                             queueDict[key] = val
0492                 # record sources of the queue config and its templates in log
0493                 if templateQueueName:
0494                     mainLog.debug(
0495                         f"queue {queueName} comes from {','.join(queueSourceList)} (with template {templateQueueName} from {','.join(templateSourceList)})"
0496                     )
0497                 else:
0498                     mainLog.debug(f"queue {queueName} comes from {','.join(queueSourceList)}")
0499                 # prepare queueConfig
0500                 if queueName in newQueueConfig:
0501                     queueConfig = newQueueConfig[queueName]
0502                 else:
0503                     queueConfig = QueueConfig(queueName)
0504                 # queueName = siteName/resourceType
0505                 queueConfig.siteName = queueConfig.queueName.split("/")[0]
0506                 if queueConfig.siteName != queueConfig.queueName:
0507                     queueConfig.resourceType = queueConfig.queueName.split("/")[-1]
0508                 if not queueConfig.siteFamily:
0509                     queueConfig.siteFamily = queueConfig.siteName
0510                 # get common attributes
0511                 commonAttrDict = dict()
0512                 if isinstance(queueDict.get("common"), dict):
0513                     commonAttrDict = queueDict.get("common")
0514                 # according to queueDict
0515                 for key, val in queueDict.items():
0516                     if isinstance(val, dict) and "module" in val and "name" in val:
0517                         # plugin attributes
0518                         val = copy.deepcopy(val)
0519                         # fill in common attributes for all plugins
0520                         for c_key, c_val in commonAttrDict.items():
0521                             if c_key not in val and c_key not in ("module", "name"):
0522                                 val[c_key] = c_val
0523                         # check module and class name
0524                         try:
0525                             _t3mP_1Mp0R7_mO6U1e__ = importlib.import_module(val["module"])
0526                             _t3mP_1Mp0R7_N4m3__ = getattr(_t3mP_1Mp0R7_mO6U1e__, val["name"])
0527                         except Exception as _e:
0528                             invalidQueueList.add(queueConfig.queueName)
0529                             mainLog.error(f"Module or class not found. Omitted {queueConfig.queueName} in queue config ({_e})")
0530                             continue
0531                         else:
0532                             del _t3mP_1Mp0R7_mO6U1e__
0533                             del _t3mP_1Mp0R7_N4m3__
0534                         # fill in siteName and queueName
0535                         if "siteName" not in val:
0536                             val["siteName"] = queueConfig.siteName
0537                         if "queueName" not in val:
0538                             val["queueName"] = queueConfig.queueName
0539                         # middleware
0540                         if "middleware" in val and val["middleware"] in queueDict:
0541                             # keep original config
0542                             val["original_config"] = copy.deepcopy(val)
0543                             # overwrite with middleware config
0544                             for m_key, m_val in queueDict[val["middleware"]].items():
0545                                 val[m_key] = m_val
0546                     setattr(queueConfig, key, val)
0547                 # delete isTemplateQueue attribute
0548                 try:
0549                     if getattr(queueConfig, "isTemplateQueue"):
0550                         mainLog.error(f"Internal error: isTemplateQueue is True. Omitted {queueConfig.queueName} in queue config")
0551                         invalidQueueList.add(queueConfig.queueName)
0552                     else:
0553                         delattr(queueConfig, "isTemplateQueue")
0554                 except AttributeError as _e:
0555                     mainLog.error(f'Internal error with attr "isTemplateQueue". Omitted {queueConfig.queueName} in queue config ({_e})')
0556                     invalidQueueList.add(queueConfig.queueName)
0557                 # get Panda Queue Name
0558                 if resolver is not None:
0559                     queueConfig.pandaQueueName = resolver.get_panda_queue_name(queueConfig.siteName)
0560                 # additional criteria for getJob
0561                 if queueConfig.getJobCriteria is not None:
0562                     tmpCriteria = dict()
0563                     for tmpItem in queueConfig.getJobCriteria.split(","):
0564                         tmpKey, tmpVal = tmpItem.split("=")
0565                         tmpCriteria[tmpKey] = tmpVal
0566                     if len(tmpCriteria) == 0:
0567                         queueConfig.getJobCriteria = None
0568                     else:
0569                         queueConfig.getJobCriteria = tmpCriteria
0570                 # nullify all job limit attributes if NoJob mapType (PULL)
0571                 if queueConfig.mapType == WorkSpec.MT_NoJob:
0572                     for attName in [
0573                         "nQueueLimitJob",
0574                         "nQueueLimitJobRatio",
0575                         "nQueueLimitJobMax",
0576                         "nQueueLimitJobMin",
0577                         "nQueueLimitJobCores",
0578                         "nQueueLimitJobCoresRatio",
0579                         "nQueueLimitJobCoresMin",
0580                     ]:
0581                         setattr(queueConfig, attName, None)
0582                 # nullify worker ratio limit attributes if jobful mapTypes (PUSH)
0583                 if queueConfig.mapType != WorkSpec.MT_NoJob:
0584                     for attName in ["nQueueLimitWorkerRatio", "nQueueLimitWorkerMin"]:
0585                         setattr(queueConfig, attName, None)
0586                 # sanity for worker and job limit attributes
0587                 for key in self.queue_limit_attrs:
0588                     val = getattr(queueConfig, key, None)
0589                     if val is not None:
0590                         if not isinstance(val, int):
0591                             setattr(queueConfig, key, None)
0592                         elif val < 0:
0593                             setattr(queueConfig, key, 0)
0594                 if getattr(queueConfig, "maxWorkers", None) is None:
0595                     queueConfig.maxWorkers = 0
0596                 if getattr(queueConfig, "nQueueLimitWorker", None) is not None and getattr(queueConfig, "nQueueLimitWorkerMax", None) is not None:
0597                     max_queue_workers = min(queueConfig.nQueueLimitWorker, queueConfig.nQueueLimitWorkerMax, queueConfig.maxWorkers)
0598                     queueConfig.nQueueLimitWorker = max_queue_workers
0599                     queueConfig.nQueueLimitWorkerMax = max_queue_workers
0600                 elif getattr(queueConfig, "nQueueLimitWorker", None) is not None:
0601                     queueConfig.nQueueLimitWorker = min(queueConfig.nQueueLimitWorker, queueConfig.maxWorkers)
0602                 elif getattr(queueConfig, "nQueueLimitWorkerMax", None) is not None:
0603                     queueConfig.nQueueLimitWorkerMax = min(queueConfig.nQueueLimitWorkerMax, queueConfig.maxWorkers)
0604                     queueConfig.nQueueLimitWorker = queueConfig.nQueueLimitWorkerMax
0605                 if getattr(queueConfig, "nQueueLimitWorkerMin", None) is not None and getattr(queueConfig, "nQueueLimitWorker", None) is not None:
0606                     queueConfig.nQueueLimitWorkerMin = min(queueConfig.nQueueLimitWorkerMin, queueConfig.nQueueLimitWorker, queueConfig.maxWorkers)
0607                 if getattr(queueConfig, "nQueueLimitWorkerCoresMin", None) is not None and getattr(queueConfig, "nQueueLimitWorkerCores", None) is not None:
0608                     queueConfig.nQueueLimitWorkerCoresMin = min(queueConfig.nQueueLimitWorkerCoresMin, queueConfig.nQueueLimitWorkerCores)
0609                 if getattr(queueConfig, "nQueueLimitWorkerMemoryMin", None) is not None and getattr(queueConfig, "nQueueLimitWorkerMemory", None) is not None:
0610                     queueConfig.nQueueLimitWorkerMemoryMin = min(queueConfig.nQueueLimitWorkerMemoryMin, queueConfig.nQueueLimitWorkerMemory)
0611                 if getattr(queueConfig, "nQueueLimitJob", None) is not None and getattr(queueConfig, "nQueueLimitJobMax", None) is not None:
0612                     max_queue_jobs = min(queueConfig.nQueueLimitJob, queueConfig.nQueueLimitJobMax)
0613                     queueConfig.nQueueLimitJob = max_queue_jobs
0614                     queueConfig.nQueueLimitJobMax = max_queue_jobs
0615                 elif getattr(queueConfig, "nQueueLimitJobMax", None) is not None:
0616                     queueConfig.nQueueLimitJob = queueConfig.nQueueLimitJobMax
0617                 if getattr(queueConfig, "nQueueLimitJobMin", None) is not None and getattr(queueConfig, "nQueueLimitJob", None) is not None:
0618                     queueConfig.nQueueLimitJobMin = min(queueConfig.nQueueLimitJobMin, queueConfig.nQueueLimitJob)
0619                 if getattr(queueConfig, "nQueueLimitJobCoresMin", None) is not None and getattr(queueConfig, "nQueueLimitJobCores", None) is not None:
0620                     queueConfig.nQueueLimitJobCoresMin = min(queueConfig.nQueueLimitJobCoresMin, queueConfig.nQueueLimitJobCores)
0621                 # heartbeat suppression
0622                 if queueConfig.truePilot and queueConfig.noHeartbeat == "":
0623                     queueConfig.noHeartbeat = "running,transferring,finished,failed"
0624                 # set unique name
0625                 queueConfig.set_unique_name()
0626                 # put into new queue configs
0627                 newQueueConfig[queueName] = queueConfig
0628                 # Check existence of mandatory attributes
0629                 if queueName in newQueueConfig:
0630                     queueConfig = newQueueConfig[queueName]
0631                     missing_attr_list = []
0632                     for _attr in self.mandatory_attrs:
0633                         if not hasattr(queueConfig, _attr):
0634                             invalidQueueList.add(queueConfig.queueName)
0635                             missing_attr_list.append(_attr)
0636                     if missing_attr_list:
0637                         mainLog.error(f"Missing mandatory attributes {','.join(missing_attr_list)} . Omitted {queueConfig.queueName} in queue config")
0638             # delete invalid queues
0639             for invalidQueueName in invalidQueueList:
0640                 if invalidQueueName in newQueueConfig:
0641                     del newQueueConfig[invalidQueueName]
0642             # auto blacklisting
0643             autoBlacklist = False
0644             if resolver is not None and hasattr(harvester_config.qconf, "autoBlacklist") and harvester_config.qconf.autoBlacklist:
0645                 autoBlacklist = True
0646             # get queue dumps
0647             queueConfigDumps = self.dbProxy.get_queue_config_dumps()
0648             # get active queues
0649             activeQueues = dict()
0650             for queueName, queueConfig in newQueueConfig.items():
0651                 # get status
0652                 if queueConfig.queueStatus is None and autoBlacklist:
0653                     queueConfig.queueStatus = resolver.get_queue_status(queueName)
0654                 # get dynamic information
0655                 if "DYNAMIC" in harvester_config.qconf.queueList:
0656                     # UPS queue
0657                     if resolver is not None and resolver.is_ups_queue(queueName):
0658                         queueConfig.runMode = "slave"
0659                         queueConfig.mapType = "NoJob"
0660                 # set online if undefined
0661                 if queueConfig.queueStatus is None:
0662                     queueConfig.queueStatus = "online"
0663                 queueConfig.queueStatus = queueConfig.queueStatus.lower()
0664                 # look for configID
0665                 dumpSpec = QueueConfigDumpSpec()
0666                 dumpSpec.queueName = queueName
0667                 dumpSpec.set_data(vars(queueConfig))
0668                 if dumpSpec.dumpUniqueName in queueConfigDumps:
0669                     dumpSpec = queueConfigDumps[dumpSpec.dumpUniqueName]
0670                 else:
0671                     # add dump
0672                     dumpSpec.creationTime = core_utils.naive_utcnow()
0673                     dumpSpec.configID = self.dbProxy.get_next_seq_number("SEQ_configID")
0674                     tmpStat = self.dbProxy.add_queue_config_dump(dumpSpec)
0675                     if not tmpStat:
0676                         dumpSpec.configID = self.dbProxy.get_config_id_dump(dumpSpec)
0677                         if dumpSpec.configID is None:
0678                             mainLog.error(f"failed to get configID for {dumpSpec.dumpUniqueName}")
0679                             continue
0680                     queueConfigDumps[dumpSpec.dumpUniqueName] = dumpSpec
0681                 queueConfig.configID = dumpSpec.configID
0682                 # ignore offline
0683                 if queueConfig.queueStatus == "offline":
0684                     mainLog.debug(f"{queueName} inactive due to offline")
0685                     continue
0686                 # filter for pilot version
0687                 if (
0688                     hasattr(harvester_config.qconf, "pilotVersion")
0689                     and pandaQueueDict.get(queueConfig.siteName) is not None
0690                     and pandaQueueDict.get(queueConfig.siteName).get("pilot_version") != str(harvester_config.qconf.pilotVersion)
0691                 ):
0692                     mainLog.debug(f"{queueName} inactive due to unmatched pilot version")
0693                     continue
0694                 # filter if static queue list
0695                 if (
0696                     "ALL" not in harvester_config.qconf.queueList
0697                     and "DYNAMIC" not in harvester_config.qconf.queueList
0698                     and queueName not in harvester_config.qconf.queueList
0699                 ):
0700                     mainLog.debug(f"{queueName} inactive due to not in static queue list")
0701                     continue
0702                 # added to active queues
0703                 activeQueues[queueName] = queueConfig
0704             self.queueConfig = newQueueConfig.copy()
0705             self.activeQueues = activeQueues.copy()
0706             newQueueConfigWithID = dict()
0707             for dumpSpec in queueConfigDumps.values():
0708                 queueConfig = QueueConfig(dumpSpec.queueName)
0709                 queueConfig.update_attributes(dumpSpec.data)
0710                 queueConfig.configID = dumpSpec.configID
0711                 newQueueConfigWithID[dumpSpec.configID] = queueConfig
0712             self.queueConfigWithID = newQueueConfigWithID
0713             # update lastUpdate and lastCheck
0714             self.lastUpdate = core_utils.naive_utcnow()
0715             self.lastCheck = self.lastUpdate
0716         # update database pq_table
0717         if self.toUpdateDB:
0718             retVal = self._update_pq_table(cache_time=self.last_cache_ts, refill_table=refill_table)
0719             if retVal:
0720                 if self.last_cache_ts:
0721                     mainLog.debug(f"updated pq_table (last cache updated at {str(self.last_cache_ts)})")
0722                 else:
0723                     mainLog.debug("updated pq_table")
0724             elif retVal is None:
0725                 mainLog.debug("did not get pq_table_fill lock. Skipped to update pq_table")
0726             else:
0727                 mainLog.warning("failed to update pq_table. Skipped")
0728         # done
0729         mainLog.debug("done")
0730 
0731     # check if valid queue
0732     def has_queue(self, queue_name, config_id=None):
0733         self.load_data()
0734         if config_id is not None:
0735             return config_id in self.queueConfigWithID
0736         return queue_name in self.queueConfig
0737 
0738     # get queue config
0739     def get_queue(self, queue_name, config_id=None):
0740         self.load_data()
0741         if config_id is not None and config_id in self.queueConfigWithID:
0742             return self.queueConfigWithID[config_id]
0743         if queue_name in self.queueConfig:
0744             return self.queueConfig[queue_name]
0745         return None
0746 
0747     # all queue configs
0748     def get_all_queues(self):
0749         self.load_data()
0750         return self.queueConfig
0751 
0752     # all active queue config
0753     def get_active_queues(self):
0754         self.load_data()
0755         return self.activeQueues
0756 
0757     def get_active_ups_queues(self):
0758         """
0759         Get active UPS candidates
0760         :return:
0761         """
0762         active_ups_queues = []
0763         active_queues = self.get_active_queues()
0764         for queue_name, queue_attribs in active_queues.items():
0765             try:
0766                 if queue_attribs.runMode == "slave" and queue_attribs.mapType == "NoJob":
0767                     active_ups_queues.append(queue_name)
0768             except KeyError:
0769                 continue
0770         return active_ups_queues
0771 
0772     # all queues with config IDs
0773     def get_all_queues_with_config_ids(self):
0774         self.load_data()
0775         return self.queueConfigWithID