Back to home page

EIC code displayed by LXR

 
 

    


Warning, file /panda-server/pandaserver/pandamcp/client_utils/get_panda_token.py was not indexed or was modified since last indexation (in which case cross-reference links may be missing, inaccurate or erroneous).

0001 #!/usr/bin/env python3
0002 """
0003 get_panda_token.py
0004 Replicates panda-client's device authorization flow to get an id_token.
0005 Usage: python get_panda_token.py
0006 """
0007 
0008 import json
0009 import os
0010 import sys
0011 import time
0012 from datetime import datetime, timedelta
0013 from urllib import error, parse, request
0014 
# Runtime configuration; each value can be overridden through the
# environment variable of the same name.

# Base URL used to build the "<server>/auth/<vo>_auth_config.json" address.
PANDA_SERVER = os.getenv("PANDA_SERVER", "https://pandaserver.cern.ch:25443")
# Virtual organisation name; selects which auth config file is fetched.
VO = os.getenv("VO", "atlas")
# Path the token endpoint's JSON response is written to.
TOKEN_FILE = os.getenv("TOKEN_FILE", os.path.join(os.path.expanduser("~"), ".panda_id_token"))
0018 
0019 
def fetch_json(url, post_data=None, timeout=30):
    """Request *url* and return the response body parsed as JSON.

    Args:
        url: Address to request.
        post_data: Optional mapping of form fields. When truthy it is
            URL-encoded and sent as the request body (which makes the
            request a POST); otherwise a plain GET is issued.
        timeout: Seconds to wait on blocking network operations before
            giving up, so a stalled server cannot hang the script forever.
            Pass ``None`` to wait indefinitely.

    Returns:
        The decoded JSON document (typically a dict).

    Raises:
        OSError: On network failures and timeouts; this includes
            ``urllib.error.URLError`` and ``urllib.error.HTTPError`` for
            non-success HTTP status codes.
        ValueError: If the body is not valid UTF-8 encoded JSON.
    """
    import ssl

    # NOTE(review): certificate and hostname verification are switched off,
    # so responses (including tokens) are not protected against
    # man-in-the-middle attacks. Left unchanged for compatibility with
    # existing deployments; consider trusting the proper CA bundle instead.
    ctx = ssl.create_default_context()
    ctx.check_hostname = False
    ctx.verify_mode = ssl.CERT_NONE

    if post_data:
        data = parse.urlencode(post_data).encode()
        req = request.Request(url, data=data, headers={"Content-Type": "application/x-www-form-urlencoded"})
    else:
        req = request.Request(url)

    with request.urlopen(req, timeout=timeout, context=ctx) as resp:
        return json.loads(resp.read().decode())
0035 
0036 
def _request_token(token_endpoint, payload):
    """POST *payload* to the token endpoint and return the JSON reply.

    OAuth error replies such as ``authorization_pending`` and ``slow_down``
    are delivered with an HTTP 4xx status, which urllib surfaces as
    ``HTTPError``. When such a response carries a JSON object with an
    ``error`` member, that object is returned so the caller can act on it;
    any other HTTP error is re-raised unchanged.
    """
    try:
        return fetch_json(token_endpoint, payload)
    except error.HTTPError as http_err:
        try:
            body = json.loads(http_err.read().decode())
        except (OSError, ValueError):
            body = None
        if isinstance(body, dict) and body.get("error"):
            return body
        raise


def _save_token_response(path, token_response):
    """Write *token_response* as JSON to *path*, restricted to the owner."""
    # Create the file with mode 0600: it holds bearer credentials.
    fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w") as f:
        if hasattr(os, "fchmod"):
            try:
                # The creation mode is ignored for a pre-existing file, so
                # tighten it explicitly before any secret is written.
                os.fchmod(fd, 0o600)
            except OSError:
                # Best effort only: failing to tighten permissions must not
                # prevent the token from being saved.
                pass
        json.dump(token_response, f)


def main():
    """Run the interactive device-authorization flow and store the token.

    Steps: fetch the VO's auth config from the PanDA server, fetch the OIDC
    discovery document it points to, request a device code, ask the user to
    sign in through a browser, then poll the token endpoint until a token is
    issued or the device code expires. On success the full token response is
    written to ``TOKEN_FILE`` and shell commands for exporting the token are
    printed. Every failure path prints an ``ERROR:`` line and exits with
    status 1.
    """
    print(f"==> Fetching auth config from {PANDA_SERVER}/auth/{VO}_auth_config.json")
    try:
        auth_config = fetch_json(f"{PANDA_SERVER}/auth/{VO}_auth_config.json")
    except Exception as e:
        print(f"ERROR: Failed to fetch auth config: {e}")
        sys.exit(1)

    client_id = auth_config["client_id"]
    client_secret = auth_config.get("client_secret") or ""
    audience = auth_config["audience"]
    oidc_config_url = auth_config["oidc_config_url"]

    print(f"==> client_id: {client_id}")
    print(f"==> audience:  {audience}")

    try:
        oidc_config = fetch_json(oidc_config_url)
    except Exception as e:
        print(f"ERROR: Failed to fetch OIDC config: {e}")
        sys.exit(1)

    device_endpoint = oidc_config["device_authorization_endpoint"]
    token_endpoint = oidc_config["token_endpoint"]

    print("==> Requesting device code...")
    try:
        device_response = fetch_json(
            device_endpoint,
            {
                "client_id": client_id,
                "scope": "openid profile email offline_access",
                "audience": audience,
            },
        )
    except Exception as e:
        print(f"ERROR: Failed to request device code: {e}")
        sys.exit(1)

    device_code = device_response["device_code"]
    expires_in = int(device_response["expires_in"])
    interval = int(device_response.get("interval", 5))
    # The device code's lifetime starts when it is issued, not when polling
    # begins, so the deadline is fixed here (before waiting on the user).
    deadline = time.monotonic() + expires_in

    # RFC 8628 makes verification_uri_complete optional; fall back to the
    # plain verification URI plus the user code when it is absent.
    user_code = None
    verification_uri = device_response.get("verification_uri_complete")
    if not verification_uri:
        verification_uri = device_response["verification_uri"]
        user_code = device_response.get("user_code")

    print()
    print("==> Please open the following URL in your browser and sign in:")
    print()
    print(f"    {verification_uri}")
    if user_code:
        print(f"    and enter the code: {user_code}")
    print()
    input("Press Enter once you have signed in...")

    print("==> Polling for token...")
    id_token = None

    # Always poll at least once: the user may have finished signing in even
    # if they were slow to press Enter.
    while True:
        try:
            token_response = _request_token(
                token_endpoint,
                {
                    "client_id": client_id,
                    "client_secret": client_secret,
                    "grant_type": "urn:ietf:params:oauth:grant-type:device_code",
                    "device_code": device_code,
                },
            )
        except Exception as e:
            print(f"ERROR: Token request failed: {e}")
            sys.exit(1)

        err = token_response.get("error", "")

        if err in ("authorization_pending", "slow_down"):
            if err == "slow_down":
                # RFC 8628: on slow_down, lengthen the polling interval by
                # 5 seconds for this and all subsequent requests.
                interval += 5
            # RFC 8628: wait at least interval seconds between polls; +1 adds a small buffer
            sleep_time = interval + 1
            if time.monotonic() + sleep_time >= deadline:
                # The device code would be expired by the next poll.
                break
            time.sleep(sleep_time)
            continue

        if err:
            print(f"ERROR: {json.dumps(token_response)}")
            sys.exit(1)

        id_token = token_response.get("id_token")
        if not id_token:
            print("ERROR: Token response did not contain an id_token")
            sys.exit(1)

        token_expires_in = token_response.get("expires_in")
        _save_token_response(TOKEN_FILE, token_response)
        if token_expires_in:
            expiry = datetime.now() + timedelta(seconds=int(token_expires_in))
            expiry_str = expiry.strftime("%Y-%m-%d %H:%M:%S")
        else:
            expiry_str = "unknown"
        print()
        print(f"==> Token saved to {TOKEN_FILE}")
        print(f"==> Token expires at: {expiry_str}")
        break

    if not id_token:
        print("ERROR: Timed out waiting for authentication")
        sys.exit(1)

    print("==> Token generated successfully!")
    print()
    print("Run the following to set your token:")
    print()
    if sys.platform == "win32":
        print("  PowerShell:")
        print(f'    $env:ACCESS_TOKEN="{id_token}"')
        print(f'    $env:TOKEN_FILE="{TOKEN_FILE}"')
        print("  CMD:")
        print(f"    set ACCESS_TOKEN={id_token}")
        print(f"    set TOKEN_FILE={TOKEN_FILE}")
    else:
        print(f"  export ACCESS_TOKEN={id_token}")
        print(f"  export TOKEN_FILE={TOKEN_FILE}")
0148 
0149 
# Start the interactive token flow only when this file is executed as a
# script; importing the module does not run it.
if __name__ == "__main__":
    main()