File indexing completed on 2026-04-19 08:00:02
0001 import copy
0002 import datetime
0003 import importlib
0004 import json
0005 import os
0006 import threading
0007 from json.decoder import JSONDecodeError
0008
0009 from pandaharvester.harvesterconfig import harvester_config
0010 from pandaharvester.harvestermisc.info_utils import PandaQueuesDict
0011
0012 from . import core_utils
0013 from .core_utils import SingletonWithID
0014 from .db_interface import DBInterface
0015 from .db_proxy_pool import DBProxyPool as DBProxy
0016 from .panda_queue_spec import PandaQueueSpec
0017 from .plugin_factory import PluginFactory
0018 from .queue_config_dump_spec import QueueConfigDumpSpec
0019 from .work_spec import WorkSpec
0020
0021
# Module-level logger and DB interface shared by every mapper instance in this process.
_logger = core_utils.setup_logger("queue_config_mapper")
# DB interface used as the dialog hook for loggers created via _make_logger.
_dbInterface = DBInterface()
0024
0025
0026
def _make_logger(base_log=_logger, token=None, method_name=None, send_dialog=True):
    """Build a logger on top of *base_log*.

    When send_dialog is true and the module DB interface is available, it is
    attached as the dialog hook so messages are also recorded in the DB.
    """
    hook = _dbInterface if send_dialog and _dbInterface else None
    return core_utils.make_logger(base_log, token=token, method_name=method_name, hook=hook)
0033
0034
0035
class QueueConfig(object):
    """Configuration of a single harvester queue.

    Instances start from the defaults below and are then overridden by
    template / remote / dynamic / local configuration dictionaries.
    """

    def __init__(self, queue_name):
        # identity
        self.queueName = queue_name
        self.pandaQueueName = None
        self.prodSourceLabel = "managed"
        # worker/job mapping and behavior defaults
        self.mapType = WorkSpec.MT_OneToOne
        self.useJobLateBinding = False
        self.zipPerMB = None
        self.siteName = ""
        self.siteFamily = ""
        self.maxWorkers = 0
        self.nNewWorkers = 0
        self.maxNewWorkersPerCycle = 0
        self.noHeartbeat = ""
        self.runMode = "self"
        self.resourceType = PandaQueueSpec.RT_catchall
        self.jobType = PandaQueueSpec.JT_catchall
        self.getJobCriteria = None
        self.ddmEndpointIn = None
        self.allowJobMixture = False
        self.maxSubmissionAttempts = 3
        self.truePilot = False
        self.queueStatus = None
        self.prefetchEvents = True
        self.uniqueName = None
        self.configID = None
        self.initEventsMultipler = 2

    def get_no_heartbeat_status(self):
        """Return the comma-separated noHeartbeat field as a list of statuses."""
        return self.noHeartbeat.split(",")

    def is_no_heartbeat_status(self, status):
        """True when *status* is one of the no-heartbeat statuses."""
        return status in self.get_no_heartbeat_status()

    def get_source_label(self, job_type=None, is_gu=None):
        """Return the source label to use for *job_type*.

        Analysis-type jobs map to "user", grand-unified queues to "unified",
        everything else falls back to the queue's prodSourceLabel.
        """
        label = self.prodSourceLabel
        if job_type in ("user", "panda"):
            label = "user"
        elif is_gu:
            label = "unified"
        return label

    def set_unique_name(self):
        """Derive the unique queue name from name, resource type and label."""
        self.uniqueName = core_utils.get_unique_queue_name(self.queueName, self.resourceType, self.prodSourceLabel)

    def update_attributes(self, data):
        """Set every key/value pair of *data* as an attribute on this object."""
        for attr, value in data.items():
            setattr(self, attr, value)

    def get_synchronization_level(self):
        """Return 1 when worker/job status must be kept in sync, else None."""
        needs_sync = self.mapType == WorkSpec.MT_NoJob or self.truePilot or self.is_no_heartbeat_status("finished")
        return 1 if needs_sync else None

    def __str__(self):
        """Human-readable dump: scalar attributes first, then dict (plugin) attributes."""
        title = self.queueName
        header = title + "\n" + "-" * len(title) + "\n"
        plain_lines = []
        plugin_lines = []
        for attr in sorted(self.__dict__):
            value = self.__dict__[attr]
            if isinstance(value, dict):
                plugin_lines.append(f" {attr} :\n")
                for sub_attr in sorted(value):
                    plugin_lines.append(f" {sub_attr} = {value[sub_attr]}\n")
            else:
                plain_lines.append(f" {attr} = {value}\n")
        return header + "".join(plain_lines) + "".join(plugin_lines)
0122
0123
0124
class QueueConfigMapper(metaclass=SingletonWithID):
    """
    Singleton (per ID) that builds, caches, and serves queue configurations.

    Using some acronyms here:
    LT = local template, written in local queueconfig file
    RT = remote template, on http source fetched by cacher
    FT = final template of a queue
    LQ = local queue configuration, written in local queueconfig file
    RQ = remote queue configuration, on http source fetched by cacher, static
    DQ = dynamic queue configuration, configured with information from resolver
    FQ = final queue configuration of a queue

    Relations:
    FT = LT if existing else RT
    FQ = FT updated with RQ, then updated with DQ, then updated with LQ
    """

    # plugin sections that every usable queue configuration must define;
    # queues missing any of these are dropped in load_data
    mandatory_attrs = set(
        [
            "messenger",
            "monitor",
            "preparator",
            "stager",
            "submitter",
            "sweeper",
            "workerMaker",
        ]
    )
    # numeric throttling attributes; validated and normalized in load_data
    # (non-int values become None, negatives become 0)
    queue_limit_attrs = set(
        [
            "maxWorkers",
            "maxNewWorkersPerCycle",
            "nQueueLimitWorker",
            "nQueueLimitWorkerMax",
            "nQueueLimitWorkerRatio",
            "nQueueLimitWorkerMin",
            "nQueueLimitWorkerCores",
            "nQueueLimitWorkerCoresRatio",
            "nQueueLimitWorkerCoresMin",
            "nQueueLimitWorkerMemory",
            "nQueueLimitWorkerMemoryRatio",
            "nQueueLimitWorkerMemoryMin",
            "nQueueLimitJob",
            "nQueueLimitJobMax",
            "nQueueLimitJobRatio",
            "nQueueLimitJobMin",
            "nQueueLimitJobCores",
            "nQueueLimitJobCoresRatio",
            "nQueueLimitJobCoresMin",
        ]
    )
    # plugin sections whose dicts are merged (dict.update) across config
    # layers rather than replaced wholesale
    updatable_plugin_attrs = set(
        ["common", "messenger", "monitor", "preparator", "stager", "submitter", "sweeper", "workerMaker", "throttler", "zipper", "aux_preparator", "extractor"]
    )
0177
0178
0179 def __init__(self, update_db=True):
0180 self.lock = threading.Lock()
0181 self.lastUpdate = None
0182 self.lastReload = None
0183 self.lastCheck = None
0184 self.last_cache_ts = None
0185 self.dbProxy = DBProxy()
0186 self.toUpdateDB = update_db
0187 try:
0188 self.configFromCacher = harvester_config.qconf.configFromCacher
0189 except AttributeError:
0190 self.configFromCacher = False
0191 try:
0192 self.updateInterval = harvester_config.qconf.updateInterval
0193 except AttributeError:
0194 self.updateInterval = 600
0195 try:
0196 self.checkInterval = harvester_config.qconf.checkInterval
0197 except AttributeError:
0198 self.checkInterval = 5
0199 finally:
0200 self.checkInterval = min(self.checkInterval, self.updateInterval)
0201
0202
0203 def _load_config_from_cache(self):
0204 mainLog = _make_logger(token=f"id={core_utils.get_pid()}", method_name="_load_config_from_cache")
0205
0206 if self.configFromCacher:
0207 queueConfig_cacheSpec = self.dbProxy.get_cache("queues_config_file", from_local_cache=False)
0208 if queueConfig_cacheSpec is not None:
0209 queueConfigJson = queueConfig_cacheSpec.data
0210 if isinstance(queueConfigJson, dict):
0211 return queueConfigJson, queueConfig_cacheSpec.lastUpdate
0212 else:
0213 mainLog.error("Invalid JSON in cache queues_config_file. Skipped")
0214 else:
0215 mainLog.debug("queues config not fount in cache. Skipped")
0216 else:
0217 mainLog.debug("queues config URL not set. Skipped")
0218 return None, None
0219
0220
0221 @staticmethod
0222 def _load_config_from_file():
0223 mainLog = _make_logger(token=f"id={core_utils.get_pid()}", method_name="_load_config_from_file")
0224
0225 if os.path.isabs(harvester_config.qconf.configFile):
0226 confFilePath = harvester_config.qconf.configFile
0227 else:
0228
0229 confFilePath = None
0230 if "PANDA_HOME" in os.environ:
0231 confFilePath = os.path.join(os.environ["PANDA_HOME"], "etc/panda", harvester_config.qconf.configFile)
0232 if not os.path.exists(confFilePath):
0233 confFilePath = None
0234
0235 if confFilePath is None:
0236 confFilePath = os.path.join("/etc/panda", harvester_config.qconf.configFile)
0237
0238 try:
0239 with open(confFilePath) as f:
0240 queueConfigJson = json.load(f)
0241 except OSError as e:
0242 mainLog.error(f"Cannot read file: {confFilePath} ; {e}")
0243 return None
0244 except JSONDecodeError as e:
0245 mainLog.error(f"Invalid JSON in file: {confFilePath} ; {e}")
0246 return None
0247 return queueConfigJson
0248
0249
0250 @staticmethod
0251 def _get_resolver():
0252 if hasattr(harvester_config.qconf, "resolverModule") and hasattr(harvester_config.qconf, "resolverClass"):
0253 pluginConf = {"module": harvester_config.qconf.resolverModule, "name": harvester_config.qconf.resolverClass}
0254 pluginFactory = PluginFactory()
0255 resolver = pluginFactory.get_plugin(pluginConf)
0256 else:
0257 resolver = None
0258 return resolver
0259
0260
0261 def _update_last_reload_time(self, the_time=None):
0262
0263 got_update_lock = self.dbProxy.get_process_lock("qconf_reload", "qconf_universal", self.updateInterval)
0264 if got_update_lock:
0265 if the_time is None:
0266 the_time = core_utils.naive_utcnow()
0267 ts = the_time.timestamp()
0268 new_ts_info = f"{ts:.3f}"
0269 ret_val = self.dbProxy.refresh_cache("_qconf_last_reload", "_universal", new_ts_info)
0270 if ret_val:
0271 return True
0272 else:
0273 return False
0274 else:
0275 return None
0276
0277
0278 def _get_last_reload_time(self):
0279 cacheSpec = self.dbProxy.get_cache("_qconf_last_reload", "_universal", from_local_cache=False)
0280 if cacheSpec is None:
0281 return None
0282 timestamp = float(cacheSpec.data)
0283 return core_utils.naive_utcfromtimestamp(timestamp)
0284
0285
0286 def _update_pq_table(self, cache_time=None, refill_table=False):
0287
0288 got_update_lock = self.dbProxy.get_process_lock("pq_table_fill", "qconf_universal", 120)
0289 if got_update_lock:
0290 fill_ret_val = self.dbProxy.fill_panda_queue_table(self.activeQueues.keys(), self, refill_table=refill_table)
0291 now_time = core_utils.naive_utcnow()
0292 if fill_ret_val:
0293 now_ts = now_time.timestamp()
0294 now_ts_info = f"{now_ts:.3f}"
0295 self.dbProxy.refresh_cache("_pq_table_last_fill", "_universal", now_ts_info)
0296 if cache_time:
0297 cache_ts = cache_time.timestamp()
0298 cache_ts_info = f"{cache_ts:.3f}"
0299 self.dbProxy.refresh_cache("_cache_to_fill_pq_table", "_universal", cache_ts_info)
0300 self.dbProxy.release_process_lock("pq_table_fill", "qconf_universal")
0301 if fill_ret_val:
0302 return True
0303 else:
0304 return False
0305 else:
0306 return None
0307
0308
0309 def _get_last_pq_table_fill_time(self):
0310 cacheSpec = self.dbProxy.get_cache("_pq_table_last_fill", "_universal", from_local_cache=False)
0311 if cacheSpec is None:
0312 return None
0313 timestamp = float(cacheSpec.data)
0314 return core_utils.naive_utcfromtimestamp(timestamp)
0315
0316
0317 def _get_cache_to_fill_pq_table_time(self):
0318 cacheSpec = self.dbProxy.get_cache("_cache_to_fill_pq_table", "_universal", from_local_cache=False)
0319 if cacheSpec is None:
0320 return None
0321 timestamp = float(cacheSpec.data)
0322 return core_utils.naive_utcfromtimestamp(timestamp)
0323
0324
    def load_data(self, refill_table=False):
        """Reload queue configurations from cacher, local file, and resolver.

        Rebuilds self.queueConfig (all queues), self.activeQueues (queues
        harvester should serve), and self.queueConfigWithID (configID -> config),
        persists new configuration dumps in the DB, and optionally refreshes the
        panda-queue table. Throttled by checkInterval/updateInterval so that
        frequent calls are cheap no-ops.

        :param refill_table: force a refill of the panda-queue table
        """
        mainLog = _make_logger(token=f"id={core_utils.get_pid()}", method_name="load_data")

        # first critical section: decide cheaply whether a reload is due at all
        with self.lock:
            now_time = core_utils.naive_utcnow()
            updateInterval_td = datetime.timedelta(seconds=self.updateInterval)
            checkInterval_td = datetime.timedelta(seconds=self.checkInterval)

            # skip entirely when checked too recently
            if self.lastCheck is not None and now_time - self.lastCheck < checkInterval_td:
                return
            self.lastCheck = now_time

            # timestamps shared across harvester processes via the DB cache
            self.lastReload = self._get_last_reload_time()
            self.last_pq_table_fill_time = self._get_last_pq_table_fill_time()
            self.cache_to_fill_pq_table_time = self._get_cache_to_fill_pq_table_time()

            # skip when another process reloaded recently and we are already up to date
            if self.lastReload and self.lastUpdate:
                min_last_reload = self.lastReload
                if self.last_cache_ts:
                    min_last_reload = min(min_last_reload, self.last_cache_ts + updateInterval_td * 0.25)
                if self.lastReload < self.lastUpdate and now_time - min_last_reload < updateInterval_td:
                    return

        # second critical section: perform the actual reload
        with self.lock:
            # try to claim the reload slot; other processes may be doing the same
            now_time = core_utils.naive_utcnow()
            update_last_reload_ret_val = self._update_last_reload_time(now_time)
            if update_last_reload_ret_val:
                self.lastReload = now_time
                mainLog.debug("updated last reload timestamp")
            elif update_last_reload_ret_val is None:
                mainLog.debug("did not get qconf_reload timestamp lock. Skipped to update last reload timestamp")
            else:
                mainLog.warning("failed to update last reload timestamp. Skipped")

            # working structures for the template/queue merge (see class docstring acronyms)
            newQueueConfig = dict()
            localTemplatesDict = dict()
            remoteTemplatesDict = dict()
            finalTemplatesDict = dict()
            localQueuesDict = dict()
            remoteQueuesDict = dict()
            dynamicQueuesDict = dict()
            allQueuesNameList = set()
            getQueuesDynamic = False
            invalidQueueList = set()
            pandaQueueDict = PandaQueuesDict()

            resolver = self._get_resolver()
            if resolver is None:
                mainLog.debug("No resolver is configured")

            # remote config from cacher: split into templates (RT) and queues (RQ)
            queueConfigJson_cacher, self.last_cache_ts = self._load_config_from_cache()
            if self.last_cache_ts and self.cache_to_fill_pq_table_time and (self.last_cache_ts < self.cache_to_fill_pq_table_time):
                # cacher data is older than what already fed the pq table
                mainLog.warning(f"Found cacher data outdated ({str(self.last_cache_ts)} < {str(self.cache_to_fill_pq_table_time)})")
            if queueConfigJson_cacher is not None:
                mainLog.debug("Applying cacher data")
                for queueName, queueDict in queueConfigJson_cacher.items():
                    if queueDict.get("isTemplateQueue") is True or queueName.endswith("_TEMPLATE"):
                        # remote template (RT)
                        queueDict["isTemplateQueue"] = True
                        queueDict.pop("templateQueueName", None)
                        remoteTemplatesDict[queueName] = queueDict
                    else:
                        # remote queue (RQ)
                        queueDict["isTemplateQueue"] = False
                        remoteQueuesDict[queueName] = queueDict

            # local config file: split into templates (LT) and queues (LQ)
            queueConfigJson_local = self._load_config_from_file()
            if queueConfigJson_local is not None:
                mainLog.debug("Applying local config")
                for queueName, queueDict in queueConfigJson_local.items():
                    if queueDict.get("isTemplateQueue") is True or queueName.endswith("_TEMPLATE"):
                        # local template (LT)
                        queueDict["isTemplateQueue"] = True
                        queueDict.pop("templateQueueName", None)
                        localTemplatesDict[queueName] = queueDict
                    else:
                        # local queue (LQ)
                        queueDict["isTemplateQueue"] = False
                        localQueuesDict[queueName] = queueDict
            else:
                mainLog.warning("Failed to load config from local json file. Skipped")

            # final templates (FT): local templates override remote ones
            finalTemplatesDict.update(remoteTemplatesDict)
            finalTemplatesDict.update(localTemplatesDict)
            finalTemplatesDict.pop(None, None)

            # drop queues referencing a template that does not exist
            for acr, queuesDict in [("RQ", remoteQueuesDict), ("LQ", localQueuesDict)]:
                for queueName, queueDict in queuesDict.copy().items():
                    templateQueueName = queueDict.get("templateQueueName")
                    if templateQueueName is not None and templateQueueName not in finalTemplatesDict:
                        del queuesDict[queueName]
                        mainLog.warning(f'Invalid templateQueueName "{templateQueueName}" for {queueName} ({acr}). Skipped')

            # dynamic queues (DQ) discovered via the resolver
            if resolver is not None and "DYNAMIC" in harvester_config.qconf.queueList:
                getQueuesDynamic = True
                dynamicQueuesNameList = resolver.get_all_queue_names()
                # NOTE(review): .copy()/.discard below imply the resolver returns a set — confirm
                for queueName in dynamicQueuesNameList.copy():
                    queueDict = dict()
                    # choose the template: explicit resolver template, else type.workflow, else default
                    templateQueueName = None
                    resolver_harvester_template = None
                    if resolver is not None:
                        resolver_harvester_template = resolver.get_harvester_template(queueName)
                        resolver_type, resolver_workflow = resolver.get_type_workflow(queueName)
                    if resolver_harvester_template:
                        templateQueueName = resolver_harvester_template
                    elif not (resolver_type is None or resolver_workflow is None):
                        templateQueueName = f"{resolver_type}.{resolver_workflow}"
                    else:
                        templateQueueName = harvester_config.qconf.defaultTemplateQueueName
                    if templateQueueName not in finalTemplatesDict:
                        # unknown template; drop the dynamic queue
                        dynamicQueuesNameList.discard(queueName)
                        mainLog.warning(f'Invalid templateQueueName "{templateQueueName}" for {queueName} (DQ). Skipped')
                        continue
                    # only throttling-related resolver parameters are honored
                    resolver_harvester_params = resolver.get_harvester_params(queueName)
                    for key, val in resolver_harvester_params.items():
                        if key in self.queue_limit_attrs:
                            queueDict[key] = val

                    queueDict["templateQueueName"] = templateQueueName
                    queueDict["isTemplateQueue"] = False
                    dynamicQueuesDict[queueName] = queueDict

            # union of every queue name from all sources
            allQueuesNameList |= set(remoteQueuesDict)
            allQueuesNameList |= set(dynamicQueuesDict)
            allQueuesNameList |= set(localQueuesDict)
            allQueuesNameList.discard(None)

            # build the final configuration (FQ) of each queue
            for queueName in allQueuesNameList:
                # provenance labels for debug logging
                queueSourceList = []
                templateSourceList = []
                # the last source (RQ < DQ < LQ) naming a template wins
                templateQueueName = None
                for queuesDict in [remoteQueuesDict, dynamicQueuesDict, localQueuesDict]:
                    if queueName not in queuesDict:
                        continue
                    tmp_queueDict = queuesDict[queueName]
                    tmp_templateQueueName = tmp_queueDict.get("templateQueueName")
                    if tmp_templateQueueName is not None:
                        templateQueueName = tmp_templateQueueName

                # start from the template (FT), then layer RQ, DQ, LQ on top
                queueDict = dict()
                if templateQueueName in finalTemplatesDict:
                    queueDict.update(copy.deepcopy(finalTemplatesDict[templateQueueName]))
                for acr, templatesDict in [("RT", remoteTemplatesDict), ("LT", localTemplatesDict)]:
                    if templateQueueName in templatesDict:
                        templateSourceList.append(acr)
                # apply config layers in increasing priority; plugin sections are merged
                for acr, queuesDict in [("RQ", remoteQueuesDict), ("DQ", dynamicQueuesDict), ("LQ", localQueuesDict)]:
                    if queueName not in queuesDict:
                        continue
                    queueSourceList.append(acr)
                    tmp_queueDict = queuesDict[queueName]
                    for key, val in tmp_queueDict.items():
                        val = copy.deepcopy(val)
                        if key in self.updatable_plugin_attrs and isinstance(queueDict.get(key), dict) and isinstance(val, dict):
                            # merge plugin-section dicts instead of replacing them
                            queueDict[key].update(val)
                        else:
                            queueDict[key] = val

                if templateQueueName:
                    mainLog.debug(
                        f"queue {queueName} comes from {','.join(queueSourceList)} (with template {templateQueueName} from {','.join(templateSourceList)})"
                    )
                else:
                    mainLog.debug(f"queue {queueName} comes from {','.join(queueSourceList)}")

                # queue names are unique in the set, so a fresh QueueConfig is created here
                if queueName in newQueueConfig:
                    queueConfig = newQueueConfig[queueName]
                else:
                    queueConfig = QueueConfig(queueName)

                # "site/resourceType" naming convention
                queueConfig.siteName = queueConfig.queueName.split("/")[0]
                if queueConfig.siteName != queueConfig.queueName:
                    queueConfig.resourceType = queueConfig.queueName.split("/")[-1]
                if not queueConfig.siteFamily:
                    queueConfig.siteFamily = queueConfig.siteName

                # "common" section provides default keys for every plugin section
                commonAttrDict = dict()
                if isinstance(queueDict.get("common"), dict):
                    commonAttrDict = queueDict.get("common")

                for key, val in queueDict.items():
                    if isinstance(val, dict) and "module" in val and "name" in val:
                        # plugin section: copy, fill in common defaults, verify importability
                        val = copy.deepcopy(val)
                        for c_key, c_val in commonAttrDict.items():
                            if c_key not in val and c_key not in ("module", "name"):
                                val[c_key] = c_val
                        # verify the plugin module/class can actually be imported
                        try:
                            _t3mP_1Mp0R7_mO6U1e__ = importlib.import_module(val["module"])
                            _t3mP_1Mp0R7_N4m3__ = getattr(_t3mP_1Mp0R7_mO6U1e__, val["name"])
                        except Exception as _e:
                            invalidQueueList.add(queueConfig.queueName)
                            mainLog.error(f"Module or class not found. Omitted {queueConfig.queueName} in queue config ({_e})")
                            continue
                        else:
                            del _t3mP_1Mp0R7_mO6U1e__
                            del _t3mP_1Mp0R7_N4m3__
                        # every plugin gets the queue/site identity by default
                        if "siteName" not in val:
                            val["siteName"] = queueConfig.siteName
                        if "queueName" not in val:
                            val["queueName"] = queueConfig.queueName
                        # middleware indirection: overlay the referenced section, keeping the original
                        if "middleware" in val and val["middleware"] in queueDict:
                            val["original_config"] = copy.deepcopy(val)
                            for m_key, m_val in queueDict[val["middleware"]].items():
                                val[m_key] = m_val
                    setattr(queueConfig, key, val)

                # a real queue must not be flagged as a template at this point
                try:
                    if getattr(queueConfig, "isTemplateQueue"):
                        mainLog.error(f"Internal error: isTemplateQueue is True. Omitted {queueConfig.queueName} in queue config")
                        invalidQueueList.add(queueConfig.queueName)
                    else:
                        delattr(queueConfig, "isTemplateQueue")
                except AttributeError as _e:
                    mainLog.error(f'Internal error with attr "isTemplateQueue". Omitted {queueConfig.queueName} in queue config ({_e})')
                    invalidQueueList.add(queueConfig.queueName)

                if resolver is not None:
                    queueConfig.pandaQueueName = resolver.get_panda_queue_name(queueConfig.siteName)

                # parse "k1=v1,k2=v2" getJobCriteria into a dict
                if queueConfig.getJobCriteria is not None:
                    tmpCriteria = dict()
                    for tmpItem in queueConfig.getJobCriteria.split(","):
                        tmpKey, tmpVal = tmpItem.split("=")
                        tmpCriteria[tmpKey] = tmpVal
                    if len(tmpCriteria) == 0:
                        queueConfig.getJobCriteria = None
                    else:
                        queueConfig.getJobCriteria = tmpCriteria

                # job limits are meaningless for NoJob mapping
                if queueConfig.mapType == WorkSpec.MT_NoJob:
                    for attName in [
                        "nQueueLimitJob",
                        "nQueueLimitJobRatio",
                        "nQueueLimitJobMax",
                        "nQueueLimitJobMin",
                        "nQueueLimitJobCores",
                        "nQueueLimitJobCoresRatio",
                        "nQueueLimitJobCoresMin",
                    ]:
                        setattr(queueConfig, attName, None)

                # worker-ratio limits only apply to NoJob mapping
                if queueConfig.mapType != WorkSpec.MT_NoJob:
                    for attName in ["nQueueLimitWorkerRatio", "nQueueLimitWorkerMin"]:
                        setattr(queueConfig, attName, None)

                # sanitize limit attributes: non-int -> None, negative -> 0
                for key in self.queue_limit_attrs:
                    val = getattr(queueConfig, key, None)
                    if val is not None:
                        if not isinstance(val, int):
                            setattr(queueConfig, key, None)
                        elif val < 0:
                            setattr(queueConfig, key, 0)
                if getattr(queueConfig, "maxWorkers", None) is None:
                    queueConfig.maxWorkers = 0
                # harmonize worker limits: Min <= Limit <= Max <= maxWorkers
                if getattr(queueConfig, "nQueueLimitWorker", None) is not None and getattr(queueConfig, "nQueueLimitWorkerMax", None) is not None:
                    max_queue_workers = min(queueConfig.nQueueLimitWorker, queueConfig.nQueueLimitWorkerMax, queueConfig.maxWorkers)
                    queueConfig.nQueueLimitWorker = max_queue_workers
                    queueConfig.nQueueLimitWorkerMax = max_queue_workers
                elif getattr(queueConfig, "nQueueLimitWorker", None) is not None:
                    queueConfig.nQueueLimitWorker = min(queueConfig.nQueueLimitWorker, queueConfig.maxWorkers)
                elif getattr(queueConfig, "nQueueLimitWorkerMax", None) is not None:
                    queueConfig.nQueueLimitWorkerMax = min(queueConfig.nQueueLimitWorkerMax, queueConfig.maxWorkers)
                    queueConfig.nQueueLimitWorker = queueConfig.nQueueLimitWorkerMax
                if getattr(queueConfig, "nQueueLimitWorkerMin", None) is not None and getattr(queueConfig, "nQueueLimitWorker", None) is not None:
                    queueConfig.nQueueLimitWorkerMin = min(queueConfig.nQueueLimitWorkerMin, queueConfig.nQueueLimitWorker, queueConfig.maxWorkers)
                if getattr(queueConfig, "nQueueLimitWorkerCoresMin", None) is not None and getattr(queueConfig, "nQueueLimitWorkerCores", None) is not None:
                    queueConfig.nQueueLimitWorkerCoresMin = min(queueConfig.nQueueLimitWorkerCoresMin, queueConfig.nQueueLimitWorkerCores)
                if getattr(queueConfig, "nQueueLimitWorkerMemoryMin", None) is not None and getattr(queueConfig, "nQueueLimitWorkerMemory", None) is not None:
                    queueConfig.nQueueLimitWorkerMemoryMin = min(queueConfig.nQueueLimitWorkerMemoryMin, queueConfig.nQueueLimitWorkerMemory)
                # harmonize job limits analogously
                if getattr(queueConfig, "nQueueLimitJob", None) is not None and getattr(queueConfig, "nQueueLimitJobMax", None) is not None:
                    max_queue_jobs = min(queueConfig.nQueueLimitJob, queueConfig.nQueueLimitJobMax)
                    queueConfig.nQueueLimitJob = max_queue_jobs
                    queueConfig.nQueueLimitJobMax = max_queue_jobs
                elif getattr(queueConfig, "nQueueLimitJobMax", None) is not None:
                    queueConfig.nQueueLimitJob = queueConfig.nQueueLimitJobMax
                if getattr(queueConfig, "nQueueLimitJobMin", None) is not None and getattr(queueConfig, "nQueueLimitJob", None) is not None:
                    queueConfig.nQueueLimitJobMin = min(queueConfig.nQueueLimitJobMin, queueConfig.nQueueLimitJob)
                if getattr(queueConfig, "nQueueLimitJobCoresMin", None) is not None and getattr(queueConfig, "nQueueLimitJobCores", None) is not None:
                    queueConfig.nQueueLimitJobCoresMin = min(queueConfig.nQueueLimitJobCoresMin, queueConfig.nQueueLimitJobCores)

                # true-pilot queues ignore heartbeats for terminal/transfer states by default
                if queueConfig.truePilot and queueConfig.noHeartbeat == "":
                    queueConfig.noHeartbeat = "running,transferring,finished,failed"

                queueConfig.set_unique_name()

                newQueueConfig[queueName] = queueConfig

                # check mandatory plugin sections are present
                if queueName in newQueueConfig:
                    queueConfig = newQueueConfig[queueName]
                    missing_attr_list = []
                    for _attr in self.mandatory_attrs:
                        if not hasattr(queueConfig, _attr):
                            invalidQueueList.add(queueConfig.queueName)
                            missing_attr_list.append(_attr)
                    if missing_attr_list:
                        mainLog.error(f"Missing mandatory attributes {','.join(missing_attr_list)} . Omitted {queueConfig.queueName} in queue config")

            # drop every queue flagged invalid above
            for invalidQueueName in invalidQueueList:
                if invalidQueueName in newQueueConfig:
                    del newQueueConfig[invalidQueueName]

            # automatic blacklisting via resolver queue status, when enabled
            autoBlacklist = False
            if resolver is not None and hasattr(harvester_config.qconf, "autoBlacklist") and harvester_config.qconf.autoBlacklist:
                autoBlacklist = True

            queueConfigDumps = self.dbProxy.get_queue_config_dumps()

            # decide which queues are active and persist their config dumps
            activeQueues = dict()
            for queueName, queueConfig in newQueueConfig.items():
                # take queue status from the resolver when not set locally
                if queueConfig.queueStatus is None and autoBlacklist:
                    queueConfig.queueStatus = resolver.get_queue_status(queueName)
                # UPS queues are driven by the resolver: slave mode, NoJob mapping
                if "DYNAMIC" in harvester_config.qconf.queueList:
                    if resolver is not None and resolver.is_ups_queue(queueName):
                        queueConfig.runMode = "slave"
                        queueConfig.mapType = "NoJob"

                # normalize status; unset means online
                if queueConfig.queueStatus is None:
                    queueConfig.queueStatus = "online"
                queueConfig.queueStatus = queueConfig.queueStatus.lower()

                # persist (or reuse) the configuration dump to get a stable configID
                dumpSpec = QueueConfigDumpSpec()
                dumpSpec.queueName = queueName
                dumpSpec.set_data(vars(queueConfig))
                if dumpSpec.dumpUniqueName in queueConfigDumps:
                    dumpSpec = queueConfigDumps[dumpSpec.dumpUniqueName]
                else:
                    # new dump: assign a fresh configID and store it
                    dumpSpec.creationTime = core_utils.naive_utcnow()
                    dumpSpec.configID = self.dbProxy.get_next_seq_number("SEQ_configID")
                    tmpStat = self.dbProxy.add_queue_config_dump(dumpSpec)
                    if not tmpStat:
                        # another process may have inserted it first; fetch its ID
                        dumpSpec.configID = self.dbProxy.get_config_id_dump(dumpSpec)
                        if dumpSpec.configID is None:
                            mainLog.error(f"failed to get configID for {dumpSpec.dumpUniqueName}")
                            continue
                    queueConfigDumps[dumpSpec.dumpUniqueName] = dumpSpec
                queueConfig.configID = dumpSpec.configID

                # offline queues are kept in queueConfig but excluded from activeQueues
                if queueConfig.queueStatus == "offline":
                    mainLog.debug(f"{queueName} inactive due to offline")
                    continue

                # exclude queues whose pilot version does not match the configured one
                if (
                    hasattr(harvester_config.qconf, "pilotVersion")
                    and pandaQueueDict.get(queueConfig.siteName) is not None
                    and pandaQueueDict.get(queueConfig.siteName).get("pilot_version") != str(harvester_config.qconf.pilotVersion)
                ):
                    mainLog.debug(f"{queueName} inactive due to unmatched pilot version")
                    continue

                # with a static queue list, only listed queues are active
                if (
                    "ALL" not in harvester_config.qconf.queueList
                    and "DYNAMIC" not in harvester_config.qconf.queueList
                    and queueName not in harvester_config.qconf.queueList
                ):
                    mainLog.debug(f"{queueName} inactive due to not in static queue list")
                    continue

                activeQueues[queueName] = queueConfig
            self.queueConfig = newQueueConfig.copy()
            self.activeQueues = activeQueues.copy()
            # index every known dump by configID
            newQueueConfigWithID = dict()
            for dumpSpec in queueConfigDumps.values():
                queueConfig = QueueConfig(dumpSpec.queueName)
                queueConfig.update_attributes(dumpSpec.data)
                queueConfig.configID = dumpSpec.configID
                newQueueConfigWithID[dumpSpec.configID] = queueConfig
            self.queueConfigWithID = newQueueConfigWithID

            self.lastUpdate = core_utils.naive_utcnow()
            self.lastCheck = self.lastUpdate

            # finally refresh the panda-queue DB table when this mapper owns that duty
            if self.toUpdateDB:
                retVal = self._update_pq_table(cache_time=self.last_cache_ts, refill_table=refill_table)
                if retVal:
                    if self.last_cache_ts:
                        mainLog.debug(f"updated pq_table (last cache updated at {str(self.last_cache_ts)})")
                    else:
                        mainLog.debug("updated pq_table")
                elif retVal is None:
                    mainLog.debug("did not get pq_table_fill lock. Skipped to update pq_table")
                else:
                    mainLog.warning("failed to update pq_table. Skipped")

            mainLog.debug("done")
0731
0732 def has_queue(self, queue_name, config_id=None):
0733 self.load_data()
0734 if config_id is not None:
0735 return config_id in self.queueConfigWithID
0736 return queue_name in self.queueConfig
0737
0738
0739 def get_queue(self, queue_name, config_id=None):
0740 self.load_data()
0741 if config_id is not None and config_id in self.queueConfigWithID:
0742 return self.queueConfigWithID[config_id]
0743 if queue_name in self.queueConfig:
0744 return self.queueConfig[queue_name]
0745 return None
0746
0747
0748 def get_all_queues(self):
0749 self.load_data()
0750 return self.queueConfig
0751
0752
0753 def get_active_queues(self):
0754 self.load_data()
0755 return self.activeQueues
0756
0757 def get_active_ups_queues(self):
0758 """
0759 Get active UPS candidates
0760 :return:
0761 """
0762 active_ups_queues = []
0763 active_queues = self.get_active_queues()
0764 for queue_name, queue_attribs in active_queues.items():
0765 try:
0766 if queue_attribs.runMode == "slave" and queue_attribs.mapType == "NoJob":
0767 active_ups_queues.append(queue_name)
0768 except KeyError:
0769 continue
0770 return active_ups_queues
0771
0772
0773 def get_all_queues_with_config_ids(self):
0774 self.load_data()
0775 return self.queueConfigWithID