Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-09 07:58:21

0001 #!/usr/bin/env python
0002 #
0003 # Licensed under the Apache License, Version 2.0 (the "License");
0004 # You may not use this file except in compliance with the License.
0005 # You may obtain a copy of the License at
0006 # http://www.apache.org/licenses/LICENSE-2.0
0007 #
0008 # Authors:
0009 # - Wen Guan, <wen.guan@cern.ch>, 2021 - 2024
0010 
0011 
0012 """
0013 Test authentication.
0014 """
0015 
0016 try:
0017     from urllib import urlencode  # noqa F401
0018 except ImportError:
0019     from urllib.parse import urlencode  # noqa F401
0020 
0021     raw_input = input
0022 
0023 import datetime
0024 import sys
0025 import time
0026 
0027 import unittest2 as unittest  # noqa F401
0028 
0029 # from nose.tools import assert_equal
0030 from idds.common.utils import setup_logging
0031 from idds.core.authentication import OIDCAuthentication
0032 
0033 
# Initialise logging at import time through the iDDS helper, passing this
# module's name. NOTE(review): presumably this configures handlers/levels for
# the named logger — confirm in idds.common.utils.
setup_logging(__name__)
0035 
0036 
class TestAuthentication:
    """Interactive check of the OIDC device-code sign-in flow.

    The test needs a human operator: it prints a verification URL, waits for
    a confirmation on stdin and then polls the service for the ID token.
    """

    def test_oidc_authentication(self):
        """Exercise config lookup, sign-in, token retrieval, refresh and verify.

        Answering "n" at the prompt aborts the run without raising. Failing to
        obtain a token is reported on stdout rather than raised, matching the
        script's best-effort behaviour.
        """
        vo = "iamdev"

        oidc = OIDCAuthentication()
        allow_vos = oidc.get_allow_vos()
        print("allow_vos")
        print(allow_vos)
        assert vo in allow_vos

        auth_config = oidc.get_auth_config(vo)
        print("auth_config")
        print(auth_config)
        assert "vo" in auth_config
        assert auth_config["vo"] == vo

        endpoint_config = oidc.get_endpoint_config(auth_config)
        print("endpoint_config")
        print(endpoint_config)
        assert "token_endpoint" in endpoint_config

        status, sign_url = oidc.get_oidc_sign_url(vo)
        print("sign_url")
        print(sign_url)
        assert "user_code" in sign_url
        print(
            (
                "Please go to {0} and sign in. "
                "Waiting until authentication is completed"
            ).format(sign_url["verification_uri_complete"])
        )

        print("Ready to get ID token?")
        if not self._operator_confirms():
            print("aborted")
            return

        token = self._poll_for_id_token(oidc, vo, sign_url)
        if not token:
            print("Failed to get a token")
            return

        print(token)
        assert "id_token" in token

        # Only the refreshed payload is checked; the status flag is unused.
        _, new_token = oidc.refresh_id_token(vo, token["refresh_token"])
        assert "id_token" in new_token

        print("verifying the token")
        status, decoded_token, username = oidc.verify_id_token(
            vo, token["id_token"]
        )
        if not status:
            # Wrap in a tuple so a tuple-valued payload cannot break %-formatting.
            print("Failed to verify the token: %s" % (decoded_token,))
        else:
            print(username)
            print(decoded_token)

    @staticmethod
    def _operator_confirms():
        """Prompt on stdin until "y" or "n" is entered; return True for "y"."""
        while True:
            sys.stdout.write("[y/n] \n")
            choice = raw_input().lower()
            if choice == "y":
                return True
            if choice == "n":
                return False

    @staticmethod
    def _poll_for_id_token(oidc, vo, sign_url):
        """Poll ``oidc.get_id_token`` until a token arrives or time runs out.

        :param oidc: object exposing ``get_id_token(vo, device_code)`` that
                     returns a ``(status, output)`` pair.
        :param vo: name of the virtual organisation to authenticate against.
        :param sign_url: sign-in response; ``device_code`` is required, while
                         ``interval`` (default 5) and ``expires_in``
                         (default 60) are optional, both in seconds.
        :returns: the token payload on success, otherwise ``None``.
        """
        interval = sign_url.get("interval", 5)
        expires_in = sign_url.get("expires_in", 60)

        # The expiry is an elapsed-time budget, so use the monotonic clock:
        # it is unaffected by wall-clock adjustments and avoids the deprecated
        # naive datetime.utcnow().
        deadline = time.monotonic() + expires_in
        while time.monotonic() < deadline:
            try:
                status, output = oidc.get_id_token(vo, sign_url["device_code"])
            except Exception as error:
                print(error)
                return None

            if status:
                return output

            error_code = output.get("error") if isinstance(output, dict) else None
            if error_code == "slow_down":
                # RFC 8628 section 3.5: keep polling, but back off by 5 seconds.
                interval += 5
            elif error_code != "authorization_pending":
                # Any other response is terminal for this device code.
                print(output)
                return None
            time.sleep(interval)
        return None
0134 
# Script entry point: run the interactive authentication check directly.
if __name__ == "__main__":
    TestAuthentication().test_oidc_authentication()