Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:38:58

0001 import json
0002 
0003 from pandacommon.pandalogger import logger_utils
0004 
0005 from pandajedi.jedimsgprocessor.base_msg_processor import BaseMsgProcPlugin
0006 from pandajedi.jedimsgprocessor.hpo_msg_processor import HPOMsgProcPlugin
0007 from pandajedi.jedimsgprocessor.processing_msg_processor import ProcessingMsgProcPlugin
0008 from pandajedi.jedimsgprocessor.tape_carousel_msg_processor import (
0009     TapeCarouselMsgProcPlugin,
0010 )
0011 
0012 base_logger = logger_utils.setup_logger(__name__.split(".")[-1])
0013 
0014 
0015 # ATLAS iDDS message processing plugin: a bridge that connects to the other iDDS-related message processing plugins
class AtlasIddsMsgProcPlugin(BaseMsgProcPlugin):
    """
    Message processor that forwards iDDS messages to specialised sub-plugins.

    One instance each of the tape carousel, HPO and processing plugins is held,
    and every incoming message is handed to the one whose message types include
    the message's ``msg_type``.
    """

    def initialize(self):
        """
        Initialize this plugin, then create and initialize the three sub-plugins.

        Every sub-plugin is initialized with ``in_collective=True`` and is given
        this plugin's ``tbIF`` so that all of them use the same taskBuffer interface.
        """
        BaseMsgProcPlugin.initialize(self)
        self.plugin_TapeCarousel = TapeCarouselMsgProcPlugin()
        self.plugin_HPO = HPOMsgProcPlugin()
        self.plugin_Processing = ProcessingMsgProcPlugin()
        sub_plugins = (self.plugin_TapeCarousel, self.plugin_HPO, self.plugin_Processing)
        for sub_plugin in sub_plugins:
            sub_plugin.initialize(in_collective=True)
            # all sub-plugins go through the same taskBuffer interface
            sub_plugin.tbIF = self.tbIF

    def process(self, msg_obj):
        """
        Decode one message and pass it to the sub-plugin matching its ``msg_type``.

        ``msg_obj.data`` is parsed as JSON and the ``msg_type`` entry of the result
        selects the sub-plugin, which receives the original message object together
        with the already decoded payload as ``decoded_data``. A message whose type
        matches no sub-plugin is logged as a warning and otherwise ignored.

        Any exception raised while decoding the JSON, while reading ``msg_type``,
        or by the selected sub-plugin propagates to the caller; the first two are
        logged as errors beforehand. Nothing is returned.
        """
        tmp_log = logger_utils.make_logger(base_logger, token=self.get_pid(), method_name="process")
        tmp_log.info("start")
        tmp_log.debug(f"sub_id={msg_obj.sub_id} ; msg_id={msg_obj.msg_id}")

        # decode the JSON payload
        try:
            msg_dict = json.loads(msg_obj.data)
        except Exception as exc:
            tmp_log.error(f"failed to parse message json {msg_obj.data} , skipped. {exc.__class__.__name__} : {exc}")
            raise

        # the decoded payload has to offer a dict-like get()
        try:
            msg_type = msg_dict.get("msg_type")
        except Exception as exc:
            tmp_log.error(f"failed to parse message object dict {msg_dict} , skipped. {exc.__class__.__name__} : {exc}")
            raise

        # (accepted message types, debug note, name of the attribute holding the sub-plugin);
        # membership is tested against tuples so that an unhashable msg_type simply matches nothing
        routes = (
            (("file_stagein", "collection_stagein", "work_stagein"), "to tape_carousel", "plugin_TapeCarousel"),
            (("file_hyperparameteropt", "collection_hyperparameteropt", "work_hyperparameteropt"), "to hpo", "plugin_HPO"),
            (("file_processing", "collection_processing", "work_processing"), "to processing", "plugin_Processing"),
        )
        for accepted_types, debug_note, plugin_attr in routes:
            if msg_type in accepted_types:
                tmp_log.debug(debug_note)
                # the sub-plugin is looked up only once its route has matched
                getattr(self, plugin_attr).process(msg_obj, decoded_data=msg_dict)
                break
        else:
            # Asked by iDDS and message broker guys, JEDI needs to consume unknown types of messages and do nothing...
            tmp_log.warning(f"unknown msg_type: {msg_type} msg_dict: {str(msg_dict)}")

        tmp_log.info("done")