File indexing completed on 2026-04-20 07:58:58
0001 import json
0002 import os
0003 import re
0004 import time
0005 import traceback
0006
0007 from pandaharvester.harvesterconfig import harvester_config
0008 from pandaharvester.harvestercore import core_utils
0009 from pandaharvester.harvestermisc.info_utils import PandaQueuesDict
0010 from pandaharvester.harvestermisc.token_utils import (
0011 IssuerBroker,
0012 WLCG_scopes,
0013 endpoint_to_filename,
0014 )
0015
0016 from .base_cred_manager import BaseCredManager
0017
0018
0019 _logger = core_utils.setup_logger("iam_token_cred_manager")
0020
0021
0022 ALL_TARGET_TYPES = ["common", "ce", "panda"]
0023
0024
0025 default_port_map = {
0026 "htcondor-ce": 9619,
0027 "arc-ce": 443,
0028 }
0029
0030
0031
0032
0033 class IamTokenCredManager(BaseCredManager):
0034
0035 def __init__(self, **kwarg):
0036 BaseCredManager.__init__(self, **kwarg)
0037
0038 tmp_log = self.make_logger(_logger, f"config={self.setup_name}", method_name="__init__")
0039
0040 if hasattr(self, "inFile"):
0041
0042 try:
0043 with open(self.inFile) as f:
0044 self.setupMap = json.load(f)
0045 except Exception as e:
0046 tmp_log.error(f"Error with inFile. {e.__class__.__name__}: {e}")
0047 self.setupMap = {}
0048 raise
0049 else:
0050
0051 self.setupMap = dict(vars(self))
0052
0053 try:
0054 self.client_cred_file = self.setupMap["client_cred_file"]
0055 with open(self.client_cred_file) as f:
0056 client_cred_dict = json.load(f)
0057 self.issuer = client_cred_dict["issuer"]
0058 self.client_id = client_cred_dict["client_id"]
0059 self.client_secret = client_cred_dict["client_secret"]
0060 self.target_type = self.setupMap["target_type"]
0061 self.out_dir = self.setupMap["out_dir"]
0062 self.panda_token_filename = self.setupMap.get("panda_token_filename", "panda_token")
0063 self.lifetime = self.setupMap.get("lifetime", 14 * 24 * 60 * 60)
0064 self.target_list = self.setupMap.get("target_list")
0065 self.target_list_file = self.setupMap.get("target_list_file")
0066 self.update_ts_path = self.setupMap.get("update_ts_path", os.path.join(self.out_dir, "_last_update"))
0067 self.check_interval = self.setupMap.get("check_interval", 300)
0068 self.refresh_interval = self.setupMap.get("refresh_interval", 3600)
0069 except KeyError as e:
0070 tmp_log.error(f"Missing attributes in setup. {traceback.format_exc()}")
0071 raise
0072 else:
0073 if self.target_type not in ALL_TARGET_TYPES:
0074 tmp_log.error(f"Unsupported target_type: {self.target_type}")
0075 raise Exception("Unsupported target_type")
0076
0077 self.targets_dict = dict()
0078
0079 self._handle_target_types()
0080
0081 self.issuer_broker = IssuerBroker(self.issuer, self.client_id, self.client_secret, name=self.setup_name)
0082
0083 def _is_updated(self):
0084 now_time = time.time()
0085 ret = False
0086 if os.path.isfile(self.update_ts_path) and now_time - os.path.getmtime(self.update_ts_path) < self.check_interval:
0087 ret = True
0088 return ret
0089
0090 def _is_fresh(self, token_path):
0091 now_time = time.time()
0092 ret = False
0093 if os.path.isfile(token_path) and os.path.getsize(token_path) > 0 and now_time - os.path.getmtime(token_path) < self.refresh_interval:
0094 ret = True
0095 return ret
0096
0097 def _update_ts(self):
0098 tmp_log = self.make_logger(_logger, f"config={self.setup_name}", method_name="_update_ts")
0099 with open(self.update_ts_path, "w") as f:
0100 f.write(str(self.out_dir))
0101 tmp_log.debug(f"updated timestamp file {self.update_ts_path}")
0102
0103 def _clean_up(self):
0104 tmp_log = self.make_logger(_logger, f"config={self.setup_name}", method_name="_clean_up")
0105 now_time = time.time()
0106 for filename in os.listdir(self.out_dir):
0107 file_path = os.path.join(self.out_dir, filename)
0108 if now_time - os.path.getmtime(file_path) > self.lifetime:
0109 if os.path.isfile(file_path):
0110 os.remove(file_path)
0111 tmp_log.debug(f"deleted old token file {file_path}")
0112
0113 def _handle_target_types(self):
0114
0115 tmp_log = self.make_logger(_logger, f"config={self.setup_name}", method_name="_handle_target_types")
0116 try:
0117 self.panda_queues_dict = PandaQueuesDict()
0118 except Exception as e:
0119 tmp_log.error(f"Problem calling PandaQueuesDict. {traceback.format_exc()}")
0120 raise
0121 if self.target_type == "common":
0122 if not self.target_list:
0123 pass
0124 else:
0125 for target in self.target_list:
0126 self.targets_dict[target] = {}
0127
0128 self.scope = ""
0129 elif self.target_type == "panda":
0130
0131 panda_server_target = None
0132 try:
0133 panda_server_url = harvester_config.pandacon.pandaURLSSL
0134 panda_server_target_match = re.match(r"https://[^:/]+", panda_server_url)
0135 if panda_server_target_match:
0136 panda_server_target = panda_server_target_match[0]
0137 except AttributeError:
0138 pass
0139 self.target_list = [panda_server_target]
0140 for target in self.target_list:
0141 self.targets_dict[target] = {}
0142 self.scope = ""
0143 elif self.target_type == "ce":
0144 try:
0145
0146 for site, val in self.panda_queues_dict.items():
0147 if val.get("status") == "offline":
0148
0149 continue
0150 ce_q_list = val.get("queues")
0151 if ce_q_list:
0152
0153 for ce_q in ce_q_list:
0154
0155
0156
0157
0158 ce_endpoint = ce_q.get("ce_endpoint")
0159 ce_hostname = re.sub(":\w*", "", ce_endpoint)
0160 ce_flavour = ce_q.get("ce_flavour")
0161 ce_flavour_str = str(ce_flavour).lower()
0162 ce_endpoint_modified = ce_endpoint
0163 if ce_endpoint == ce_hostname:
0164
0165 if ce_flavour_str in default_port_map:
0166 default_port = default_port_map[ce_flavour_str]
0167 ce_endpoint_modified = f"{ce_hostname}:{default_port}"
0168 if ce_endpoint_modified and ce_flavour:
0169 target_attr_dict = {
0170 "ce_flavour": ce_flavour,
0171 }
0172 self.targets_dict[ce_endpoint_modified] = target_attr_dict
0173 else:
0174
0175 continue
0176 except Exception as e:
0177 tmp_log.error(f"Problem retrieving CEs from CRIC. {traceback.format_exc()}")
0178 raise
0179
0180 if self.target_list_file:
0181 try:
0182 with open(self.target_list_file) as f:
0183 for target_str in f.readlines():
0184 if target_str:
0185 target = target_str.rstrip()
0186 target_attr_dict = {
0187 "ce_flavour": None,
0188 }
0189 self.targets_dict[target] = target_attr_dict
0190 except Exception as e:
0191 tmp_log.error(f"Problem retrieving CEs from local file. {traceback.format_exc()}")
0192 raise
0193
0194 self.scope = WLCG_scopes.COMPUTE_ALL
0195
0196
0197 def check_credential(self):
0198
0199 tmp_log = self.make_logger(_logger, f"config={self.setup_name}", method_name="check_credential")
0200
0201 self._clean_up()
0202 self._handle_target_types()
0203
0204 is_fresh = self._is_updated()
0205 if is_fresh:
0206 tmp_log.debug("last renewal is still recent, skipped")
0207 else:
0208 tmp_log.debug("to renew tokens")
0209 return is_fresh
0210
0211
0212 def renew_credential(self):
0213
0214 tmp_log = self.make_logger(_logger, f"config={self.setup_name}", method_name="renew_credential")
0215
0216 all_ok = True
0217 all_err_str = ""
0218 for target in self.targets_dict:
0219 try:
0220
0221 if self.target_type == "panda":
0222 token_filename = self.panda_token_filename
0223 else:
0224 token_filename = endpoint_to_filename(target)
0225 token_path = os.path.join(self.out_dir, token_filename)
0226
0227 if self._is_fresh(token_path):
0228
0229 tmp_log.debug(f"token for {target} at {token_path} still fresh; skipped")
0230 else:
0231
0232 access_token = self.issuer_broker.get_access_token(aud=target, scope=self.scope)
0233 with open(token_path, "w") as f:
0234 f.write(access_token)
0235 tmp_log.info(f"renewed token for {target} at {token_path}")
0236 except Exception as e:
0237 err_str = f"Problem getting token for {target}. {traceback.format_exc()}"
0238 tmp_log.error(err_str)
0239 all_ok = False
0240 all_err_str = "failed to get some tokens. Check the plugin log for details "
0241 continue
0242
0243 self._update_ts()
0244 tmp_log.debug("done")
0245
0246 return all_ok, all_err_str