Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-11 08:41:04

0001 # Licensed under the Apache License, Version 2.0 (the "License");
0002 # you may not use this file except in compliance with the License.
0003 # You may obtain a copy of the License at
0004 # http://www.apache.org/licenses/LICENSE-2.0
0005 #
0006 # Authors:
0007 # - Wen Guan, wen.guan@cern.ch, 2017
0008 # - Paul Nilsson, paul.nilsson@cern.ch, 2021
0009 
0010 import logging
0011 import os
0012 import threading
0013 import time
0014 import traceback
0015 
0016 from pilot.common.exception import PilotException, MessageFailure
0017 
0018 
0019 logger = logging.getLogger(__name__)
0020 
0021 
class MessageThread(threading.Thread):
    """
    A thread that receives messages from the payload and puts the received messages on the output queue.

    The thread owns a yampl server socket. ``run()`` polls that socket and forwards every
    received message (decoded as UTF-8) to the message queue handed to the constructor,
    while ``send()`` writes messages to the payload through the same socket.
    """

    def __init__(self, message_queue, socket_name=None, context='local', **kwds):
        """
        Initialize yampl server socket.

        :param message_queue: a queue to transfer messages between current instance and ESProcess.
        :param socket_name: name of the socket between current process and payload. When None or
                            empty, a name derived from the current process id is used.
        :param context: name of the context between current process and payload, default is 'local'.
        :param **kwds: other parameters, passed on to threading.Thread.

        :raises MessageFailure: when failed to import yampl or to setup the message socket.
        """

        super(MessageThread, self).__init__(**kwds)
        # the 'name' property replaces setName(), which is deprecated since Python 3.10
        self.name = "MessageThread"
        self.__message_queue = message_queue
        self._socket_name = socket_name
        self.__stop = threading.Event()
        # defined up front so that the attribute exists on every code path
        self.__message_server = None

        logger.info('try to import yampl')
        try:
            # imported lazily so that a missing yampl is reported as a MessageFailure
            import yampl
        except Exception as e:
            raise MessageFailure("Failed to import yampl: %s" % e)
        logger.info('finished to import yampl')

        logger.info('start to setup yampl server socket.')
        try:
            if not self._socket_name:
                self._socket_name = 'EventService_EventRanges_' + str(os.getpid())
            self.__message_server = yampl.ServerSocket(self._socket_name, context)
        except Exception as e:
            # format_exc() returns the traceback as a string; print_exc() would return None
            raise MessageFailure("Failed to setup yampl server socket: %s %s" % (e, traceback.format_exc()))
        logger.info('finished to setup yampl server socket(socket_name: %s, context:%s).', self._socket_name, context)

    def get_yampl_socket_name(self):
        """
        Get the name of the yampl socket used by this thread.

        :returns: the socket name (the generated default if none was given to the constructor).
        """
        return self._socket_name

    def send(self, message):
        """
        Send messages to payload through yampl server socket.

        :param message: String of the message.

        :raises MessageFailure: When there is no message server or when sending the message failed.
        """
        logger.debug('Send a message to yampl: %s', message)
        # checked outside the try block so this failure is not wrapped a second time below
        if not self.__message_server:
            raise MessageFailure("No message server.")
        try:
            self.__message_server.send_raw(message.encode('utf8'))  # Python 2 and 3
        except Exception as e:
            raise MessageFailure(e)

    def stop(self):
        """
        Set stop event.
        """
        logger.debug('set stop event')
        self.__stop.set()

    def is_stopped(self):
        """
        Get status whether stop event is set.

        :returns: True if stop event is set, otherwise False.
        """
        return self.__stop.is_set()

    def terminate(self):
        """
        Terminate message server.

        Safe to call repeatedly; calls after the first one do nothing.
        """
        if self.__message_server:
            logger.info("Terminating message server.")
            # Rebinding to None releases this object's reference to the socket in one step
            # (presumably letting yampl close it - TODO confirm) while keeping the attribute defined.
            self.__message_server = None

    def run(self):
        """
        Main thread loop to poll messages from payload and
        put received into message queue for other processes to fetch.

        The loop ends when the stop event is set or when an exception occurs. Exceptions are
        logged and not re-raised; the message server is terminated in every case.
        """
        logger.info('Message thread starts to run.')
        try:
            while not self.is_stopped():
                if not self.__message_server:
                    raise MessageFailure("No message server.")

                size, buf = self.__message_server.try_recv_raw()
                if size == -1:
                    # a size of -1 is treated as "nothing received"; back off briefly before polling again
                    time.sleep(0.01)
                else:
                    self.__message_queue.put(buf.decode('utf8'))  # Python 2 and 3
        except PilotException as e:
            logger.error("Pilot Exception: Message thread got an exception, will finish: %s, %s",
                         e.get_detail(), traceback.format_exc())
        except Exception as e:
            logger.error("Message thread got an exception, will finish: %s, %s", e, traceback.format_exc())
        finally:
            # single cleanup point for normal stop, handled errors and unexpected exits
            self.terminate()
        logger.info('Message thread finished.')