File indexing completed on 2026-04-09 07:58:21
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012 """
0013 Test authentication.
0014 """
0015
0016 try:
0017 from urllib import urlencode
0018 except ImportError:
0019 from urllib.parse import urlencode
0020
0021 raw_input = input
0022
0023 import datetime
0024 import sys
0025 import time
0026
0027 import unittest2 as unittest
0028
0029
0030 from idds.common.utils import setup_logging
0031 from idds.core.authentication import OIDCAuthentication
0032
0033
0034 setup_logging(__name__)
0035
0036
0037 class TestAuthentication(unittest.TestCase):
0038
0039 def test_oidc_authentication(self):
0040 vo = "panda_dev"
0041
0042 oidc = OIDCAuthentication()
0043 allow_vos = oidc.get_allow_vos()
0044
0045 assert vo in allow_vos
0046 auth_config = oidc.get_auth_config(vo)
0047
0048 assert "vo" in auth_config
0049 assert auth_config["vo"] == vo
0050
0051 endpoint_config = oidc.get_endpoint_config(auth_config)
0052
0053 assert "token_endpoint" in endpoint_config
0054
0055 status, sign_url = oidc.get_oidc_sign_url(vo)
0056
0057 assert "user_code" in sign_url
0058 print(
0059 (
0060 "Please go to {0} and sign in. "
0061 "Waiting until authentication is completed"
0062 ).format(sign_url["verification_uri_complete"])
0063 )
0064
0065 print("Ready to get ID token?")
0066 while True:
0067 sys.stdout.write("[y/n] \n")
0068 choice = raw_input().lower()
0069 if choice == "y":
0070 break
0071 elif choice == "n":
0072 print("aborted")
0073 return
0074
0075 if "interval" in sign_url:
0076 interval = sign_url["interval"]
0077 else:
0078 interval = 5
0079
0080 if "expires_in" in sign_url:
0081 expires_in = sign_url["expires_in"]
0082 else:
0083 expires_in = 60
0084
0085 token = None
0086 start_time = datetime.datetime.utcnow()
0087 while datetime.datetime.utcnow() - start_time < datetime.timedelta(
0088 seconds=expires_in
0089 ):
0090 try:
0091 status, output = oidc.get_id_token(vo, sign_url["device_code"])
0092 if status:
0093
0094 token = output
0095 break
0096 else:
0097 if (
0098 type(output) in [dict]
0099 and "error" in output
0100 and output["error"] == "authorization_pending"
0101 ):
0102 time.sleep(interval)
0103 else:
0104 print(output)
0105 break
0106 except Exception as error:
0107 print(error)
0108 break
0109
0110 if not token:
0111 print("Failed to get a token")
0112 else:
0113 print(token)
0114 assert "id_token" in token
0115
0116 status, new_token = oidc.refresh_id_token(vo, token["refresh_token"])
0117
0118 assert "id_token" in new_token
0119
0120 print("verifying the token")
0121 status, decoded_token, username = oidc.verify_id_token(
0122 vo, token["id_token"]
0123 )
0124 if not status:
0125 print("Failed to verify the token: %s" % decoded_token)
0126 else:
0127 print(username)
0128 print(decoded_token)