Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-09 07:58:20

0001 #!/usr/bin/env python
0002 #
0003 # Licensed under the Apache License, Version 2.0 (the "License");
0004 # You may not use this file except in compliance with the License.
0005 # You may obtain a copy of the License at
0006 # http://www.apache.org/licenses/LICENSE-2.0
0007 #
0008 # Authors:
0009 # - Wen Guan, <wen.guan@cern.ch>, 2024
0010 
0011 
0012 from traceback import format_exc
0013 
0014 from flask import Blueprint
0015 
0016 from idds.common import exceptions
0017 from idds.common.authentication import authenticate_is_super_user
0018 from idds.common.constants import (HTTP_STATUS_CODE, MessageType, MessageStatus,
0019                                    MessageSource, MessageDestination,
0020                                    CommandType, RequestStatus)
0021 from idds.common.utils import json_loads
0022 from idds.core.commands import add_command
0023 from idds.core.requests import get_requests
0024 from idds.core.messages import add_message, retrieve_messages
0025 from idds.rest.v1.controller import IDDSController
0026 
0027 
class Message(IDDSController):
    """REST controller that retrieves and submits messages tied to a request.

    The URL carries four identifiers (request_id, workload_id, transform_id,
    internal_id); the literal string 'null' in any position stands for
    "not specified" and is converted to None before use.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.logger = self.setup_logger()

    @staticmethod
    def _normalize_ids(*ids):
        """Return the given ids as a tuple with every 'null' placeholder replaced by None."""
        return tuple(None if value == 'null' else value for value in ids)

    @staticmethod
    def _is_async_result(msg):
        """Tell whether a decoded payload is a batch of 'asyncresult' messages.

        The payload qualifies when it is a non-empty list/tuple whose first
        item is a dict with a 'headers' mapping whose 'channel' entry equals
        'asyncresult'. Anything else (including an empty list or a first
        item without 'headers') is reported as False instead of raising.
        """
        if not isinstance(msg, (list, tuple)) or not msg:
            return False
        first = msg[0]
        if not isinstance(first, dict):
            return False
        headers = first.get('headers')
        return isinstance(headers, dict) and headers.get('channel') == 'asyncresult'

    def _missing_request_id_response(self):
        """Build the 400 response returned when no request_id was supplied."""
        return self.generate_http_response(HTTP_STATUS_CODE.BadRequest,
                                           exc_cls=exceptions.BadRequest.__name__,
                                           exc_msg="request_id should not be None")

    def _check_permission(self, request_id, workload_id, action):
        """Verify that the current user may act on the matching requests.

        :param request_id: request id used to look up the requests.
        :param workload_id: workload id used to look up the requests (may be None).
        :param action: text inserted in the error message, e.g. "get messages from".
        :returns: None when access is allowed, otherwise the error response to send back.
        """
        try:
            username = self.get_username()
            reqs = get_requests(request_id=request_id, workload_id=workload_id, with_request=True)
            for req in reqs:
                # Requests without an owner are open to everybody; otherwise only
                # the owner or a super user may proceed.
                if req['username'] and req['username'] != username and not authenticate_is_super_user(username):
                    raise exceptions.AuthenticationNoPermission("User %s has no permission to %s request %s" % (username, action, req['request_id']))
        except exceptions.AuthenticationNoPermission as error:
            # NOTE(review): a 4xx status would describe a permission failure better;
            # InternalError is kept because callers may rely on the current status.
            return self.generate_http_response(HTTP_STATUS_CODE.InternalError, exc_cls=error.__class__.__name__, exc_msg=error)
        except Exception as error:
            print(error)
            print(format_exc())
            return self.generate_http_response(HTTP_STATUS_CODE.InternalError, exc_cls=exceptions.CoreException.__name__, exc_msg=error)
        return None

    def get(self, request_id, workload_id, transform_id, internal_id):
        """ Get messages with given ids.
        HTTP Success:
            200 OK
        HTTP Error:
            400 Bad request
            404 Not Found
            500 InternalError
        :returns: list of message contents; list/tuple contents are flattened
                  one level so each item becomes its own entry.
        """
        request_id, workload_id, transform_id, internal_id = self._normalize_ids(
            request_id, workload_id, transform_id, internal_id)
        if request_id is None:
            return self._missing_request_id_response()

        denied = self._check_permission(request_id, workload_id, "get messages from")
        if denied is not None:
            return denied

        try:
            msgs = retrieve_messages(request_id=request_id, workload_id=workload_id, transform_id=transform_id, internal_id=internal_id)
            rets = []
            for msg in msgs:
                msg_content = msg['msg_content']
                if isinstance(msg_content, (list, tuple)):
                    rets.extend(msg_content)
                else:
                    rets.append(msg_content)
        except exceptions.NoObject as error:
            return self.generate_http_response(HTTP_STATUS_CODE.NotFound, exc_cls=error.__class__.__name__, exc_msg=error)
        except exceptions.IDDSException as error:
            return self.generate_http_response(HTTP_STATUS_CODE.InternalError, exc_cls=error.__class__.__name__, exc_msg=error)
        except Exception as error:
            print(error)
            print(format_exc())
            return self.generate_http_response(HTTP_STATUS_CODE.InternalError, exc_cls=exceptions.CoreException.__name__, exc_msg=error)

        return self.generate_http_response(HTTP_STATUS_CODE.OK, data=rets)

    def post(self, request_id, workload_id, transform_id, internal_id):
        """ Submit a message (or a command) for the given request.

        The JSON body is dispatched on its shape:
          * a list whose first item has headers.channel == 'asyncresult':
            every item is stored as an AsyncResult message;
          * a dict with command 'update_request' / 'update_processing':
            an abort or resume command is added depending on parameters.status;
          * anything else: stored as a single IDDSCommunication message for the Clerk.

        HTTP Success:
            200 OK
        HTTP Error:
            400 Bad request
            409 Conflict
            500 Internal Error
        :returns: dictionary with the request_id.
        """
        request_id, workload_id, transform_id, internal_id = self._normalize_ids(
            request_id, workload_id, transform_id, internal_id)
        if request_id is None:
            return self._missing_request_id_response()

        denied = self._check_permission(request_id, workload_id, "send messages to")
        if denied is not None:
            return denied

        try:
            raw = self.get_request().data
            if not raw:
                # Nothing to decode: report a client error instead of failing later on.
                return self.generate_http_response(HTTP_STATUS_CODE.BadRequest,
                                                   exc_cls=exceptions.BadRequest.__name__,
                                                   exc_msg="message body should not be empty")
            msg = json_loads(raw)

            if self._is_async_result(msg):
                for msg_item in msg:
                    add_message(msg_type=MessageType.AsyncResult,
                                status=MessageStatus.New,
                                destination=MessageDestination.AsyncResult,
                                source=MessageSource.Rest,
                                request_id=request_id,
                                workload_id=workload_id,
                                transform_id=transform_id,
                                internal_id=internal_id,
                                num_contents=1,
                                msg_content=msg_item)
            elif isinstance(msg, dict) and msg.get('command') in ('update_request', 'update_processing'):
                status = msg['parameters']['status']
                if status in [RequestStatus.ToCancel, RequestStatus.ToSuspend]:
                    add_command(request_id=request_id, cmd_type=CommandType.AbortRequest,
                                cmd_content=None)
                elif status in [RequestStatus.ToResume]:
                    add_command(request_id=request_id, cmd_type=CommandType.ResumeRequest,
                                cmd_content=None)
                # Any other status adds no command and still answers 200.
            else:
                # NOTE(review): transform_id is forced to None and internal_id is not
                # forwarded here, unlike the AsyncResult branch - confirm this is intended.
                add_message(msg_type=MessageType.IDDSCommunication,
                            status=MessageStatus.New,
                            destination=MessageDestination.Clerk,
                            source=MessageSource.Rest,
                            request_id=request_id,
                            workload_id=workload_id,
                            transform_id=None,
                            num_contents=1,
                            msg_content=msg)

        except exceptions.DuplicatedObject as error:
            return self.generate_http_response(HTTP_STATUS_CODE.Conflict, exc_cls=error.__class__.__name__, exc_msg=error)
        except exceptions.IDDSException as error:
            return self.generate_http_response(HTTP_STATUS_CODE.InternalError, exc_cls=error.__class__.__name__, exc_msg=error)
        except Exception as error:
            print(error)
            print(format_exc())
            return self.generate_http_response(HTTP_STATUS_CODE.InternalError, exc_cls=exceptions.CoreException.__name__, exc_msg=error)

        return self.generate_http_response(HTTP_STATUS_CODE.OK, data={'request_id': request_id})

    def post_test(self):
        """Debug helper: pretty-print the current request, its endpoint and its URL rule."""
        import pprint
        pprint.pprint(self.get_request())
        pprint.pprint(self.get_request().endpoint)
        pprint.pprint(self.get_request().url_rule)
0183 
0184 """----------------------
0185    Web service url maps
0186 ----------------------"""
0187 
0188 
def get_blueprint():
    """Create the 'message' blueprint and register the Message view on it.

    The single URL rule accepts GET and POST and passes the four path
    segments (request_id, workload_id, transform_id, internal_id) to the view.
    """
    url_pattern = '/message/<request_id>/<workload_id>/<transform_id>/<internal_id>'
    blueprint = Blueprint('message', __name__)
    message_view = Message.as_view('message')
    blueprint.add_url_rule(url_pattern, view_func=message_view, methods=['get', 'post'])
    return blueprint