Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:58:58

0001 import subprocess
0002 
0003 from pandaharvester.harvestercore import core_utils
0004 
0005 from .base_cred_manager import BaseCredManager
0006 
0007 # logger
0008 _logger = core_utils.setup_logger("no_voms_cred_manager")
0009 
0010 
0011 # credential manager with no-voms proxy
class NoVomsCredManager(BaseCredManager):
    """Credential manager that checks and renews a proxy via command-line tools.

    The proxy is inspected with ``voms-proxy-info`` and (re)generated with
    ``renewCommand`` (``voms-proxy-init`` by default).  The VOMS extension is
    optional: it is requested only when a ``voms`` attribute is set.

    Attributes read from the plugin configuration (with defaults applied in
    ``__init__``):
        genFromKeyCert: if true, generate the proxy from ``cert``/``key``;
            otherwise reuse ``inCertFile`` as both certificate and key and
            pass ``-noregen``.
        key, cert: key/certificate paths used when ``genFromKeyCert`` is set.
        checkPeriod: margin subtracted from ``lifetime`` to get the renewal
            threshold (default 1; presumably hours like ``lifetime`` -- confirm).
        lifetime: requested proxy lifetime, in hours (default 96).
        renewCommand: command used to renew the proxy.
        extraRenewOpts: extra whitespace-separated options appended to the
            renew command.
        lifetimeOptFormat: format string for the lifetime option; receives
            ``lifetime`` as a keyword.

    ``inCertFile``, ``outCertFile`` and ``voms`` are used by the methods but
    not set here; they are expected to be provided by the base class from
    the keyword arguments (not visible in this file -- confirm).
    """

    # constructor
    def __init__(self, **kwarg):
        """Initialize the base class and fill in defaults for optional settings."""
        BaseCredManager.__init__(self, **kwarg)
        # make logger (not used further in the constructor)
        main_log = self.make_logger(_logger, method_name="__init__")
        # snapshot of the attributes set so far; used to apply defaults below
        self.setupMap = dict(vars(self))
        # re-assign each optional setting, falling back to its default when absent
        self.genFromKeyCert = self.setupMap.get("genFromKeyCert")
        self.key = self.setupMap.get("key")
        self.cert = self.setupMap.get("cert")
        self.checkPeriod = self.setupMap.get("checkPeriod", 1)
        self.lifetime = self.setupMap.get("lifetime", 96)
        self.renewCommand = self.setupMap.get("renewCommand", "voms-proxy-init")
        self.extraRenewOpts = self.setupMap.get("extraRenewOpts", "")
        self.lifetimeOptFormat = self.setupMap.get("lifetimeOptFormat", "-valid {lifetime}:00")

    # run an external command
    @staticmethod
    def _run_command(argv):
        """Run ``argv`` without a shell and return ``(return_code, stdout, stderr)``.

        The outputs are decoded to ``str`` (undecodable bytes are replaced) so
        that log lines and returned messages do not contain ``b'...'`` reprs.
        Exceptions raised while launching the process propagate to the caller.
        """
        proc = subprocess.run(argv, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        std_out = proc.stdout.decode(errors="replace")
        std_err = proc.stderr.decode(errors="replace")
        return proc.returncode, std_out, std_err

    # check proxy lifetime for monitoring/alerting purposes
    def check_credential_lifetime(self):
        """Return the remaining lifetime of the proxy in hours.

        Returns:
            float | None: the value printed by ``voms-proxy-info -timeleft``
            divided by 3600, or ``None`` when the command fails, its output
            is not an integer, or any other error occurs (errors are logged,
            not raised).
        """
        main_log = self.make_logger(_logger, method_name="check_credential_lifetime")
        lifetime = None
        try:
            # pass the path as one argument so whitespace in it is preserved
            argv = ["voms-proxy-info", "-timeleft", "-file", str(self.outCertFile)]
            return_code, stdout, stderr = self._run_command(argv)
            main_log.debug(f"retCode={return_code} stdout={stdout} stderr={stderr}")
            if return_code == 0:  # OK
                # int() tolerates surrounding whitespace such as the trailing newline
                lifetime = int(stdout) / 3600
        except Exception:
            core_utils.dump_error_message(main_log)
        if isinstance(lifetime, float):
            main_log.debug(f"returning lifetime {lifetime:.3f}")
        else:
            main_log.debug(f"returning lifetime {lifetime}")
        return lifetime

    # check proxy
    def check_credential(self):
        """Check whether the proxy is still valid for long enough.

        Returns:
            bool: ``True`` if ``voms-proxy-info -exists`` succeeds for the
            renewal threshold, ``False`` if it fails or cannot be executed.
        """
        # make logger
        main_log = self.make_logger(_logger, method_name="check_credential")
        # lifetime threshold to trigger renew in hour
        threshold = max(self.lifetime - self.checkPeriod, 0)
        # pass the path as one argument so whitespace in it is preserved
        argv = ["voms-proxy-info", "-exists", "-hours", str(threshold), "-file", str(self.outCertFile)]
        main_log.debug(" ".join(argv))
        try:
            ret_code, std_out, std_err = self._run_command(argv)
        except Exception:
            core_utils.dump_error_message(main_log)
            return False
        main_log.debug(f"retCode={ret_code} stdOut={std_out} stdErr={std_err}")
        return ret_code == 0

    # renew proxy
    def renew_credential(self):
        """Generate a fresh proxy in ``outCertFile``.

        Returns:
            tuple[bool, str]: success flag (``True`` when the renew command
            exits with 0) and a message made of the command's stdout and
            stderr.  If the command cannot be executed the flag is ``False``
            and the message carries the result of
            ``core_utils.dump_error_message``.
        """
        # make logger
        main_log = self.make_logger(_logger, method_name="renew_credential")
        # generate proxy with a long lifetime proxy (default) or from key/cert pair
        if self.genFromKeyCert:
            use_noregen = False
            usercert_value = self.cert
            userkey_value = self.key
        else:
            use_noregen = True
            usercert_value = self.inCertFile
            userkey_value = self.inCertFile
        # command; option-style settings are split on whitespace, paths are kept whole
        argv = str(self.renewCommand).split()
        argv.append("-rfc")
        if use_noregen:
            argv.append("-noregen")
        # voms or no-voms: skip the option when voms is missing or empty, since a bare
        # "-voms" would otherwise take the following option as its value
        voms = getattr(self, "voms", None)
        if voms:
            argv += ["-voms", str(voms)]
        argv += ["-out", str(self.outCertFile)]
        argv += self.lifetimeOptFormat.format(lifetime=self.lifetime).split()
        argv += [f"-cert={usercert_value}", f"-key={userkey_value}"]
        argv += (self.extraRenewOpts or "").split()
        main_log.debug(" ".join(argv))
        try:
            ret_code, std_out, std_err = self._run_command(argv)
            main_log.debug(f"retCode={ret_code} stdOut={std_out} stdErr={std_err}")
        except Exception:
            std_out = ""
            std_err = core_utils.dump_error_message(main_log)
            ret_code = -1
        return ret_code == 0, f"{std_out} {std_err}"