Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-09 07:58:18

0001 #!/usr/bin/env python
0002 #
0003 # Licensed under the Apache License, Version 2.0 (the "License");
0004 # You may not use this file except in compliance with the License.
0005 # You may obtain a copy of the License at
0006 # http://www.apache.org/licenses/LICENSE-2.0
0007 #
0008 # Authors:
0009 # - Wen Guan, <wen.guan@cern.ch>, 2019
0010 
0011 import time
0012 import traceback
0013 try:
0014     # python 3
0015     from queue import Queue
0016 except ImportError:
0017     # Python 2
0018     from Queue import Queue
0019 
0020 from idds.common.constants import (Sections, MessageStatus)
0021 from idds.common.exceptions import AgentPluginError, IDDSException
0022 from idds.common.utils import setup_logging
0023 from idds.core import messages as core_messages
0024 from idds.agents.common.baseagent import BaseAgent
0025 
0026 
0027 setup_logging(__name__)
0028 
0029 
class Consumer(BaseAgent):
    """
    Consumer works to notify workload management that the data is available.

    The agent owns a ``Queue`` that is handed to the ``receiver`` plugin and
    provides helpers that fetch new messages and mark messages as delivered
    through ``idds.core.messages``.
    """

    def __init__(self, num_threads=1, retrieve_bulk_size=None, **kwargs):
        """
        Initialise the agent.

        :param num_threads: forwarded to ``BaseAgent.__init__``.
        :param retrieve_bulk_size: value later passed as ``bulk_size`` when
            retrieving messages. Converted with ``int()`` when given;
            ``None`` (the default) is stored unchanged.
        :param kwargs: forwarded to ``BaseAgent.__init__``.
        """
        super(Consumer, self).__init__(num_threads=num_threads, **kwargs)
        self.config_section = Sections.Consumer
        # int(None) raises TypeError, which made the default-argument
        # construction (as done in the __main__ guard) fail outright.
        self.retrieve_bulk_size = None if retrieve_bulk_size is None else int(retrieve_bulk_size)
        self.message_queue = Queue()

    def __del__(self):
        # Best-effort cleanup; stop_receiver() is a no-op when no receiver
        # was started or when it has already been stopped.
        self.stop_receiver()

    def get_messages(self):
        """
        Get messages

        Retrieves messages with status ``MessageStatus.New`` through
        ``core_messages.retrieve_messages``, passing ``self.retrieve_bulk_size``
        as ``bulk_size``.

        :returns: whatever ``core_messages.retrieve_messages`` returned.
        """
        messages = core_messages.retrieve_messages(status=MessageStatus.New, bulk_size=self.retrieve_bulk_size)

        self.logger.debug("Main thread get %s new messages" % len(messages))
        if messages:
            self.logger.info("Main thread get %s new messages" % len(messages))

        return messages

    def clean_messages(self, msgs):
        """
        Mark the given messages as delivered.

        :param msgs: iterable of mappings, each providing a ``'msg_id'`` key.
        """
        # Messages are updated to Delivered instead of being deleted.
        to_updates = [{'msg_id': msg['msg_id'],
                       'status': MessageStatus.Delivered}
                      for msg in msgs]
        core_messages.update_messages(to_updates)

    def start_receiver(self):
        """
        Start the ``receiver`` plugin, feeding it ``self.message_queue``.

        :raises AgentPluginError: when no ``receiver`` plugin is loaded.
        """
        if 'receiver' not in self.plugins:
            raise AgentPluginError('Plugin receiver is required')
        self.receiver = self.plugins['receiver']

        self.logger.info("Starting receiver: %s" % self.receiver)
        self.receiver.set_request_queue(self.message_queue)
        self.receiver.start()

    def stop_receiver(self):
        """
        Stop the receiver plugin if one is currently running.

        Safe to call repeatedly: the reference is cleared once the receiver
        has been asked to stop, so the later calls made from ``stop()`` and
        ``__del__`` do not stop the same receiver a second time.
        """
        receiver = getattr(self, 'receiver', None)
        if not receiver:
            return

        self.logger.info("Stopping receiver: %s" % receiver)
        try:
            receiver.stop()
        finally:
            # Clear even when stop() raises so cleanup is attempted only once.
            self.receiver = None

    def run(self):
        """
        Main run function.

        Loads the plugins, starts the receiver and then idles, waking up
        every 5 seconds, until ``self.graceful_stop`` is set. ``stop()`` is
        called on normal loop exit and on ``KeyboardInterrupt``.
        """
        try:
            self.logger.info("Starting main thread")
            self.load_plugins()

            self.start_receiver()

            while not self.graceful_stop.is_set():
                try:
                    # Placeholder: the main loop currently performs no work
                    # of its own between sleeps.
                    pass
                except IDDSException as error:
                    self.logger.error("Main thread IDDSException: %s" % str(error))
                except Exception as error:
                    self.logger.critical("Main thread exception: %s\n%s" % (str(error), traceback.format_exc()))
                time.sleep(5)
            self.stop()
        except KeyboardInterrupt:
            self.stop()

    def stop(self):
        """
        Stop the agent via ``BaseAgent.stop`` and then stop the receiver.
        """
        super(Consumer, self).stop()
        self.stop_receiver()
0103 
0104 
if __name__ == '__main__':
    # Script entry point: build a Consumer with its default arguments and
    # invoke the instance (the call behaviour comes from the base class,
    # which is defined outside this file).
    consumer = Consumer()
    consumer()