File indexing completed on 2026-04-20 07:58:56
0001 import itertools
0002 import re
0003
0004 from pandaharvester.harvesterbody.agent_base import AgentBase
0005 from pandaharvester.harvesterconfig import harvester_config
0006 from pandaharvester.harvestercore import core_utils
0007 from pandaharvester.harvestercore.db_proxy_pool import DBProxyPool as DBProxy
0008 from pandaharvester.harvestercore.plugin_factory import PluginFactory
0009
0010
# Module-level logger: used directly for plugin setup errors and handed to
# make_logger() as the base of the per-plugin loggers.
_logger = core_utils.setup_logger("cred_manager")
0012
0013
0014
class CredManager(AgentBase):
    """Agent that checks and renews credentials through credmanager plugins.

    Plugins ("cores") come from two sources:

    * ``harvester_config.credmanager`` -- loaded once by the constructor and
      kept in ``self.exe_cores``;
    * the ``credmanagers`` list of every queue that is not offline -- rebuilt
      on every cycle and kept in ``self.queue_exe_cores``.
    """

    # Matches ``${name}`` placeholders inside queue-level plugin parameters.
    # Raw string: "\$", "\{" and "\d" are invalid escapes in a plain literal.
    _PLACEHOLDER_PATTERN = re.compile(r"\$\{([a-zA-Z\d_.]+)\}")

    def __init__(self, queue_config_mapper, single_mode=False):
        """Set up the agent and instantiate all configured plugins.

        :param queue_config_mapper: object whose ``get_all_queues()`` returns a
            mapping of queue name to queue configuration.
        :param single_mode: forwarded unchanged to ``AgentBase.__init__``.
        """
        AgentBase.__init__(self, single_mode)
        self.queue_config_mapper = queue_config_mapper
        self.pluginFactory = PluginFactory()
        self.dbProxy = DBProxy()
        # plugins declared in the harvester configuration
        self.exe_cores = []
        # plugins declared per queue
        self.queue_exe_cores = []
        # get plugins from harvester config
        self.get_cores_from_harvester_config()
        # get plugins from queue config
        self.update_cores_from_queue_config()

    def get_list(self, data):
        """Return *data* itself if it is a list, else a one-element list holding it."""
        return data if isinstance(data, list) else [data]

    @staticmethod
    def _get_credmanager_name(exe_core):
        """Return the label used in log messages for *exe_core*.

        The plugin's ``setup_name`` is used when present; otherwise the label
        is built from its ``inCertFile`` and ``outCertFile`` attributes.
        """
        if hasattr(exe_core, "setup_name"):
            return exe_core.setup_name
        return f"{exe_core.inCertFile} {exe_core.outCertFile}"

    def _expand_placeholders(self, text, queue_name, queue_config):
        """Return *text* with the supported ``${...}`` placeholders substituted.

        Supported placeholders are ``${harvesterID}``, ``${queueName}`` and
        ``${common.<key>}`` (looked up in ``queue_config.common``). A
        placeholder that is unknown, or whose value is missing or ``None``,
        is left in the string untouched.
        """
        common_prefix = "common."
        expanded = text
        for patt in self._PLACEHOLDER_PATTERN.findall(text):
            tmp_val = None
            if patt == "harvesterID":
                tmp_val = harvester_config.master.harvester_id
            elif patt == "queueName":
                tmp_val = queue_name
            elif patt.startswith(common_prefix):
                # strip only the leading prefix, not every "common." in the key
                attr = patt[len(common_prefix):]
                if hasattr(queue_config, "common") and attr in queue_config.common:
                    tmp_val = queue_config.common[attr]
            if tmp_val is not None:
                # str(): str.replace() raises TypeError for non-string values
                # such as numbers
                expanded = expanded.replace("${" + patt + "}", str(tmp_val))
        return expanded

    def get_cores_from_harvester_config(self):
        """Instantiate the plugins declared in ``harvester_config.credmanager``.

        Two styles are read:

        * traditional parallel attributes (``moduleName``, ``className``,
          ``inCertFile``/``certFile``, ``outCertFile``, ``voms``), combined
          position by position with ``zip`` -- so the number of plugins is the
          length of the shortest of those lists;
        * ``pluginConfigs``, an iterable of dicts with ``module``, ``name`` and
          a ``configs`` mapping of setup name to extra plugin parameters.

        Every successfully created plugin is appended to ``self.exe_cores``.
        A plugin that fails to instantiate is logged and skipped.
        """
        cm_config = harvester_config.credmanager

        def attr_as_list(*names):
            # the first attribute present on the config section wins;
            # if none is present the result is an empty list
            for name in names:
                if hasattr(cm_config, name):
                    return self.get_list(getattr(cm_config, name))
            return []

        # module and class names
        moduleNames = attr_as_list("moduleName")
        classNames = attr_as_list("className")
        # file name of the input certificate ("certFile" is the fallback attribute)
        inCertFiles = attr_as_list("inCertFile", "certFile")
        # file name of the output certificate
        if hasattr(cm_config, "outCertFile"):
            outCertFiles = self.get_list(cm_config.outCertFile)
        else:
            # fall back to the certificate file configured in the pandacon section
            outCertFiles = self.get_list(harvester_config.pandacon.cert_file)
        # VOMS
        vomses = attr_as_list("voms")
        # plugin configurations given as a structure (used as is, not wrapped in a list)
        pluginConfigs = getattr(cm_config, "pluginConfigs", [])

        # from traditional attributes
        for moduleName, className, inCertFile, outCertFile, voms in zip(moduleNames, classNames, inCertFiles, outCertFiles, vomses):
            plugin_params = {
                "module": moduleName,
                "name": className,
                "inCertFile": inCertFile,
                "outCertFile": outCertFile,
                "voms": voms,
            }
            try:
                exe_core = self.pluginFactory.get_plugin(plugin_params)
                self.exe_cores.append(exe_core)
            except Exception:
                _logger.error(f"failed to launch credmanager with traditional attributes for {plugin_params}")
                core_utils.dump_error_message(_logger)

        # from pluginConfigs
        for pc in pluginConfigs:
            try:
                setup_maps = pc["configs"]
                for setup_name, setup_map in setup_maps.items():
                    # reset per setup so that the error message below never
                    # shows parameters left over from a previous plugin
                    plugin_params = None
                    try:
                        plugin_params = {"module": pc["module"], "name": pc["name"], "setup_name": setup_name}
                        plugin_params.update(setup_map)
                        exe_core = self.pluginFactory.get_plugin(plugin_params)
                        self.exe_cores.append(exe_core)
                    except Exception:
                        if plugin_params is None:
                            # the parameters could not even be assembled
                            plugin_params = {"setup_name": setup_name}
                        _logger.error(f"failed to launch credmanager in pluginConfigs for {plugin_params}")
                        core_utils.dump_error_message(_logger)
            except Exception:
                _logger.error(f"failed to parse pluginConfigs {pc}")
                core_utils.dump_error_message(_logger)

    def update_cores_from_queue_config(self):
        """Rebuild ``self.queue_exe_cores`` from the current queue configurations.

        Queues are skipped when their ``queueStatus`` is ``"offline"`` or when
        they have no ``credmanagers`` attribute of type ``list``. Each entry of
        ``credmanagers`` is expected to be a dict with at least ``module`` and
        ``name``; all of its items are passed to the plugin, with ``${...}``
        placeholders in string values expanded. ``setup_name`` and
        ``queueName`` are preset to the queue name but can be overridden by the
        entry itself. A plugin that fails to instantiate is logged and skipped.
        """
        self.queue_exe_cores = []
        for queue_name, queue_config in self.queue_config_mapper.get_all_queues().items():
            if queue_config.queueStatus == "offline":
                continue
            if not hasattr(queue_config, "credmanagers") or not isinstance(queue_config.credmanagers, list):
                continue
            for cm_setup in queue_config.credmanagers:
                # reset per entry: without this a failure before the dict is
                # built would hit an unbound (or stale) name in the handler
                plugin_params = None
                try:
                    plugin_params = {"module": cm_setup["module"], "name": cm_setup["name"], "setup_name": queue_name, "queueName": queue_name}
                    for k, v in cm_setup.items():
                        if isinstance(v, str) and "$" in v:
                            # replace placeholders
                            plugin_params[k] = self._expand_placeholders(v, queue_name, queue_config)
                        else:
                            plugin_params[k] = v
                    exe_core = self.pluginFactory.get_plugin(plugin_params)
                    self.queue_exe_cores.append(exe_core)
                except Exception:
                    # report the raw entry when the parameters were never assembled
                    reported = plugin_params if plugin_params is not None else cm_setup
                    _logger.error(f"failed to launch plugin for queue={queue_name} and {reported}")
                    core_utils.dump_error_message(_logger)

    def run(self):
        """Main loop: refresh the queue plugins, run ``execute()``, then wait.

        The loop ends when ``self.terminated(...)``, called with the configured
        ``credmanager.sleepTime``, returns a true value.
        """
        while True:
            # update plugins from queue config
            self.update_cores_from_queue_config()
            # check and renew credentials
            self.execute()
            # check if being terminated
            if self.terminated(harvester_config.credmanager.sleepTime, randomize=False):
                return

    def execute(self):
        """Check every plugin's credential and renew it when it is not valid.

        Nothing is done unless the ``"credmanager"`` process lock is obtained
        from the database proxy. Exceptions raised by a plugin are logged and
        do not stop the remaining plugins from being processed.
        """
        # get lock
        locked = self.dbProxy.get_process_lock("credmanager", self.get_pid(), harvester_config.credmanager.sleepTime)
        if not locked:
            return
        # loop over all plugins
        for exe_core in itertools.chain(self.exe_cores, self.queue_exe_cores):
            # nothing to do for an empty slot
            if exe_core is None:
                continue
            # make logger
            credmanager_name = self._get_credmanager_name(exe_core)
            mainLog = self.make_logger(_logger, f"{exe_core.__class__.__name__} {credmanager_name}", method_name="execute")
            try:
                # check credential
                mainLog.debug("check credential")
                if exe_core.check_credential():
                    mainLog.debug("valid")
                else:
                    # renew it
                    mainLog.debug("invalid")
                    mainLog.debug("renew credential")
                    tmpStat, tmpOut = exe_core.renew_credential()
                    if not tmpStat:
                        mainLog.error(f"failed : {tmpOut}")
                        # a failed renewal is not followed by the "done" message
                        continue
            except Exception:
                core_utils.dump_error_message(mainLog)
            mainLog.debug("done")

    def execute_monit(self):
        """Collect credential lifetimes from all plugins.

        The queue plugins are refreshed first. For every plugin,
        ``check_credential_lifetime()`` is called; results that are not
        ``None`` are stored under the plugin's ``outCertFile``. Exceptions
        raised for a plugin are logged and that plugin is left out.

        :return: dict mapping ``outCertFile`` to the reported lifetime.
        """
        self.update_cores_from_queue_config()

        metrics = {}
        # loop over all plugins
        for exe_core in itertools.chain(self.exe_cores, self.queue_exe_cores):
            # nothing to do for an empty slot
            if exe_core is None:
                continue
            # make logger
            credmanager_name = self._get_credmanager_name(exe_core)
            sub_log = self.make_logger(_logger, f"{exe_core.__class__.__name__} {credmanager_name}", method_name="execute_monit")
            try:
                # check credential lifetime
                sub_log.debug("check credential lifetime")
                lifetime = exe_core.check_credential_lifetime()
                if lifetime is not None:
                    metrics[exe_core.outCertFile] = lifetime
            except Exception:
                core_utils.dump_error_message(sub_log)

            sub_log.debug("done")

        return metrics