File indexing completed on 2026-04-10 08:38:58
0001 import json
0002
0003 from pandacommon.pandalogger import logger_utils
0004
0005 from pandajedi.jedimsgprocessor.base_msg_processor import BaseMsgProcPlugin
0006 from pandajedi.jedimsgprocessor.hpo_msg_processor import HPOMsgProcPlugin
0007 from pandajedi.jedimsgprocessor.processing_msg_processor import ProcessingMsgProcPlugin
0008 from pandajedi.jedimsgprocessor.tape_carousel_msg_processor import (
0009 TapeCarouselMsgProcPlugin,
0010 )
0011
0012 base_logger = logger_utils.setup_logger(__name__.split(".")[-1])
0013
0014
0015
0016 class AtlasIddsMsgProcPlugin(BaseMsgProcPlugin):
0017 def initialize(self):
0018 BaseMsgProcPlugin.initialize(self)
0019 self.plugin_TapeCarousel = TapeCarouselMsgProcPlugin()
0020 self.plugin_HPO = HPOMsgProcPlugin()
0021 self.plugin_Processing = ProcessingMsgProcPlugin()
0022
0023 for _plugin in [self.plugin_TapeCarousel, self.plugin_HPO, self.plugin_Processing]:
0024
0025 _plugin.initialize(in_collective=True)
0026
0027 _plugin.tbIF = self.tbIF
0028
0029 def process(self, msg_obj):
0030 tmp_log = logger_utils.make_logger(base_logger, token=self.get_pid(), method_name="process")
0031
0032 tmp_log.info("start")
0033 tmp_log.debug(f"sub_id={msg_obj.sub_id} ; msg_id={msg_obj.msg_id}")
0034
0035 try:
0036 msg_dict = json.loads(msg_obj.data)
0037 except Exception as e:
0038 err_str = f"failed to parse message json {msg_obj.data} , skipped. {e.__class__.__name__} : {e}"
0039 tmp_log.error(err_str)
0040 raise
0041
0042 try:
0043 msg_type = msg_dict.get("msg_type")
0044 except Exception as e:
0045 err_str = f"failed to parse message object dict {msg_dict} , skipped. {e.__class__.__name__} : {e}"
0046 tmp_log.error(err_str)
0047 raise
0048
0049 try:
0050 if msg_type in ("file_stagein", "collection_stagein", "work_stagein"):
0051 tmp_log.debug("to tape_carousel")
0052 self.plugin_TapeCarousel.process(msg_obj, decoded_data=msg_dict)
0053 elif msg_type in ("file_hyperparameteropt", "collection_hyperparameteropt", "work_hyperparameteropt"):
0054 tmp_log.debug("to hpo")
0055 self.plugin_HPO.process(msg_obj, decoded_data=msg_dict)
0056 elif msg_type in ("file_processing", "collection_processing", "work_processing"):
0057 tmp_log.debug("to processing")
0058 self.plugin_Processing.process(msg_obj, decoded_data=msg_dict)
0059 else:
0060
0061 warn_str = f"unknown msg_type: {msg_type} msg_dict: {str(msg_dict)}"
0062 tmp_log.warning(warn_str)
0063 except Exception:
0064 raise
0065
0066 tmp_log.info("done")