Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-09 07:58:20

0001 #!/usr/bin/env python
0002 #
0003 # Licensed under the Apache License, Version 2.0 (the "License");
0004 # You may not use this file except in compliance with the License.
0005 # You may obtain a copy of the License at
0006 # http://www.apache.org/licenses/LICENSE-2.0OA
0007 #
0008 # Authors:
0009 # - Wen Guan, <wen.guan@cern.ch>, 2021
0010 
0011 import json
0012 import traceback
0013 
0014 from flask import Blueprint
0015 
0016 from idds.common import exceptions
0017 from idds.common.constants import HTTP_STATUS_CODE
0018 from idds.core.authentication import OIDCAuthentication
0019 from idds.rest.v1.controller import IDDSController
0020 
0021 
0022 class OIDCAuthenticationSignURL(IDDSController):
0023     """ OIDCAuthentication Sign URL"""
0024 
0025     def get(self, vo, auth_type='oidc'):
0026         """ Get sign url for user to approve.
0027         HTTP Success:
0028             200 OK
0029         HTTP Error:
0030             404 Not Found
0031             500 InternalError
0032         :returns: dictionary with sign url.
0033         """
0034 
0035         try:
0036             if auth_type == 'oidc':
0037                 oidc = OIDCAuthentication()
0038                 status, sign_url = oidc.get_oidc_sign_url(vo)
0039                 if status:
0040                     rets = sign_url
0041                     return self.generate_http_response(HTTP_STATUS_CODE.OK, data=rets)
0042                 else:
0043                     raise exceptions.IDDSException("Failed to get oidc sign url: %s" % str(sign_url))
0044             else:
0045                 raise exceptions.AuthenticationNotSupported("auth_type %s is not supported to call this function." % str(auth_type))
0046         except exceptions.NoObject as error:
0047             return self.generate_http_response(HTTP_STATUS_CODE.NotFound, exc_cls=error.__class__.__name__, exc_msg=error)
0048         except exceptions.IDDSException as error:
0049             return self.generate_http_response(HTTP_STATUS_CODE.InternalError, exc_cls=error.__class__.__name__, exc_msg=error)
0050         except Exception as error:
0051             print(error)
0052             print(traceback.format_exc())
0053             return self.generate_http_response(HTTP_STATUS_CODE.InternalError, exc_cls=exceptions.CoreException.__name__, exc_msg=error)
0054 
0055 
0056 class OIDCAuthenticationToken(IDDSController):
0057     """ OIDCAuthentication Token"""
0058 
0059     def get(self, vo, device_code, interval=5, expires_in=60):
0060         """ Get id token.
0061         HTTP Success:
0062             200 OK
0063         HTTP Error:
0064             404 Not Found
0065             500 InternalError
0066         :returns: dictionary with sign url.
0067         """
0068 
0069         try:
0070             oidc = OIDCAuthentication()
0071             status, id_token = oidc.get_id_token(vo, device_code, interval, expires_in)
0072             if status:
0073                 return self.generate_http_response(HTTP_STATUS_CODE.OK, data=id_token)
0074             else:
0075                 if 'error' in id_token and 'authorization_pending' in id_token['error']:
0076                     raise exceptions.AuthenticationPending(str(id_token))
0077                 else:
0078                     raise exceptions.IDDSException("Failed to get oidc token: %s" % str(id_token))
0079         except exceptions.NoObject as error:
0080             return self.generate_http_response(HTTP_STATUS_CODE.NotFound, exc_cls=error.__class__.__name__, exc_msg=error)
0081         except exceptions.IDDSException as error:
0082             return self.generate_http_response(HTTP_STATUS_CODE.InternalError, exc_cls=error.__class__.__name__, exc_msg=error)
0083         except Exception as error:
0084             print(error)
0085             print(traceback.format_exc())
0086             return self.generate_http_response(HTTP_STATUS_CODE.InternalError, exc_cls=exceptions.CoreException.__name__, exc_msg=error)
0087 
0088     def post(self, vo):
0089         """ Refresh the token.
0090         HTTP Success:
0091             200 OK
0092         HTTP Error:
0093             400 Bad request
0094             500 Internal Error
0095         """
0096         try:
0097             parameters = self.get_request().data and json.loads(self.get_request().data)
0098             refresh_token = parameters['refresh_token']
0099 
0100             oidc = OIDCAuthentication()
0101             status, id_token = oidc.refresh_id_token(vo, refresh_token)
0102             if status:
0103                 return self.generate_http_response(HTTP_STATUS_CODE.OK, data=id_token)
0104             else:
0105                 raise exceptions.IDDSException("Failed to refresh oidc token: %s" % str(id_token))
0106         except exceptions.NoObject as error:
0107             return self.generate_http_response(HTTP_STATUS_CODE.NotFound, exc_cls=error.__class__.__name__, exc_msg=error)
0108         except exceptions.IDDSException as error:
0109             return self.generate_http_response(HTTP_STATUS_CODE.InternalError, exc_cls=error.__class__.__name__, exc_msg=error)
0110         except Exception as error:
0111             print(error)
0112             print(traceback.format_exc())
0113             return self.generate_http_response(HTTP_STATUS_CODE.InternalError, exc_cls=exceptions.CoreException.__name__, exc_msg=error)
0114 
0115     def post_test(self):
0116         import pprint
0117         pprint.pprint(self.get_request())
0118         pprint.pprint(self.get_request().endpoint)
0119         pprint.pprint(self.get_request().url_rule)
0120 
0121 
0122 """----------------------
0123    Web service url maps
0124 ----------------------"""
0125 
0126 
0127 def get_blueprint():
0128     bp = Blueprint('auth', __name__)
0129 
0130     url_view = OIDCAuthenticationSignURL.as_view('url')
0131     bp.add_url_rule('/auth/url/<vo>', view_func=url_view, methods=['get'])
0132     bp.add_url_rule('/auth/url/<vo>/<auth_type>', view_func=url_view, methods=['get'])
0133 
0134     token_view = OIDCAuthenticationToken.as_view('token')
0135     bp.add_url_rule('/auth/token/<vo>/<device_code>', view_func=token_view, methods=['get'])
0136     bp.add_url_rule('/auth/token/<vo>/<device_code>/<interval>', view_func=token_view, methods=['get'])
0137     bp.add_url_rule('/auth/token/<vo>/<device_code>/<interval>/<expires_in>', view_func=token_view, methods=['get'])
0138     bp.add_url_rule('/auth/token/<vo>', view_func=token_view, methods=['post'])
0139     return bp