File indexing completed on 2026-04-09 07:58:18
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011 import time
0012 import traceback
0013 try:
0014
0015 from queue import Queue
0016 except ImportError:
0017
0018 from Queue import Queue
0019
0020 from idds.common.constants import (Sections, MessageStatus)
0021 from idds.common.exceptions import AgentPluginError, IDDSException
0022 from idds.common.utils import setup_logging
0023 from idds.core import messages as core_messages
0024 from idds.agents.common.baseagent import BaseAgent
0025
0026
0027 setup_logging(__name__)
0028
0029
0030 class Consumer(BaseAgent):
0031 """
0032 Consumer works to notify workload management that the data is available.
0033 """
0034
0035 def __init__(self, num_threads=1, retrieve_bulk_size=None, **kwargs):
0036 super(Consumer, self).__init__(num_threads=num_threads, **kwargs)
0037 self.config_section = Sections.Consumer
0038 self.retrieve_bulk_size = int(retrieve_bulk_size)
0039 self.message_queue = Queue()
0040
0041 def __del__(self):
0042 self.stop_receiver()
0043
0044 def get_messages(self):
0045 """
0046 Get messages
0047 """
0048 messages = core_messages.retrieve_messages(status=MessageStatus.New, bulk_size=self.retrieve_bulk_size)
0049
0050 self.logger.debug("Main thread get %s new messages" % len(messages))
0051 if messages:
0052 self.logger.info("Main thread get %s new messages" % len(messages))
0053
0054 return messages
0055
0056 def clean_messages(self, msgs):
0057
0058 to_updates = []
0059 for msg in msgs:
0060 to_updates.append({'msg_id': msg['msg_id'],
0061 'status': MessageStatus.Delivered})
0062 core_messages.update_messages(to_updates)
0063
0064 def start_receiver(self):
0065 if 'receiver' not in self.plugins:
0066 raise AgentPluginError('Plugin receiver is required')
0067 self.receiver = self.plugins['receiver']
0068
0069 self.logger.info("Starting receiver: %s" % self.receiver)
0070 self.receiver.set_request_queue(self.message_queue)
0071 self.receiver.start()
0072
0073 def stop_receiver(self):
0074 if hasattr(self, 'receiver') and self.receiver:
0075 self.logger.info("Stopping receiver: %s" % self.receiver)
0076 self.receiver.stop()
0077
0078 def run(self):
0079 """
0080 Main run function.
0081 """
0082 try:
0083 self.logger.info("Starting main thread")
0084 self.load_plugins()
0085
0086 self.start_receiver()
0087
0088 while not self.graceful_stop.is_set():
0089 try:
0090 pass
0091 except IDDSException as error:
0092 self.logger.error("Main thread IDDSException: %s" % str(error))
0093 except Exception as error:
0094 self.logger.critical("Main thread exception: %s\n%s" % (str(error), traceback.format_exc()))
0095 time.sleep(5)
0096 self.stop()
0097 except KeyboardInterrupt:
0098 self.stop()
0099
0100 def stop(self):
0101 super(Consumer, self).stop()
0102 self.stop_receiver()
0103
0104
0105 if __name__ == '__main__':
0106 agent = Consumer()
0107 agent()