Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:58:58

0001 import json
0002 import os
0003 import re
0004 import time
0005 import traceback
0006 
0007 from pandaharvester.harvesterconfig import harvester_config
0008 from pandaharvester.harvestercore import core_utils
0009 from pandaharvester.harvestermisc.info_utils import PandaQueuesDict
0010 from pandaharvester.harvestermisc.token_utils import (
0011     IssuerBroker,
0012     WLCG_scopes,
0013     endpoint_to_filename,
0014 )
0015 
0016 from .base_cred_manager import BaseCredManager
0017 
# Module-level logger; per-method loggers are derived from it via make_logger().
_logger = core_utils.setup_logger("iam_token_cred_manager")

# Values accepted for the "target_type" setup attribute.
ALL_TARGET_TYPES = ["common", "ce", "panda"]

# Port appended to a CE endpoint that carries none, keyed by lower-cased CE flavour.
default_port_map = {"htcondor-ce": 9619, "arc-ce": 443}
0029 
0030 # credential manager with IAM token
0031 
0032 
class IamTokenCredManager(BaseCredManager):
    """Credential manager that fetches access tokens from an IAM issuer.

    One token file per target is written under ``out_dir``. The set of
    targets depends on ``target_type``:

    * ``"common"`` - the targets listed in ``target_list``;
    * ``"panda"``  - the PanDA server derived from ``harvester_config.pandacon.pandaURLSSL``;
    * ``"ce"``     - CE endpoints found in ``PandaQueuesDict`` plus the ones
      listed in ``target_list_file``.

    Setup attributes are taken from the JSON file ``inFile`` when that
    attribute exists, otherwise from the instance attributes themselves.
    Required: ``client_cred_file``, ``target_type``, ``out_dir``.
    Optional: ``panda_token_filename``, ``lifetime``, ``target_list``,
    ``target_list_file``, ``update_ts_path``, ``check_interval``,
    ``refresh_interval``.
    """

    # constructor
    def __init__(self, **kwarg):
        """Read the setup, load the client credentials and build the target map.

        Raises:
            KeyError: a required setup or client-credential attribute is missing.
            Exception: ``target_type`` is not one of ``ALL_TARGET_TYPES``.
            Any exception raised while reading ``inFile`` or
            ``client_cred_file`` is logged and re-raised.
        """
        BaseCredManager.__init__(self, **kwarg)
        # make logger
        tmp_log = self.make_logger(_logger, f"config={self.setup_name}", method_name="__init__")
        # attributes
        if hasattr(self, "inFile"):
            # parse inFile setup configuration
            try:
                with open(self.inFile) as f:
                    self.setupMap = json.load(f)
            except Exception as e:
                tmp_log.error(f"Error with inFile. {e.__class__.__name__}: {e}")
                self.setupMap = {}
                raise
        else:
            # set up with direct attributes
            self.setupMap = dict(vars(self))
        # validate setupMap
        try:
            self.client_cred_file = self.setupMap["client_cred_file"]
            with open(self.client_cred_file) as f:
                client_cred_dict = json.load(f)
                self.issuer = client_cred_dict["issuer"]
                self.client_id = client_cred_dict["client_id"]
                self.client_secret = client_cred_dict["client_secret"]
            self.target_type = self.setupMap["target_type"]
            self.out_dir = self.setupMap["out_dir"]
            self.panda_token_filename = self.setupMap.get("panda_token_filename", "panda_token")
            self.lifetime = self.setupMap.get("lifetime", 14 * 24 * 60 * 60)
            self.target_list = self.setupMap.get("target_list")
            self.target_list_file = self.setupMap.get("target_list_file")
            self.update_ts_path = self.setupMap.get("update_ts_path", os.path.join(self.out_dir, "_last_update"))
            self.check_interval = self.setupMap.get("check_interval", 300)
            self.refresh_interval = self.setupMap.get("refresh_interval", 3600)
        except KeyError:
            tmp_log.error(f"Missing attributes in setup. {traceback.format_exc()}")
            raise
        except (OSError, ValueError):
            # unreadable or malformed client_cred_file (json.JSONDecodeError is a ValueError)
            tmp_log.error(f"Problem reading client_cred_file. {traceback.format_exc()}")
            raise
        else:
            if self.target_type not in ALL_TARGET_TYPES:
                tmp_log.error(f"Unsupported target_type: {self.target_type}")
                raise Exception("Unsupported target_type")
        # initialize
        self.targets_dict = dict()
        # handle targets
        self._handle_target_types()
        # issuer broker
        self.issuer_broker = IssuerBroker(self.issuer, self.client_id, self.client_secret, name=self.setup_name)

    def _is_updated(self):
        """Return True if the timestamp file was modified less than ``check_interval`` seconds ago."""
        try:
            if not os.path.isfile(self.update_ts_path):
                return False
            return time.time() - os.path.getmtime(self.update_ts_path) < self.check_interval
        except OSError:
            # file vanished between the two calls; treat as not updated
            return False

    def _is_fresh(self, token_path):
        """Return True if ``token_path`` is a non-empty file younger than ``refresh_interval`` seconds."""
        try:
            if not os.path.isfile(token_path) or os.path.getsize(token_path) <= 0:
                return False
            return time.time() - os.path.getmtime(token_path) < self.refresh_interval
        except OSError:
            # file vanished between the checks; treat as stale so it gets renewed
            return False

    def _update_ts(self):
        """Rewrite the timestamp file so that its mtime marks the latest renewal pass."""
        tmp_log = self.make_logger(_logger, f"config={self.setup_name}", method_name="_update_ts")
        with open(self.update_ts_path, "w") as f:
            f.write(str(self.out_dir))
        tmp_log.debug(f"updated timestamp file {self.update_ts_path}")

    def _clean_up(self):
        """Delete regular files in ``out_dir`` whose mtime is older than ``lifetime`` seconds."""
        tmp_log = self.make_logger(_logger, f"config={self.setup_name}", method_name="_clean_up")
        now_time = time.time()
        for filename in os.listdir(self.out_dir):
            file_path = os.path.join(self.out_dir, filename)
            try:
                # test the file type first: getmtime() raises on broken symlinks
                if not os.path.isfile(file_path):
                    continue
                if now_time - os.path.getmtime(file_path) > self.lifetime:
                    os.remove(file_path)
                    tmp_log.debug(f"deleted old token file {file_path}")
            except OSError as e:
                # one unremovable or vanished file must not abort the whole pass
                tmp_log.error(f"Problem cleaning up {file_path}. {e.__class__.__name__}: {e}")

    def _handle_target_types(self):
        """Rebuild ``targets_dict`` and ``scope`` according to ``target_type``.

        The map is built locally and only swapped in on success, so targets
        that are no longer configured (or whose PanDA queue went offline) are
        dropped, while a failure leaves the previous map untouched.

        Raises:
            Any exception from ``PandaQueuesDict``, from walking the CE queues
            or from reading ``target_list_file`` is logged and re-raised.
        """
        # make logger
        tmp_log = self.make_logger(_logger, f"config={self.setup_name}", method_name="_handle_target_types")
        try:
            self.panda_queues_dict = PandaQueuesDict()
        except Exception:
            tmp_log.error(f"Problem calling PandaQueuesDict. {traceback.format_exc()}")
            raise
        targets = {}
        scope = ""
        if self.target_type == "common":
            for target in self.target_list or []:
                targets[target] = {}
        elif self.target_type == "panda":
            # panda server
            panda_server_target = None
            try:
                panda_server_url = harvester_config.pandacon.pandaURLSSL
                panda_server_target_match = re.match(r"https://[^:/]+", panda_server_url)
                if panda_server_target_match:
                    panda_server_target = panda_server_target_match[0]
            except AttributeError:
                # pandaURLSSL not configured
                pass
            # NOTE(review): when no URL could be derived the target stays None and
            # the token is requested with aud=None - confirm this is intended
            self.target_list = [panda_server_target]
            targets[panda_server_target] = {}
        elif self.target_type == "ce":
            try:
                # retrieve CEs from CRIC
                for _site, val in self.panda_queues_dict.items():
                    if val.get("status") == "offline":
                        # do not generate token for offline PQs, but for online, brokeroff, pause, ...
                        continue
                    # PQs without CE queues contribute no target
                    for ce_q in val.get("queues") or []:
                        ce_endpoint = ce_q.get("ce_endpoint")
                        ce_flavour = ce_q.get("ce_flavour")
                        if not ce_endpoint or not ce_flavour:
                            # incomplete CE record; skip it instead of failing the whole scan
                            continue
                        # strip a ":port" suffix to find out whether a port was given
                        ce_hostname = re.sub(r":\w*", "", ce_endpoint)
                        ce_endpoint_modified = ce_endpoint
                        if ce_endpoint == ce_hostname:
                            # no port, add default port if the flavour has one
                            default_port = default_port_map.get(str(ce_flavour).lower())
                            if default_port is not None:
                                ce_endpoint_modified = f"{ce_hostname}:{default_port}"
                        targets[ce_endpoint_modified] = {"ce_flavour": ce_flavour}
            except Exception:
                tmp_log.error(f"Problem retrieving CEs from CRIC. {traceback.format_exc()}")
                raise
            # retrieve CEs from local file
            if self.target_list_file:
                try:
                    with open(self.target_list_file) as f:
                        for target_str in f:
                            target = target_str.strip()
                            if not target:
                                # blank line
                                continue
                            targets[target] = {"ce_flavour": None}
                except Exception:
                    tmp_log.error(f"Problem retrieving CEs from local file. {traceback.format_exc()}")
                    raise
            # scope for CE
            scope = WLCG_scopes.COMPUTE_ALL
        # publish the freshly built target map
        self.targets_dict = targets
        self.scope = scope

    # check proxy
    def check_credential(self):
        """Clean up old files, refresh the target map and report whether renewal can be skipped.

        Returns:
            bool: True if the last renewal pass happened less than
            ``check_interval`` seconds ago, False if tokens should be renewed.
        """
        # make logger
        tmp_log = self.make_logger(_logger, f"config={self.setup_name}", method_name="check_credential")
        # clean up and handle targets
        self._clean_up()
        self._handle_target_types()
        # same update period as credmanager agent
        is_fresh = self._is_updated()
        if is_fresh:
            tmp_log.debug("last renewal is still recent, skipped")
        else:
            tmp_log.debug("to renew tokens")
        return is_fresh

    # renew proxy
    def renew_credential(self):
        """Renew the token file of every target whose token is not fresh.

        A failure for one target is logged and does not stop the others. The
        timestamp file is updated at the end regardless of failures.

        Returns:
            tuple: ``(all_ok, all_err_str)`` - ``all_ok`` is False if any
            target failed, in which case ``all_err_str`` is a short message.
        """
        # make logger
        tmp_log = self.make_logger(_logger, f"config={self.setup_name}", method_name="renew_credential")
        # go
        all_ok = True
        all_err_str = ""
        for target in self.targets_dict:
            try:
                # write to file
                if self.target_type == "panda":
                    token_filename = self.panda_token_filename
                else:
                    token_filename = endpoint_to_filename(target)
                token_path = os.path.join(self.out_dir, token_filename)
                # check token freshness
                if self._is_fresh(token_path):
                    # token still fresh, skip it
                    tmp_log.debug(f"token for {target} at {token_path} still fresh; skipped")
                else:
                    # renew access token of target
                    access_token = self.issuer_broker.get_access_token(aud=target, scope=self.scope)
                    # validate before opening: open(..., "w") truncates the existing token
                    if not isinstance(access_token, str) or not access_token:
                        raise RuntimeError(f"got no usable access token for {target}")
                    with open(token_path, "w") as f:
                        f.write(access_token)
                    tmp_log.info(f"renewed token for {target} at {token_path}")
            except Exception:
                err_str = f"Problem getting token for {target}. {traceback.format_exc()}"
                tmp_log.error(err_str)
                all_ok = False
                all_err_str = "failed to get some tokens. Check the plugin log for details "
                continue
        # update last timestamp
        self._update_ts()
        tmp_log.debug("done")
        # return
        return all_ok, all_err_str