File indexing completed on 2026-04-10 08:39:02
0001 import datetime
0002 import hashlib
0003 import os
0004 import shutil
0005 import subprocess
0006
0007 from pandacommon.pandalogger.LogWrapper import LogWrapper
0008 from pandacommon.pandalogger.PandaLogger import PandaLogger
0009
0010 _logger = PandaLogger().getLogger("ProxyCache")
0011
0012
def execute(program, log_stream):
    """Run *program* through the shell and collect its output.

    The command is started with ``/tmp`` as working directory and both output
    streams are captured through pipes.

    Args:
        program: command line, passed to the shell as given.
        log_stream: logger-like object; its ``info`` method receives the
            command line before it is started.

    Returns:
        A ``(stdout, stderr, status)`` tuple: the captured standard output,
        the captured standard error, and the exit status of the command.
    """
    log_stream.info(f"executable: {program}")
    popen_options = dict(
        bufsize=-1,
        shell=True,
        close_fds=False,
        cwd="/tmp",
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    process = subprocess.Popen(program, **popen_options)
    captured_out, captured_err = process.communicate()
    exit_status = process.wait()
    return captured_out, captured_err, exit_status
0027
0028
def cat(filename):
    """Return the complete text content of the file at *filename*."""
    with open(filename, "r") as handle:
        return handle.read()
0034
0035
class MyProxyInterface(object):
    """Fetch grid proxies from a MyProxy server and keep them in a local file cache.

    Every cached file lives in one directory and is named after the SHA-1 hex
    digest of a key derived from the user DN:

    * ``<dn>.plain`` - proxy as written by ``myproxy-logon``
    * ``<dn><ext>``  - proxy created with ``-voms <role>``; ``<ext>`` comes from
      ``getExtension(role)``
    * ``<dn>.prod``  - proxy created with ``-voms atlas:/atlas/Role=production``
    * ``<dn>``       - proxy created with ``-voms atlas``

    A zero-length ``.plain`` file is a placeholder for a failed download. While
    such a placeholder is younger than one hour, ``store`` does not contact the
    server again.
    """

    def __init__(self):
        self.__target_path = "/tmp/proxies"
        self.__cred_name = "panda"
        if not os.path.exists(self.__target_path):
            # exist_ok closes the check-then-create race between concurrent processes
            os.makedirs(self.__target_path, exist_ok=True)

    @staticmethod
    def _shell_join(args):
        """Return *args* joined into one command line with every item shell-quoted."""
        # local import: shlex is not among the imports at the top of this file
        import shlex

        return " ".join(shlex.quote(str(arg)) for arg in args)

    @staticmethod
    def _log_output(stdout, stderr, log_stream):
        """Log the captured output streams of a command, skipping empty ones."""
        if stdout:
            log_stream.info(f"stdout is {stdout} ")
        if stderr:
            log_stream.info(f"stderr is {stderr} ")

    @staticmethod
    def _write_placeholder(path):
        """Create (or truncate to) an empty file at *path*."""
        with open(path, "w"):
            pass

    def _cache_path(self, key):
        """Return the cache file path for *key*: SHA-1 hex digest inside the cache directory."""
        return os.path.join(self.__target_path, hashlib.sha1(key.encode("utf-8")).hexdigest())

    def _init_voms_proxy(self, plain_path, out_path, voms, log_stream):
        """Run ``voms-proxy-init`` on the plain proxy and return its exit status."""
        cmd = self._shell_join(
            [
                "voms-proxy-init",
                "-vomses",
                "/etc/vomses",
                "-valid",
                "96:00",
                "-rfc",
                "-cert",
                plain_path,
                "-key",
                plain_path,
                "-out",
                out_path,
                "-n",
                "-voms",
                voms,
            ]
        )
        stdout, stderr, status = execute(cmd, log_stream)
        self._log_output(stdout, stderr, log_stream)
        return status

    def _store_and_alias(self, user_dn, production, role, name, proxy_path, log_stream, success_message):
        """Call ``store`` and, on success, copy the proxy to the path derived from *name*.

        Returns True when ``store`` returned 0, False otherwise.
        """
        ret = self.store(
            user_dn,
            self.__cred_name,
            production,
            role=role,
            log_stream=log_stream,
        )
        if ret == 0:
            log_stream.info(success_message)
            # no alias can be derived without a name
            if name is not None:
                alt_proxy_path = self.get_proxy_path(name, production, role)
                # copyfile raises SameFileError when source and target coincide
                if alt_proxy_path != proxy_path:
                    shutil.copyfile(proxy_path, alt_proxy_path)
            return True
        if ret == 2:
            log_stream.info("proxy retrieval on hold")
        else:
            log_stream.error("proxy retrieval failed")
        return False

    def store(
        self,
        user_dn,
        cred_name,
        production=False,
        server_name="myproxy.cern.ch",
        role=None,
        log_stream=None,
    ):
        """Download the plain proxy for *user_dn* and derive a VOMS proxy from it.

        Args:
            user_dn: user name passed to ``myproxy-logon -l``; also the base of the cache keys.
            cred_name: credential name passed to ``myproxy-logon -k``.
            production: when true and *role* is None, request ``atlas:/atlas/Role=production``.
            server_name: MyProxy server passed to ``myproxy-logon -s``.
            role: VOMS attribute passed to ``voms-proxy-init -voms``; takes precedence
                over *production*.
            log_stream: logger-like object; the module logger is used when None.

        Returns:
            2 when a placeholder younger than one hour exists and no download was tried,
            1 when ``myproxy-logon`` exited with 1, otherwise the exit status of
            ``voms-proxy-init``.
        """
        if log_stream is None:
            log_stream = _logger
        log_stream.info("store proxy")

        proxy_path = self._cache_path(f"{user_dn}.plain")

        # an empty file is the placeholder of a failed attempt: back off for one hour
        if os.path.exists(proxy_path) and os.stat(proxy_path).st_size == 0:
            changed_at = datetime.datetime.fromtimestamp(os.path.getctime(proxy_path), datetime.timezone.utc)
            age = datetime.datetime.now(datetime.timezone.utc) - changed_at
            if age < datetime.timedelta(hours=1):
                log_stream.info(f"skip too early to try again according to timestamp of {proxy_path}")
                return 2

        cmd = self._shell_join(
            [
                "myproxy-logon",
                "-s",
                server_name,
                "--no_passphrase",
                "--out",
                proxy_path,
                "-l",
                user_dn,
                "-k",
                cred_name,
                "-t",
                "0",
            ]
        )
        stdout, stderr, status = execute(cmd, log_stream)
        self._log_output(stdout, stderr, log_stream)
        if status != 0:
            # leave a placeholder only when the download failed, so that a proxy
            # that was just written successfully is never truncated
            self._write_placeholder(proxy_path)
        log_stream.info(f"test the status of plain... {status}")
        if status == 1:
            return status

        if role is not None:
            log_stream.info(f"proxy needs {role} - need to add voms attributes and store it in the cache")
            role_proxy_path = self._cache_path(user_dn + self.getExtension(role))
            status = self._init_voms_proxy(proxy_path, role_proxy_path, role, log_stream)
            log_stream.debug(f"test the status of production... {status}")
        elif production:
            log_stream.info("production proxy needed - need to add voms attributes and store it in the cache")
            prod_proxy_path = self._cache_path(f"{user_dn}.prod")
            log_stream.info(prod_proxy_path)
            status = self._init_voms_proxy(proxy_path, prod_proxy_path, "atlas:/atlas/Role=production", log_stream)
            log_stream.info(f"test the status of production... {status}")
        else:
            atlas_proxy_path = self._cache_path(user_dn)
            status = self._init_voms_proxy(proxy_path, atlas_proxy_path, "atlas", log_stream)
            log_stream.info(f"test the status of atlas... {status}")

        # make sure a failed attempt leaves a placeholder behind for the back-off check
        if status != 0 and not os.path.exists(proxy_path):
            self._write_placeholder(proxy_path)
        return status

    def retrieve(self, user_dn, production=False, role=None):
        """Return the content of the cached proxy, or None when no such file exists."""
        proxy_path = self.get_proxy_path(user_dn, production, role)
        if os.path.isfile(proxy_path):
            return cat(proxy_path)
        _logger.warning(f"proxy file does not exist : DN:{user_dn} role:{role} file:{proxy_path}")
        return None

    def get_proxy_path(self, user_dn, production, role):
        """Return the cache path of the VOMS proxy for *user_dn*.

        *role* takes precedence over *production*; with neither, the key is the DN itself.
        """
        if role is not None:
            return self._cache_path(user_dn + self.getExtension(role))
        if production:
            return self._cache_path(f"{user_dn}.prod")
        return self._cache_path(user_dn)

    def checkProxy(self, user_dn, production=False, role=None, name=None):
        """Make sure a usable proxy for *user_dn* is cached, renewing it when needed.

        The cached proxy is renewed through ``store`` when it is missing or when
        ``voms-proxy-info -exists -hours 94`` exits with 1. After a successful
        ``store`` the proxy is also copied to the path derived from *name*
        (skipped when *name* is None).

        Returns:
            The result of ``checkValidity`` on the plain proxy, or None when the
            proxy could not be obtained or the plain proxy file is missing.
        """
        log_stream = LogWrapper(_logger, f'< name="{name}" role={role} >')
        log_stream.info(f"check proxy for {user_dn}")

        proxy_path = self.get_proxy_path(user_dn, production, role)
        if os.path.isfile(proxy_path):
            log_stream.info("proxy is there. Need to check validity")
            cmd = self._shell_join(["voms-proxy-info", "-exists", "-hours", 94, "-file", proxy_path])
            stdout, stderr, status = execute(cmd, log_stream)
            self._log_output(stdout, stderr, log_stream)
            if status == 1:
                log_stream.info("proxy expires in 94h or less. We need to renew proxy!")
                is_ok = self._store_and_alias(
                    user_dn, production, role, name, proxy_path, log_stream, "proxy retrieval successful"
                )
            else:
                log_stream.info("proxy is valid for more than 3 days")
                is_ok = True
        else:
            log_stream.info("proxy is not in the cache repo. will try to get it from myproxy")
            is_ok = self._store_and_alias(
                user_dn, production, role, name, proxy_path, log_stream, "proxy stored successfully"
            )

        if not is_ok:
            return None
        plain_path = self._cache_path(f"{user_dn}.plain")
        if os.path.isfile(plain_path):
            return self.checkValidity(plain_path, log_stream)
        log_stream.error("plain proxy not there at the moment!")
        return None

    def checkValidity(self, proxy_path, log_stream):
        """Report how soon the proxy at *proxy_path* expires.

        ``voms-proxy-info -exists -hours N`` is run for N = 24, 94, 168 in that order.

        Returns:
            The first N for which the command exits with 1; otherwise the exit
            status of the last command that was run.
        """
        log_stream.info("Need to check validity and expiry!")
        time_left_thresholds = [24, 94, 168]
        status = 0
        for threshold in time_left_thresholds:
            cmd = self._shell_join(["voms-proxy-info", "-exists", "-hours", threshold, "-file", proxy_path])
            _, _, status = execute(cmd, log_stream)
            if status == 1:
                log_stream.warning(f"proxy expires in {threshold} hours")
                return threshold
        return status

    def getExtension(self, role):
        """Return ``"."`` plus the text after the last ``=`` in *role*, or None when *role* is None."""
        if role is not None:
            return "." + role.split("=")[-1]
        return None