File indexing completed on 2026-04-09 07:58:20
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012 from traceback import format_exc
0013
0014 from flask import Blueprint
0015
0016 from idds.common import exceptions
0017 from idds.common.authentication import authenticate_is_super_user
0018 from idds.common.constants import (HTTP_STATUS_CODE, MessageType, MessageStatus,
0019 MessageSource, MessageDestination,
0020 CommandType, RequestStatus)
0021 from idds.common.utils import json_loads
0022 from idds.core.commands import add_command
0023 from idds.core.requests import get_requests
0024 from idds.core.messages import add_message, retrieve_messages
0025 from idds.rest.v1.controller import IDDSController
0026
0027
class Message(IDDSController):
    """REST controller for reading and posting the messages of a request.

    The four URL components (request_id, workload_id, transform_id,
    internal_id) arrive as strings; the literal string ``'null'`` is the
    placeholder for "not given" and is converted to ``None``.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.logger = self.setup_logger()

    @staticmethod
    def _null_to_none(value):
        """Return None for the URL placeholder 'null', else the value unchanged."""
        return None if value == 'null' else value

    @staticmethod
    def _is_async_result(msg):
        """Tell whether a decoded payload is a batch of async-result messages.

        That is a non-empty list/tuple whose first item is a dict carrying
        ``headers['channel'] == 'asyncresult'``. Any other shape (empty list,
        missing 'headers', non-dict headers, ...) yields False instead of
        raising.
        """
        if not isinstance(msg, (list, tuple)) or not msg:
            return False
        first = msg[0]
        if not isinstance(first, dict):
            return False
        headers = first.get('headers')
        return isinstance(headers, dict) and headers.get('channel') == 'asyncresult'

    def _bad_request(self, message):
        """Print ``message`` and build a BadRequest response carrying it."""
        print(message)
        return self.generate_http_response(HTTP_STATUS_CODE.BadRequest, exc_cls=exceptions.BadRequest.__name__, exc_msg=message)

    def _unexpected_error(self, error):
        """Print ``error`` with its traceback and build a generic InternalError response.

        Must be called from inside an ``except`` block so that ``format_exc()``
        sees the exception being handled.
        """
        print(error)
        print(format_exc())
        return self.generate_http_response(HTTP_STATUS_CODE.InternalError, exc_cls=exceptions.CoreException.__name__, exc_msg=error)

    def _check_permission(self, request_id, workload_id, denied_template):
        """Check that the current user may access the selected request(s).

        A request is accessible when it has no username, when its username
        equals the caller's, or when the caller is a super user.

        :param denied_template: '%'-template taking (username, request_id),
                                used as the permission error message.
        :returns: an error response when access is denied or the lookup
                  fails, otherwise None.
        """
        try:
            username = self.get_username()
            reqs = get_requests(request_id=request_id, workload_id=workload_id, with_request=True)
            for req in reqs:
                if req['username'] and req['username'] != username and not authenticate_is_super_user(username):
                    raise exceptions.AuthenticationNoPermission(denied_template % (username, req['request_id']))
        except exceptions.AuthenticationNoPermission as error:
            # NOTE(review): a permission failure is reported as InternalError
            # (500); a 4xx status looks more appropriate, but the status is
            # kept because clients may depend on it.
            return self.generate_http_response(HTTP_STATUS_CODE.InternalError, exc_cls=error.__class__.__name__, exc_msg=error)
        except Exception as error:
            return self._unexpected_error(error)
        return None

    def get(self, request_id, workload_id, transform_id, internal_id):
        """ Get messages with given ids.
        HTTP Success:
            200 OK
        HTTP Error:
            400 Bad request
            404 Not Found
            500 InternalError
        :returns: flat list of message contents; a stored content that is
                  itself a list/tuple contributes each of its items.
        """
        request_id, workload_id, transform_id, internal_id = map(
            self._null_to_none, (request_id, workload_id, transform_id, internal_id))
        if request_id is None:
            return self._bad_request("request_id should not be None")

        denied = self._check_permission(request_id, workload_id,
                                        "User %s has no permission to get messages from request %s")
        if denied is not None:
            return denied

        try:
            msgs = retrieve_messages(request_id=request_id, workload_id=workload_id, transform_id=transform_id, internal_id=internal_id)
            rets = []
            for msg in msgs:
                msg_content = msg['msg_content']
                if isinstance(msg_content, (list, tuple)):
                    rets.extend(msg_content)
                else:
                    rets.append(msg_content)
        except exceptions.NoObject as error:
            return self.generate_http_response(HTTP_STATUS_CODE.NotFound, exc_cls=error.__class__.__name__, exc_msg=error)
        except exceptions.IDDSException as error:
            return self.generate_http_response(HTTP_STATUS_CODE.InternalError, exc_cls=error.__class__.__name__, exc_msg=error)
        except Exception as error:
            return self._unexpected_error(error)

        return self.generate_http_response(HTTP_STATUS_CODE.OK, data=rets)

    def post(self, request_id, workload_id, transform_id, internal_id):
        """ Post a message (or a command) for a request.

        The decoded request body is dispatched as follows:
          * a list whose first item has headers['channel'] == 'asyncresult':
            every item is stored as an AsyncResult message;
          * a dict with 'command' in ('update_request', 'update_processing'):
            a ToCancel/ToSuspend status adds an AbortRequest command, a
            ToResume status adds a ResumeRequest command, any other status
            adds nothing;
          * anything else is stored as one IDDSCommunication message for
            the Clerk.

        HTTP Success:
            200 OK
        HTTP Error:
            400 Bad request
            409 Conflict
            500 Internal Error
        :returns: dictionary with the request_id.
        """
        request_id, workload_id, transform_id, internal_id = map(
            self._null_to_none, (request_id, workload_id, transform_id, internal_id))
        if request_id is None:
            return self._bad_request("request_id should not be None")

        denied = self._check_permission(request_id, workload_id,
                                        "User %s has no permission to send messages to request %s")
        if denied is not None:
            return denied

        try:
            body = self.get_request().data
            if not body:
                # An empty body cannot be decoded or dispatched; report it as
                # a client error rather than failing later with a TypeError.
                return self._bad_request("message body should not be empty")
            msg = json_loads(body)

            if self._is_async_result(msg):
                for msg_item in msg:
                    add_message(msg_type=MessageType.AsyncResult,
                                status=MessageStatus.New,
                                destination=MessageDestination.AsyncResult,
                                source=MessageSource.Rest,
                                request_id=request_id,
                                workload_id=workload_id,
                                transform_id=transform_id,
                                internal_id=internal_id,
                                num_contents=1,
                                msg_content=msg_item)
            elif isinstance(msg, dict) and 'command' in msg and msg['command'] in ['update_request', 'update_processing']:
                status = msg['parameters']['status']
                if status in [RequestStatus.ToCancel, RequestStatus.ToSuspend]:
                    add_command(request_id=request_id, cmd_type=CommandType.AbortRequest,
                                cmd_content=None)
                elif status in [RequestStatus.ToResume]:
                    add_command(request_id=request_id, cmd_type=CommandType.ResumeRequest,
                                cmd_content=None)
            else:
                add_message(msg_type=MessageType.IDDSCommunication,
                            status=MessageStatus.New,
                            destination=MessageDestination.Clerk,
                            source=MessageSource.Rest,
                            request_id=request_id,
                            workload_id=workload_id,
                            transform_id=None,
                            num_contents=1,
                            msg_content=msg)

        except exceptions.DuplicatedObject as error:
            return self.generate_http_response(HTTP_STATUS_CODE.Conflict, exc_cls=error.__class__.__name__, exc_msg=error)
        except exceptions.IDDSException as error:
            return self.generate_http_response(HTTP_STATUS_CODE.InternalError, exc_cls=error.__class__.__name__, exc_msg=error)
        except Exception as error:
            return self._unexpected_error(error)

        return self.generate_http_response(HTTP_STATUS_CODE.OK, data={'request_id': request_id})

    def post_test(self):
        """Debug helper: pretty-print the current request, its endpoint and its URL rule."""
        import pprint
        pprint.pprint(self.get_request())
        pprint.pprint(self.get_request().endpoint)
        pprint.pprint(self.get_request().url_rule)
0182
0183
0184 """----------------------
0185 Web service url maps
0186 ----------------------"""
0187
0188
def get_blueprint():
    """Create the 'message' blueprint and register the Message view on it.

    :returns: Blueprint serving GET and POST on
              /message/<request_id>/<workload_id>/<transform_id>/<internal_id>.
    """
    blueprint = Blueprint('message', __name__)
    blueprint.add_url_rule(
        '/message/<request_id>/<workload_id>/<transform_id>/<internal_id>',
        view_func=Message.as_view('message'),
        methods=['get', 'post'],
    )
    return blueprint
0194 return bp