Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-09 07:58:21

0001 #!/usr/bin/env python
0002 #
0003 # Licensed under the Apache License, Version 2.0 (the "License");
0004 # You may not use this file except in compliance with the License.
0005 # You may obtain a copy of the License at
0006 # http://www.apache.org/licenses/LICENSE-2.0
0007 #
0008 # Authors:
0009 # - Wen Guan, <wen.guan@cern.ch>, 2021 - 2024
0010 
0011 
0012 """
0013 Test authentication.
0014 """
0015 
0016 try:
0017     from urllib import urlencode  # noqa F401
0018 except ImportError:
0019     from urllib.parse import urlencode  # noqa F401
0020 
0021     raw_input = input
0022 
0023 import datetime
0024 import sys
0025 import time
0026 
0027 import unittest2 as unittest
0028 
0029 # from nose.tools import assert_equal
0030 from idds.common.utils import setup_logging
0031 from idds.core.authentication import OIDCAuthentication
0032 
0033 
0034 setup_logging(__name__)
0035 
0036 
class TestAuthentication(unittest.TestCase):
    """Interactive end-to-end test of ``OIDCAuthentication``.

    The test asks the OIDC helper for a sign-in URL, prints it, waits for
    the user to confirm on stdin, then polls for an ID token, refreshes it
    and verifies it.  It needs a human at the terminal; it presumably also
    needs network access to the identity provider (not visible from this
    file -- confirm against ``OIDCAuthentication``).
    """

    # Fallbacks used when the sign-in response omits these fields.
    DEFAULT_POLL_INTERVAL = 5
    DEFAULT_EXPIRES_IN = 60
    # RFC 8628 section 3.5: on "slow_down" the polling interval must grow
    # by 5 seconds for this and all subsequent requests.
    SLOW_DOWN_INCREMENT = 5

    @staticmethod
    def _confirm_ready():
        """Prompt on stdin until the user answers "y" or "n".

        Returns:
            True for "y", False for "n".  Any other input repeats the prompt.
        """
        print("Ready to get ID token?")
        while True:
            sys.stdout.write("[y/n] \n")
            # raw_input is the py2/py3 shim defined at module level.
            choice = raw_input().strip().lower()
            if choice == "y":
                return True
            if choice == "n":
                return False

    def _poll_for_id_token(self, oidc, vo, sign_url):
        """Poll ``oidc.get_id_token`` until it succeeds, fails or times out.

        Args:
            oidc: object exposing ``get_id_token(vo, device_code)`` that
                returns a ``(status, output)`` pair.
            vo: virtual organisation name passed through to ``oidc``.
            sign_url: sign-in response; must contain ``device_code`` and may
                contain ``interval`` and ``expires_in`` (both in seconds).

        Returns:
            The ``output`` of the first successful call, or None when the
            provider reports an error other than "authorization_pending" /
            "slow_down", or when ``expires_in`` seconds have elapsed.

        Exceptions raised by ``get_id_token`` are deliberately not caught so
        that the test runner reports them with a full traceback.
        """
        interval = sign_url.get("interval", self.DEFAULT_POLL_INTERVAL)
        expires_in = sign_url.get("expires_in", self.DEFAULT_EXPIRES_IN)

        # Prefer a monotonic clock so wall-clock adjustments cannot shorten
        # or extend the polling window; fall back to time.time() on py2.
        clock = getattr(time, "monotonic", time.time)
        deadline = clock() + expires_in

        while clock() < deadline:
            status, output = oidc.get_id_token(vo, sign_url["device_code"])
            if status:
                return output

            error_code = output.get("error") if isinstance(output, dict) else None
            if error_code == "slow_down":
                interval += self.SLOW_DOWN_INCREMENT
            elif error_code != "authorization_pending":
                # Any other answer is final; show it and stop polling.
                print(output)
                return None
            time.sleep(interval)

        return None

    def test_oidc_authentication(self):
        """Walk through sign-in, token retrieval, refresh and verification."""
        vo = "panda_dev"
        oidc = OIDCAuthentication()

        # Static configuration checks.
        self.assertIn(vo, oidc.get_allow_vos())
        auth_config = oidc.get_auth_config(vo)
        self.assertIn("vo", auth_config)
        self.assertEqual(auth_config["vo"], vo)

        endpoint_config = oidc.get_endpoint_config(auth_config)
        self.assertIn("token_endpoint", endpoint_config)

        # Start the sign-in and hand the URL to the user.
        _status, sign_url = oidc.get_oidc_sign_url(vo)
        self.assertIn("user_code", sign_url)
        print(
            (
                "Please go to {0} and sign in. "
                "Waiting until authentication is completed"
            ).format(sign_url["verification_uri_complete"])
        )

        if not self._confirm_ready():
            # Report the abort as a skip instead of a silent pass.
            self.skipTest("aborted")

        token = self._poll_for_id_token(oidc, vo, sign_url)
        if not token:
            self.fail("Failed to get a token")

        print(token)
        self.assertIn("id_token", token)

        _status, new_token = oidc.refresh_id_token(vo, token["refresh_token"])
        self.assertIn("id_token", new_token)

        print("verifying the token")
        status, decoded_token, username = oidc.verify_id_token(
            vo, token["id_token"]
        )
        self.assertTrue(
            status, "Failed to verify the token: %s" % (decoded_token,)
        )
        print(username)
        print(decoded_token)