File indexing completed on 2026-04-09 07:58:19
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012 """
0013 operations related to Messages.
0014 """
0015
0016 import datetime
0017 import threading
0018
0019 from idds.common.constants import MessageDestination, MessageType, MessageStatus
0020 from idds.orm.base.session import read_session, transactional_session
0021 from idds.orm import messages as orm_messages
0022
0023
0024 @transactional_session
0025 def add_message(msg_type, status, source, request_id, workload_id, transform_id,
0026 num_contents, msg_content, internal_id=None, bulk_size=None, processing_id=0,
0027 destination=MessageDestination.Outside, session=None):
0028 """
0029 Add a message to be submitted asynchronously to a message broker.
0030
0031 :param msg_type: The type of the msg as a number, e.g., finished_stagein.
0032 :param status: The status about the message
0033 :param source: The source where the message is from.
0034 :param msg_content: The message msg_content as JSON.
0035 :param session: The database session.
0036 """
0037 return orm_messages.add_message(msg_type=msg_type, status=status, source=source,
0038 request_id=request_id, workload_id=workload_id,
0039 transform_id=transform_id, num_contents=num_contents,
0040 destination=destination, processing_id=processing_id,
0041 internal_id=internal_id, bulk_size=bulk_size,
0042 msg_content=msg_content, session=session)
0043
0044
0045 @transactional_session
0046 def add_messages(messages, bulk_size=1000, session=None):
0047 return orm_messages.add_messages(messages, bulk_size=bulk_size, session=session)
0048
0049
0050 @transactional_session
0051 def retrieve_messages(bulk_size=None, msg_type=None, status=None, destination=None,
0052 source=None, request_id=None, workload_id=None, transform_id=None,
0053 processing_id=None, use_poll_period=False, retries=None, delay=None,
0054 min_request_id=None, fetching_id=None, internal_id=None,
0055 record_fetched=False, record_fetched_status=MessageStatus.Fetched,
0056 session=None):
0057 """
0058 Retrieve up to $bulk messages.
0059
0060 :param bulk: Number of messages as an integer.
0061 :param msg_type: Return only specified msg_type.
0062 :param status: The status about the message
0063 :param source: The source where the message is from.
0064 :param session: The database session.
0065
0066 :returns messages: List of dictionaries
0067 """
0068 if fetching_id is None:
0069 hb_thread = threading.current_thread()
0070 fetching_id = hb_thread.ident
0071
0072 messages = orm_messages.retrieve_messages(bulk_size=bulk_size, msg_type=msg_type,
0073 status=status, source=source, destination=destination,
0074 request_id=request_id, workload_id=workload_id,
0075 transform_id=transform_id, processing_id=processing_id,
0076 retries=retries, delay=delay, fetching_id=fetching_id,
0077 min_request_id=min_request_id, internal_id=internal_id,
0078 use_poll_period=use_poll_period, session=session)
0079 if record_fetched:
0080 to_updates = []
0081 for msg in messages:
0082 to_update = {'msg_id': msg['msg_id'],
0083 'request_id': msg['request_id'],
0084 'poll_period': datetime.timedelta(seconds=delay),
0085 'status': record_fetched_status}
0086 to_updates.append(to_update)
0087 if to_updates:
0088 orm_messages.update_messages(to_updates, min_request_id=min_request_id, session=session)
0089 return messages
0090
0091
0092 @read_session
0093 def retrieve_request_messages(request_id, bulk_size=1, session=None):
0094 return retrieve_messages(request_id=request_id,
0095 msg_type=MessageType.IDDSCommunication,
0096 status=MessageStatus.New,
0097 bulk_size=bulk_size,
0098 destination=MessageDestination.Clerk,
0099 session=session)
0100
0101
0102 @read_session
0103 def retrieve_transform_messages(request_id, transform_id, bulk_size=1, session=None):
0104 return retrieve_messages(request_id=request_id,
0105 transform_id=transform_id,
0106 msg_type=MessageType.IDDSCommunication,
0107 status=MessageStatus.New,
0108 bulk_size=bulk_size,
0109 destination=MessageDestination.Transformer,
0110 session=session)
0111
0112
0113 @read_session
0114 def retrieve_processing_messages(request_id, processing_id, bulk_size=1, session=None):
0115 return retrieve_messages(request_id=request_id,
0116 processing_id=processing_id,
0117 msg_type=MessageType.IDDSCommunication,
0118 status=MessageStatus.New,
0119 bulk_size=bulk_size,
0120 destination=MessageDestination.Carrier,
0121 session=session)
0122
0123
0124 @transactional_session
0125 def delete_messages(messages, session=None):
0126 """
0127 Delete all messages with the given IDs.
0128
0129 :param messages: The messages to delete as a list of dictionaries.
0130 """
0131 return orm_messages.delete_messages(messages=messages, session=session)
0132
0133
0134 @transactional_session
0135 def update_messages(messages, min_request_id=None, session=None):
0136 """
0137 Update all messages status with the given IDs.
0138
0139 :param messages: The messages to be updated as a list of dictionaries.
0140 """
0141 return orm_messages.update_messages(messages=messages, min_request_id=min_request_id, session=session)
0142
0143
0144 @transactional_session
0145 def clean_old_messages(request_id, session=None):
0146 """
0147 Delete messages whose request id is older than request_id.
0148
0149 :param request_id: request id..
0150 """
0151 return orm_messages.clean_old_messages(request_id=request_id, session=session)