File indexing completed on 2026-04-09 07:58:21
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012 """
0013 Test authentication.
0014 """
0015
0016 try:
0017 from urllib import urlencode
0018 except ImportError:
0019 from urllib.parse import urlencode
0020
0021 raw_input = input
0022
0023 import datetime
0024 import sys
0025 import time
0026
0027 import unittest2 as unittest
0028
0029
0030 from idds.common.utils import setup_logging
0031 from idds.core.authentication import OIDCAuthentication
0032
0033
0034 setup_logging(__name__)
0035
0036
class TestAuthentication:
    """Interactive check of the OIDC device-code flow in ``OIDCAuthentication``.

    The single test needs a human: it prints a verification URL, waits for a
    y/n confirmation on stdin, and then polls for the ID token.
    """

    @staticmethod
    def _confirm_ready():
        """Ask on stdin whether to continue.

        Re-prompts until the answer is ``y`` or ``n`` (case-insensitive).

        :returns: True for ``y``, False for ``n``.
        """
        print("Ready to get ID token?")
        while True:
            sys.stdout.write("[y/n] \n")
            choice = raw_input().lower()
            if choice == "y":
                return True
            if choice == "n":
                return False

    @staticmethod
    def _poll_for_token(oidc, vo, sign_url):
        """Poll ``oidc.get_id_token`` until a token arrives or the code expires.

        :param oidc: the ``OIDCAuthentication`` instance to poll with.
        :param vo: name of the virtual organisation being authenticated.
        :param sign_url: mapping returned by ``get_oidc_sign_url``; must hold
                         ``device_code`` and may hold ``interval`` and
                         ``expires_in`` (defaults: 5 and 60 seconds).
        :returns: the token payload on success, otherwise None (timeout,
                  unexpected error response, or an exception while polling).
        """
        interval = sign_url.get("interval", 5)
        expires_in = sign_url.get("expires_in", 60)

        # A monotonic clock keeps the timeout immune to wall-clock adjustments
        # and avoids the deprecated naive datetime.utcnow().
        deadline = time.monotonic() + expires_in
        while time.monotonic() < deadline:
            try:
                status, output = oidc.get_id_token(vo, sign_url["device_code"])
            except Exception as error:
                # Best effort, as in the original script: report and give up.
                print(error)
                return None

            if status:
                return output

            error_code = output.get("error") if isinstance(output, dict) else None
            if error_code == "slow_down":
                # RFC 8628 section 3.5: keep polling, but 5 seconds slower.
                interval += 5
            elif error_code != "authorization_pending":
                print(output)
                return None
            time.sleep(interval)
        return None

    def test_oidc_authentication(self):
        """Run sign-in, token retrieval, refresh and verification for one VO.

        Asserts on the configuration lookups, the sign-in response, and the
        presence of ``id_token`` in both the first and the refreshed token.
        Returns early without failing when the user answers ``n`` or when no
        token could be obtained.
        """
        vo = "iamdev"

        oidc = OIDCAuthentication()
        allow_vos = oidc.get_allow_vos()
        print("allow_vos")
        print(allow_vos)
        assert vo in allow_vos

        auth_config = oidc.get_auth_config(vo)
        print("auth_config")
        print(auth_config)
        assert "vo" in auth_config
        assert auth_config["vo"] == vo

        endpoint_config = oidc.get_endpoint_config(auth_config)
        print("endpoint_config")
        print(endpoint_config)
        assert "token_endpoint" in endpoint_config

        _, sign_url = oidc.get_oidc_sign_url(vo)
        print("sign_url")
        print(sign_url)
        assert "user_code" in sign_url
        print(
            (
                "Please go to {0} and sign in. "
                "Waiting until authentication is completed"
            ).format(sign_url["verification_uri_complete"])
        )

        if not self._confirm_ready():
            print("aborted")
            return

        token = self._poll_for_token(oidc, vo, sign_url)
        if not token:
            # Kept non-fatal to match the original interactive behaviour.
            # NOTE(review): consider failing here when run under a test runner.
            print("Failed to get a token")
            return

        print(token)
        assert "id_token" in token

        _, new_token = oidc.refresh_id_token(vo, token["refresh_token"])
        assert "id_token" in new_token

        print("verifying the token")
        status, decoded_token, username = oidc.verify_id_token(
            vo, token["id_token"]
        )
        if not status:
            # On failure the second return value is printed as the reason.
            print("Failed to verify the token: %s" % decoded_token)
        else:
            print(username)
            print(decoded_token)
0133
0134
if __name__ == "__main__":
    # Allow running the interactive check directly, without a test runner.
    TestAuthentication().test_oidc_authentication()