Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:58:56

0001 import itertools
0002 import re
0003 
0004 from pandaharvester.harvesterbody.agent_base import AgentBase
0005 from pandaharvester.harvesterconfig import harvester_config
0006 from pandaharvester.harvestercore import core_utils
0007 from pandaharvester.harvestercore.db_proxy_pool import DBProxyPool as DBProxy
0008 from pandaharvester.harvestercore.plugin_factory import PluginFactory
0009 
0010 # logger
0011 _logger = core_utils.setup_logger("cred_manager")
0012 
0013 
0014 # credential manager
0015 class CredManager(AgentBase):
0016     # constructor
0017     def __init__(self, queue_config_mapper, single_mode=False):
0018         AgentBase.__init__(self, single_mode)
0019         self.queue_config_mapper = queue_config_mapper
0020         self.pluginFactory = PluginFactory()
0021         self.dbProxy = DBProxy()
0022         # plugin cores
0023         self.exe_cores = []  # general cred managers that are not queue based, e.g. VOMS renewal on harvester instance
0024         self.queue_exe_cores = []  # cred manages that are queue based, e.g. VOMS refresh on K8S clusters
0025         # get plugin from harvester config
0026         self.get_cores_from_harvester_config()
0027         # update plugin cores from queue config
0028         self.update_cores_from_queue_config()
0029 
0030     # get list
0031     def get_list(self, data):
0032         if isinstance(data, list):
0033             return data
0034         else:
0035             return [data]
0036 
0037     # get plugin cores from harvester config
0038     def get_cores_from_harvester_config(self):
0039         # get module and class names
0040         if hasattr(harvester_config.credmanager, "moduleName"):
0041             moduleNames = self.get_list(harvester_config.credmanager.moduleName)
0042         else:
0043             moduleNames = []
0044         if hasattr(harvester_config.credmanager, "className"):
0045             classNames = self.get_list(harvester_config.credmanager.className)
0046         else:
0047             classNames = []
0048         # file names of original certificates
0049         if hasattr(harvester_config.credmanager, "inCertFile"):
0050             inCertFiles = self.get_list(harvester_config.credmanager.inCertFile)
0051         elif hasattr(harvester_config.credmanager, "certFile"):
0052             inCertFiles = self.get_list(harvester_config.credmanager.certFile)
0053         else:
0054             inCertFiles = []
0055         # file names of certificates to be generated
0056         if hasattr(harvester_config.credmanager, "outCertFile"):
0057             outCertFiles = self.get_list(harvester_config.credmanager.outCertFile)
0058         else:
0059             # use the file name of the certificate for panda connection as output name
0060             outCertFiles = self.get_list(harvester_config.pandacon.cert_file)
0061         # VOMS
0062         if hasattr(harvester_config.credmanager, "voms"):
0063             vomses = self.get_list(harvester_config.credmanager.voms)
0064         else:
0065             vomses = []
0066         # direct and merged plugin configuration in json
0067         if hasattr(harvester_config.credmanager, "pluginConfigs"):
0068             pluginConfigs = harvester_config.credmanager.pluginConfigs
0069         else:
0070             pluginConfigs = []
0071         # from traditional attributes
0072         for moduleName, className, inCertFile, outCertFile, voms in zip(moduleNames, classNames, inCertFiles, outCertFiles, vomses):
0073             plugin_params = {}
0074             plugin_params["module"] = moduleName
0075             plugin_params["name"] = className
0076             plugin_params["inCertFile"] = inCertFile
0077             plugin_params["outCertFile"] = outCertFile
0078             plugin_params["voms"] = voms
0079             try:
0080                 exe_core = self.pluginFactory.get_plugin(plugin_params)
0081                 self.exe_cores.append(exe_core)
0082             except Exception:
0083                 _logger.error(f"failed to launch credmanager with traditional attributes for {plugin_params}")
0084                 core_utils.dump_error_message(_logger)
0085         # from pluginConfigs
0086         for pc in pluginConfigs:
0087             try:
0088                 setup_maps = pc["configs"]
0089                 for setup_name, setup_map in setup_maps.items():
0090                     try:
0091                         plugin_params = {"module": pc["module"], "name": pc["name"], "setup_name": setup_name}
0092                         plugin_params.update(setup_map)
0093                         exe_core = self.pluginFactory.get_plugin(plugin_params)
0094                         self.exe_cores.append(exe_core)
0095                     except Exception:
0096                         _logger.error(f"failed to launch credmanager in pluginConfigs for {plugin_params}")
0097                         core_utils.dump_error_message(_logger)
0098             except Exception:
0099                 _logger.error(f"failed to parse pluginConfigs {pc}")
0100                 core_utils.dump_error_message(_logger)
0101 
0102     # update plugin cores from queue config
0103     def update_cores_from_queue_config(self):
0104         self.queue_exe_cores = []
0105         for queue_name, queue_config in self.queue_config_mapper.get_all_queues().items():
0106             if queue_config.queueStatus == "offline" or not hasattr(queue_config, "credmanagers") or not isinstance(queue_config.credmanagers, list):
0107                 continue
0108             for cm_setup in queue_config.credmanagers:
0109                 try:
0110                     plugin_params = {"module": cm_setup["module"], "name": cm_setup["name"], "setup_name": queue_name, "queueName": queue_name}
0111                     for k, v in cm_setup.items():
0112                         if k in ("module", "name"):
0113                             pass
0114                         if isinstance(v, str) and "$" in v:
0115                             # replace placeholders
0116                             value = v
0117                             patts = re.findall("\$\{([a-zA-Z\d_.]+)\}", v)
0118                             for patt in patts:
0119                                 tmp_ph = "${" + patt + "}"
0120                                 tmp_val = None
0121                                 if patt == "harvesterID":
0122                                     tmp_val = harvester_config.master.harvester_id
0123                                 elif patt == "queueName":
0124                                     tmp_val = queue_name
0125                                 elif patt.startswith("common."):
0126                                     # values from common blocks
0127                                     attr = patt.replace("common.", "")
0128                                     if hasattr(queue_config, "common") and attr in queue_config.common:
0129                                         tmp_val = queue_config.common[attr]
0130                                 if tmp_val is not None:
0131                                     value = value.replace(tmp_ph, tmp_val)
0132                             # fill in
0133                             plugin_params[k] = value
0134                         else:
0135                             # fill in
0136                             plugin_params[k] = v
0137                     exe_core = self.pluginFactory.get_plugin(plugin_params)
0138                     self.queue_exe_cores.append(exe_core)
0139                 except Exception:
0140                     _logger.error(f"failed to launch plugin for queue={queue_name} and {plugin_params}")
0141                     core_utils.dump_error_message(_logger)
0142 
0143     # main loop
0144     def run(self):
0145         while True:
0146             # update plugin cores from queue config
0147             self.update_cores_from_queue_config()
0148 
0149             # execute
0150             self.execute()  # this is the main run
0151 
0152             # check if being terminated
0153             if self.terminated(harvester_config.credmanager.sleepTime, randomize=False):
0154                 return
0155 
0156     # main
0157     def execute(self):
0158         # get lock
0159         locked = self.dbProxy.get_process_lock("credmanager", self.get_pid(), harvester_config.credmanager.sleepTime)
0160         if not locked:
0161             return
0162         # loop over all plugins
0163         for exe_core in itertools.chain(self.exe_cores, self.queue_exe_cores):
0164             # do nothing
0165             if exe_core is None:
0166                 continue
0167             # make logger
0168             if hasattr(exe_core, "setup_name"):
0169                 credmanager_name = exe_core.setup_name
0170             else:
0171                 credmanager_name = f"{exe_core.inCertFile} {exe_core.outCertFile}"
0172             mainLog = self.make_logger(_logger, f"{exe_core.__class__.__name__} {credmanager_name}", method_name="execute")
0173             try:
0174                 # check credential
0175                 mainLog.debug("check credential")
0176                 isValid = exe_core.check_credential()
0177                 if isValid:
0178                     mainLog.debug("valid")
0179                 elif not isValid:
0180                     # renew it if necessary
0181                     mainLog.debug("invalid")
0182                     mainLog.debug("renew credential")
0183                     tmpStat, tmpOut = exe_core.renew_credential()
0184                     if not tmpStat:
0185                         mainLog.error(f"failed : {tmpOut}")
0186                         continue
0187             except Exception:
0188                 core_utils.dump_error_message(mainLog)
0189             mainLog.debug("done")
0190 
0191     # monit main
0192     def execute_monit(self):
0193         self.update_cores_from_queue_config()
0194 
0195         metrics = {}
0196         # loop over all plugins
0197         for exe_core in itertools.chain(self.exe_cores, self.queue_exe_cores):
0198             # do nothing
0199             if exe_core is None:
0200                 continue
0201 
0202             # make logger
0203             if hasattr(exe_core, "setup_name"):
0204                 credmanager_name = exe_core.setup_name
0205             else:
0206                 credmanager_name = f"{exe_core.inCertFile} {exe_core.outCertFile}"
0207 
0208             sub_log = self.make_logger(_logger, f"{exe_core.__class__.__name__} {credmanager_name}", method_name="execute_monit")
0209             try:
0210                 # check credential
0211                 sub_log.debug("check credential lifetime")
0212                 lifetime = exe_core.check_credential_lifetime()
0213                 if lifetime is not None:
0214                     metrics[exe_core.outCertFile] = lifetime
0215             except Exception:
0216                 core_utils.dump_error_message(sub_log)
0217 
0218             sub_log.debug("done")
0219 
0220         return metrics