Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:02

0001 """
0002 download access tokens for OIDC token exchange flow
0003 """
0004 
0005 import datetime
0006 import json
0007 import os.path
0008 import pathlib
0009 
0010 from pandacommon.pandalogger.LogWrapper import LogWrapper
0011 from pandacommon.pandalogger.PandaLogger import PandaLogger
0012 from pandacommon.pandautils.PandaUtils import naive_utcnow
0013 
0014 from pandaserver.config import panda_config
0015 from pandaserver.srvcore.oidc_utils import get_access_token
0016 
0017 # logger
0018 _logger = PandaLogger().getLogger("token_cache")
0019 
0020 
0021 class TokenCache:
0022     """
0023     A class used to download and give access tokens for OIDC token exchange flow
0024 
0025     """
0026 
0027     # constructor
0028     def __init__(self, target_path: str = None, file_prefix: str = None, refresh_interval: int = 60, task_buffer=None):
0029         """
0030         Constructs all the necessary attributes for the TokenCache object.
0031 
0032         :param target_path: The base path to store the access tokens
0033         :param file_prefix: The prefix of the access token files
0034         :param refresh_interval: The interval to refresh the access tokens (default is 60 minutes)
0035         :param task_buffer: TaskBuffer object
0036         """
0037         if target_path:
0038             self.target_path = target_path
0039         else:
0040             self.target_path = "/tmp/proxies"
0041         if not os.path.exists(self.target_path):
0042             os.makedirs(self.target_path)
0043         if file_prefix:
0044             self.file_prefix = file_prefix
0045         else:
0046             self.file_prefix = "access_token_"
0047         self.refresh_interval = refresh_interval
0048         self.task_buffer = task_buffer
0049         # cache for access tokens
0050         self.cached_access_tokens = {}
0051 
0052     # construct target path
0053     def construct_target_path(self, client_name: str) -> str:
0054         """
0055         Constructs the target path to store an access token
0056 
0057         :param client_name: client name
0058         :return: the target path
0059         """
0060         return os.path.join(self.target_path, f"{self.file_prefix}{client_name}")
0061 
0062     # main
0063     def run(self):
0064         """ "
0065         Main function to download access tokens
0066         """
0067         tmp_log = LogWrapper(_logger)
0068         tmp_log.debug("================= start ==================")
0069         try:
0070             # check config
0071             if not hasattr(panda_config, "token_cache_config") or not panda_config.token_cache_config:
0072                 tmp_log.debug("token_cache_config is not set in panda_config")
0073             # check config path
0074             elif not os.path.exists(panda_config.token_cache_config):
0075                 tmp_log.debug(f"config file {panda_config.token_cache_config} not found")
0076             # read config
0077             else:
0078                 with open(panda_config.token_cache_config) as f:
0079                     token_cache_config = json.load(f)
0080                 for client_name, client_config in token_cache_config.items():
0081                     tmp_log.debug(f"client_name={client_name}")
0082                     # token file path
0083                     token_file_path = client_config.get("token_file_path")
0084                     if not token_file_path:
0085                         token_file_path = self.construct_target_path(client_name)
0086                     # check if fresh
0087                     is_fresh = False
0088                     if os.path.exists(token_file_path):
0089                         mod_time = datetime.datetime.fromtimestamp(os.stat(token_file_path).st_mtime, datetime.timezone.utc)
0090                         if datetime.datetime.now(datetime.timezone.utc) - mod_time < datetime.timedelta(minutes=self.refresh_interval):
0091                             tmp_log.debug(f"skip since {token_file_path} is fresh")
0092                             is_fresh = True
0093                     # get access token
0094                     if not is_fresh:
0095                         status_code, output = get_access_token(
0096                             client_config["endpoint"], client_config["client_id"], client_config["secret"], client_config.get("scope")
0097                         )
0098                         if status_code:
0099                             with open(token_file_path, "w") as f:
0100                                 f.write(output)
0101                             tmp_log.debug(f"dump access token to {token_file_path}")
0102                         else:
0103                             tmp_log.error(output)
0104                             # touch file to avoid immediate reattempt
0105                             pathlib.Path(token_file_path).touch()
0106                             tmp_log.debug(f"touch {token_file_path} to avoid immediate reattempt")
0107                     # register token keys
0108                     if client_config.get("use_token_key") is True and self.task_buffer is not None:
0109                         token_key_lifetime = client_config.get("token_key_lifetime", 96)
0110                         tmp_log.debug(f"register token key for {client_name}")
0111                         tmp_stat = self.task_buffer.register_token_key(client_name, token_key_lifetime)
0112                         if not tmp_stat:
0113                             tmp_log.error("failed")
0114         except Exception as e:
0115             tmp_log.error(f"failed with {str(e)}")
0116         tmp_log.debug("================= end ==================")
0117         tmp_log.debug("done")
0118         return
0119 
0120     # get access token for a client
0121     def get_access_token(self, client_name: str) -> str | None:
0122         """
0123         Get an access token string for a client. None is returned if the access token is not found
0124 
0125         :param client_name : client name
0126         :return: the access token
0127         """
0128         time_now = naive_utcnow()
0129         if client_name in self.cached_access_tokens and self.cached_access_tokens[client_name]["last_update"] + datetime.timedelta(minutes=10) < time_now:
0130             # use cached token since it is still fresh
0131             pass
0132         else:
0133             target_path = self.construct_target_path(client_name)
0134             token = None
0135             if os.path.exists(target_path):
0136                 with open(target_path) as f:
0137                     token = f.read()
0138             if not token:
0139                 token = None
0140             self.cached_access_tokens[client_name] = {"token": token, "last_update": time_now}
0141         return self.cached_access_tokens[client_name]["token"]