Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-09 07:58:19

0001 #!/usr/bin/env python
0002 #
0003 # Licensed under the Apache License, Version 2.0 (the "License");
0004 # You may not use this file except in compliance with the License.
0005 # You may obtain a copy of the License at
0006 # http://www.apache.org/licenses/LICENSE-2.0
0007 #
0008 # Authors:
0009 # - Wen Guan, <wen.guan@cern.ch>, 2019 - 2024
0010 
0011 
0012 """
0013 operations related to Messages.
0014 """
0015 
0016 import datetime
0017 import threading
0018 
0019 from idds.common.constants import MessageDestination, MessageType, MessageStatus
0020 from idds.orm.base.session import read_session, transactional_session
0021 from idds.orm import messages as orm_messages
0022 
0023 
@transactional_session
def add_message(msg_type, status, source, request_id, workload_id, transform_id,
                num_contents, msg_content, internal_id=None, bulk_size=None, processing_id=0,
                destination=MessageDestination.Outside, session=None):
    """
    Add a single message that is to be submitted asynchronously to a message broker.

    This is a thin wrapper: every argument is forwarded unchanged, by keyword,
    to ``orm_messages.add_message``.

    :param msg_type: The type of the msg as a number, e.g., finished_stagein.
    :param status: The status about the message.
    :param source: The source where the message is from.
    :param request_id: Request id forwarded to the ORM layer.
    :param workload_id: Workload id forwarded to the ORM layer.
    :param transform_id: Transform id forwarded to the ORM layer.
    :param num_contents: Number of contents forwarded to the ORM layer.
    :param msg_content: The message msg_content as JSON.
    :param internal_id: Internal id forwarded to the ORM layer (default None).
    :param bulk_size: Bulk size forwarded to the ORM layer (default None).
    :param processing_id: Processing id forwarded to the ORM layer (default 0).
    :param destination: Message destination (default MessageDestination.Outside).
    :param session: The database session.

    :returns: Whatever ``orm_messages.add_message`` returns.
    """
    # Collect the message fields first so the ORM call itself stays short.
    message_fields = {
        'msg_type': msg_type,
        'status': status,
        'source': source,
        'request_id': request_id,
        'workload_id': workload_id,
        'transform_id': transform_id,
        'num_contents': num_contents,
        'destination': destination,
        'processing_id': processing_id,
        'internal_id': internal_id,
        'bulk_size': bulk_size,
        'msg_content': msg_content,
    }
    return orm_messages.add_message(session=session, **message_fields)
0043 
0044 
@transactional_session
def add_messages(messages, bulk_size=1000, session=None):
    """
    Add several messages in one call.

    Delegates directly to ``orm_messages.add_messages``.

    :param messages: The messages to add, forwarded unchanged to the ORM layer.
    :param bulk_size: Bulk size forwarded to the ORM layer (default 1000).
    :param session: The database session.

    :returns: Whatever ``orm_messages.add_messages`` returns.
    """
    result = orm_messages.add_messages(messages, bulk_size=bulk_size, session=session)
    return result
0048 
0049 
@transactional_session
def retrieve_messages(bulk_size=None, msg_type=None, status=None, destination=None,
                      source=None, request_id=None, workload_id=None, transform_id=None,
                      processing_id=None, use_poll_period=False, retries=None, delay=None,
                      min_request_id=None, fetching_id=None, internal_id=None,
                      record_fetched=False, record_fetched_status=MessageStatus.Fetched,
                      session=None):
    """
    Retrieve up to bulk_size messages and optionally mark them as fetched.

    The selection itself is done by ``orm_messages.retrieve_messages``; all
    filter arguments are forwarded to it unchanged.

    :param bulk_size: Number of messages as an integer.
    :param msg_type: Return only specified msg_type.
    :param status: The status about the message.
    :param destination: Destination filter forwarded to the ORM layer.
    :param source: The source where the message is from.
    :param request_id: Request id filter forwarded to the ORM layer.
    :param workload_id: Workload id filter forwarded to the ORM layer.
    :param transform_id: Transform id filter forwarded to the ORM layer.
    :param processing_id: Processing id filter forwarded to the ORM layer.
    :param use_poll_period: Forwarded to the ORM layer.
    :param retries: Forwarded to the ORM layer.
    :param delay: Forwarded to the ORM layer. When record_fetched is set and
                  delay is not None, it is also used (in seconds) as the new
                  poll_period of every fetched message.
    :param min_request_id: Forwarded to the ORM layer for both the retrieval
                           and the optional update.
    :param fetching_id: Identifier of the fetcher. Defaults to the ident of
                        the calling thread.
    :param internal_id: Internal id filter forwarded to the ORM layer.
    :param record_fetched: If True, update every retrieved message with
                           record_fetched_status.
    :param record_fetched_status: Status written when record_fetched is True
                                  (default MessageStatus.Fetched).
    :param session: The database session.

    :returns messages: List of dictionaries
    """
    if fetching_id is None:
        # Fall back to the current thread's ident as the fetcher identifier.
        fetching_id = threading.current_thread().ident

    messages = orm_messages.retrieve_messages(bulk_size=bulk_size, msg_type=msg_type,
                                              status=status, source=source, destination=destination,
                                              request_id=request_id, workload_id=workload_id,
                                              transform_id=transform_id, processing_id=processing_id,
                                              retries=retries, delay=delay, fetching_id=fetching_id,
                                              min_request_id=min_request_id, internal_id=internal_id,
                                              use_poll_period=use_poll_period, session=session)

    if record_fetched and messages:
        # Fields shared by every update row; built once because they do not
        # depend on the individual message.
        common_fields = {}
        if delay is not None:
            # delay defaults to None and timedelta(seconds=None) raises
            # TypeError, so poll_period is only rewritten when a delay is given.
            common_fields['poll_period'] = datetime.timedelta(seconds=delay)
        common_fields['status'] = record_fetched_status

        to_updates = []
        for msg in messages:
            to_update = {'msg_id': msg['msg_id'],
                         'request_id': msg['request_id']}
            to_update.update(common_fields)
            to_updates.append(to_update)

        orm_messages.update_messages(to_updates, min_request_id=min_request_id, session=session)
    return messages
0090 
0091 
@read_session
def retrieve_request_messages(request_id, bulk_size=1, session=None):
    """
    Retrieve new IDDSCommunication messages addressed to the Clerk for a request.

    :param request_id: The request id to filter on.
    :param bulk_size: Number of messages to retrieve (default 1).
    :param session: The database session.

    :returns: The result of ``retrieve_messages`` for these criteria.
    """
    criteria = {
        'request_id': request_id,
        'msg_type': MessageType.IDDSCommunication,
        'status': MessageStatus.New,
        'bulk_size': bulk_size,
        'destination': MessageDestination.Clerk,
    }
    return retrieve_messages(session=session, **criteria)
0100 
0101 
@read_session
def retrieve_transform_messages(request_id, transform_id, bulk_size=1, session=None):
    """
    Retrieve new IDDSCommunication messages addressed to the Transformer.

    :param request_id: The request id to filter on.
    :param transform_id: The transform id to filter on.
    :param bulk_size: Number of messages to retrieve (default 1).
    :param session: The database session.

    :returns: The result of ``retrieve_messages`` for these criteria.
    """
    criteria = {
        'request_id': request_id,
        'transform_id': transform_id,
        'msg_type': MessageType.IDDSCommunication,
        'status': MessageStatus.New,
        'bulk_size': bulk_size,
        'destination': MessageDestination.Transformer,
    }
    return retrieve_messages(session=session, **criteria)
0111 
0112 
@read_session
def retrieve_processing_messages(request_id, processing_id, bulk_size=1, session=None):
    """
    Retrieve new IDDSCommunication messages addressed to the Carrier.

    :param request_id: The request id to filter on.
    :param processing_id: The processing id to filter on.
    :param bulk_size: Number of messages to retrieve (default 1).
    :param session: The database session.

    :returns: The result of ``retrieve_messages`` for these criteria.
    """
    criteria = {
        'request_id': request_id,
        'processing_id': processing_id,
        'msg_type': MessageType.IDDSCommunication,
        'status': MessageStatus.New,
        'bulk_size': bulk_size,
        'destination': MessageDestination.Carrier,
    }
    return retrieve_messages(session=session, **criteria)
0122 
0123 
@transactional_session
def delete_messages(messages, session=None):
    """
    Delete all messages with the given IDs.

    Delegates directly to ``orm_messages.delete_messages``.

    :param messages: The messages to delete as a list of dictionaries.
    :param session: The database session.

    :returns: Whatever ``orm_messages.delete_messages`` returns.
    """
    outcome = orm_messages.delete_messages(messages=messages, session=session)
    return outcome
0132 
0133 
@transactional_session
def update_messages(messages, min_request_id=None, session=None):
    """
    Update all messages status with the given IDs.

    Delegates directly to ``orm_messages.update_messages``.

    :param messages: The messages to be updated as a list of dictionaries.
    :param min_request_id: Forwarded unchanged to the ORM layer (default None).
    :param session: The database session.

    :returns: Whatever ``orm_messages.update_messages`` returns.
    """
    outcome = orm_messages.update_messages(messages=messages,
                                           min_request_id=min_request_id,
                                           session=session)
    return outcome
0142 
0143 
@transactional_session
def clean_old_messages(request_id, session=None):
    """
    Delete messages whose request id is older than request_id.

    Delegates directly to ``orm_messages.clean_old_messages``.

    :param request_id: The request id used as the cut-off.
    :param session: The database session.

    :returns: Whatever ``orm_messages.clean_old_messages`` returns.
    """
    outcome = orm_messages.clean_old_messages(request_id=request_id, session=session)
    return outcome