File indexing completed on 2026-04-11 08:41:04
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010 import logging
0011 import os
0012 import threading
0013 import time
0014 import traceback
0015
0016 from pilot.common.exception import PilotException, MessageFailure
0017
0018
0019 logger = logging.getLogger(__name__)
0020
0021
0022 class MessageThread(threading.Thread):
0023 """
0024 A thread to receive messages from payload and put recevied messages to the out queues.
0025 """
0026
0027 def __init__(self, message_queue, socket_name=None, context='local', **kwds):
0028 """
0029 Initialize yampl server socket.
0030
0031 :param message_queue: a queue to transfer messages between current instance and ESProcess.
0032 :param socket_name: name of the socket between current process and payload.
0033 :param context: name of the context between current process and payload, default is 'local'.
0034 :param **kwds: other parameters.
0035
0036 :raises MessageFailure: when failed to setup message socket.
0037 """
0038
0039 threading.Thread.__init__(self, **kwds)
0040 self.setName("MessageThread")
0041 self.__message_queue = message_queue
0042 self._socket_name = socket_name
0043 self.__stop = threading.Event()
0044
0045 logger.info('try to import yampl')
0046 try:
0047 import yampl
0048 except Exception as e:
0049 raise MessageFailure("Failed to import yampl: %s" % e)
0050 logger.info('finished to import yampl')
0051
0052 logger.info('start to setup yampl server socket.')
0053 try:
0054 if self._socket_name is None or len(self._socket_name) == 0:
0055 self._socket_name = 'EventService_EventRanges_' + str(os.getpid())
0056 self.__message_server = yampl.ServerSocket(self._socket_name, context)
0057 except Exception as e:
0058 raise MessageFailure("Failed to setup yampl server socket: %s %s" % (e, traceback.print_exc()))
0059 logger.info('finished to setup yampl server socket(socket_name: %s, context:%s).' % (self._socket_name, context))
0060
0061 def get_yampl_socket_name(self):
0062 return self._socket_name
0063
0064 def send(self, message):
0065 """
0066 Send messages to payload through yampl server socket.
0067
0068 :param message: String of the message.
0069
0070 :raises MessageFailure: When failed to send a message to the payload.
0071 """
0072 logger.debug('Send a message to yampl: %s' % message)
0073 try:
0074 if not self.__message_server:
0075 raise MessageFailure("No message server.")
0076 self.__message_server.send_raw(message.encode('utf8'))
0077 except Exception as e:
0078 raise MessageFailure(e)
0079
0080 def stop(self):
0081 """
0082 Set stop event.
0083 """
0084 logger.debug('set stop event')
0085 self.__stop.set()
0086
0087 def is_stopped(self):
0088 """
0089 Get status whether stop event is set.
0090
0091 :returns: True if stop event is set, otherwise False.
0092 """
0093 return self.__stop.is_set()
0094
0095 def terminate(self):
0096 """
0097 Terminate message server.
0098 """
0099 if self.__message_server:
0100 logger.info("Terminating message server.")
0101 del self.__message_server
0102 self.__message_server = None
0103
0104 def run(self):
0105 """
0106 Main thread loop to poll messages from payload and
0107 put received into message queue for other processes to fetch.
0108 """
0109 logger.info('Message thread starts to run.')
0110 try:
0111 while True:
0112 if self.is_stopped():
0113 self.terminate()
0114 break
0115 if not self.__message_server:
0116 raise MessageFailure("No message server.")
0117
0118 size, buf = self.__message_server.try_recv_raw()
0119 if size == -1:
0120 time.sleep(0.01)
0121 else:
0122 self.__message_queue.put(buf.decode('utf8'))
0123 except PilotException as e:
0124 self.terminate()
0125 logger.error("Pilot Exception: Message thread got an exception, will finish: %s, %s" % (e.get_detail(), traceback.format_exc()))
0126
0127 except Exception as e:
0128 self.terminate()
0129 logger.error("Message thread got an exception, will finish: %s" % str(e))
0130
0131 self.terminate()
0132 logger.info('Message thread finished.')