File indexing completed on 2026-04-10 08:38:58
0001 import json
0002
0003 from pandacommon.kafkapublisher.KafkaPublisher import KafkaPublisher
0004 from pandacommon.pandalogger import logger_utils
0005
0006 from pandajedi.jedimsgprocessor.base_msg_processor import BaseMsgProcPlugin
0007
# Module-level base logger, named after the last dotted component of this
# module's name; process() derives its per-call logger from it.
0008 base_logger = logger_utils.setup_logger(__name__.split(".")[-1])
0009
0010
0011
class KafkaMsgProcPlugin(BaseMsgProcPlugin):
    """
    Message processor plugin that forwards message content to a KafkaPublisher.
    """

    def initialize(self):
        """
        Set up the plugin instance; run once before the loop in the thread.

        Creates the KafkaPublisher that process() publishes through.
        """
        self.publisher = KafkaPublisher()

    def process(self, msg_obj, decoded_data=None):
        """
        Publish the content of one message through the publisher.

        :param msg_obj: message object; its sub_id, msg_id and data attributes are read
        :param decoded_data: already-decoded message content; when None,
                             msg_obj.data is parsed as JSON instead
        :raises Exception: whatever json.loads raised on msg_obj.data, re-raised
                           after the failure has been logged
        """
        logger = logger_utils.make_logger(base_logger, token=self.get_pid(), method_name="process")
        logger.info("start")
        logger.debug(f"sub_id={msg_obj.sub_id} ; msg_id={msg_obj.msg_id}")

        # Prefer the pre-decoded content; fall back to parsing the raw data
        payload = decoded_data
        if payload is None:
            try:
                payload = json.loads(msg_obj.data)
            except Exception as exc:
                # Log the failure, then let the caller deal with the exception
                logger.error(f"failed to parse message json {msg_obj.data} , skipped. {exc.__class__.__name__} : {exc}")
                raise

        # Hand the content over to the publisher
        self.publisher.publish_message(payload)
        logger.debug(f"sent {payload}")
        logger.info("done")

    def terminate(self):
        """
        Close the publisher created in initialize().
        """
        self.publisher.close()