File indexing completed on 2026-04-09 07:58:20
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011 import json
0012 import traceback
0013
0014 from flask import Blueprint
0015
0016 from idds.common import exceptions
0017 from idds.common.constants import HTTP_STATUS_CODE
0018 from idds.core.authentication import OIDCAuthentication
0019 from idds.rest.v1.controller import IDDSController
0020
0021
0022 class OIDCAuthenticationSignURL(IDDSController):
0023 """ OIDCAuthentication Sign URL"""
0024
0025 def get(self, vo, auth_type='oidc'):
0026 """ Get sign url for user to approve.
0027 HTTP Success:
0028 200 OK
0029 HTTP Error:
0030 404 Not Found
0031 500 InternalError
0032 :returns: dictionary with sign url.
0033 """
0034
0035 try:
0036 if auth_type == 'oidc':
0037 oidc = OIDCAuthentication()
0038 status, sign_url = oidc.get_oidc_sign_url(vo)
0039 if status:
0040 rets = sign_url
0041 return self.generate_http_response(HTTP_STATUS_CODE.OK, data=rets)
0042 else:
0043 raise exceptions.IDDSException("Failed to get oidc sign url: %s" % str(sign_url))
0044 else:
0045 raise exceptions.AuthenticationNotSupported("auth_type %s is not supported to call this function." % str(auth_type))
0046 except exceptions.NoObject as error:
0047 return self.generate_http_response(HTTP_STATUS_CODE.NotFound, exc_cls=error.__class__.__name__, exc_msg=error)
0048 except exceptions.IDDSException as error:
0049 return self.generate_http_response(HTTP_STATUS_CODE.InternalError, exc_cls=error.__class__.__name__, exc_msg=error)
0050 except Exception as error:
0051 print(error)
0052 print(traceback.format_exc())
0053 return self.generate_http_response(HTTP_STATUS_CODE.InternalError, exc_cls=exceptions.CoreException.__name__, exc_msg=error)
0054
0055
0056 class OIDCAuthenticationToken(IDDSController):
0057 """ OIDCAuthentication Token"""
0058
0059 def get(self, vo, device_code, interval=5, expires_in=60):
0060 """ Get id token.
0061 HTTP Success:
0062 200 OK
0063 HTTP Error:
0064 404 Not Found
0065 500 InternalError
0066 :returns: dictionary with sign url.
0067 """
0068
0069 try:
0070 oidc = OIDCAuthentication()
0071 status, id_token = oidc.get_id_token(vo, device_code, interval, expires_in)
0072 if status:
0073 return self.generate_http_response(HTTP_STATUS_CODE.OK, data=id_token)
0074 else:
0075 if 'error' in id_token and 'authorization_pending' in id_token['error']:
0076 raise exceptions.AuthenticationPending(str(id_token))
0077 else:
0078 raise exceptions.IDDSException("Failed to get oidc token: %s" % str(id_token))
0079 except exceptions.NoObject as error:
0080 return self.generate_http_response(HTTP_STATUS_CODE.NotFound, exc_cls=error.__class__.__name__, exc_msg=error)
0081 except exceptions.IDDSException as error:
0082 return self.generate_http_response(HTTP_STATUS_CODE.InternalError, exc_cls=error.__class__.__name__, exc_msg=error)
0083 except Exception as error:
0084 print(error)
0085 print(traceback.format_exc())
0086 return self.generate_http_response(HTTP_STATUS_CODE.InternalError, exc_cls=exceptions.CoreException.__name__, exc_msg=error)
0087
0088 def post(self, vo):
0089 """ Refresh the token.
0090 HTTP Success:
0091 200 OK
0092 HTTP Error:
0093 400 Bad request
0094 500 Internal Error
0095 """
0096 try:
0097 parameters = self.get_request().data and json.loads(self.get_request().data)
0098 refresh_token = parameters['refresh_token']
0099
0100 oidc = OIDCAuthentication()
0101 status, id_token = oidc.refresh_id_token(vo, refresh_token)
0102 if status:
0103 return self.generate_http_response(HTTP_STATUS_CODE.OK, data=id_token)
0104 else:
0105 raise exceptions.IDDSException("Failed to refresh oidc token: %s" % str(id_token))
0106 except exceptions.NoObject as error:
0107 return self.generate_http_response(HTTP_STATUS_CODE.NotFound, exc_cls=error.__class__.__name__, exc_msg=error)
0108 except exceptions.IDDSException as error:
0109 return self.generate_http_response(HTTP_STATUS_CODE.InternalError, exc_cls=error.__class__.__name__, exc_msg=error)
0110 except Exception as error:
0111 print(error)
0112 print(traceback.format_exc())
0113 return self.generate_http_response(HTTP_STATUS_CODE.InternalError, exc_cls=exceptions.CoreException.__name__, exc_msg=error)
0114
0115 def post_test(self):
0116 import pprint
0117 pprint.pprint(self.get_request())
0118 pprint.pprint(self.get_request().endpoint)
0119 pprint.pprint(self.get_request().url_rule)
0120
0121
0122 """----------------------
0123 Web service url maps
0124 ----------------------"""
0125
0126
0127 def get_blueprint():
0128 bp = Blueprint('auth', __name__)
0129
0130 url_view = OIDCAuthenticationSignURL.as_view('url')
0131 bp.add_url_rule('/auth/url/<vo>', view_func=url_view, methods=['get'])
0132 bp.add_url_rule('/auth/url/<vo>/<auth_type>', view_func=url_view, methods=['get'])
0133
0134 token_view = OIDCAuthenticationToken.as_view('token')
0135 bp.add_url_rule('/auth/token/<vo>/<device_code>', view_func=token_view, methods=['get'])
0136 bp.add_url_rule('/auth/token/<vo>/<device_code>/<interval>', view_func=token_view, methods=['get'])
0137 bp.add_url_rule('/auth/token/<vo>/<device_code>/<interval>/<expires_in>', view_func=token_view, methods=['get'])
0138 bp.add_url_rule('/auth/token/<vo>', view_func=token_view, methods=['post'])
0139 return bp