Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:38:58

0001 import json
0002 
0003 from pandacommon.kafkapublisher.KafkaPublisher import KafkaPublisher
0004 from pandacommon.pandalogger import logger_utils
0005 
0006 from pandajedi.jedimsgprocessor.base_msg_processor import BaseMsgProcPlugin
0007 
# Module-level logger, named after the last dotted component of this module's name
base_logger = logger_utils.setup_logger(__name__.rpartition(".")[2])
0009 
0010 
# Kafka message processing plugin
class KafkaMsgProcPlugin(BaseMsgProcPlugin):
    """
    Message processing plugin that hands each incoming message to a KafkaPublisher
    """

    def initialize(self):
        """
        Set up the plugin instance by creating its publisher; run once before loop in thread
        """
        self.publisher = KafkaPublisher()

    def process(self, msg_obj, decoded_data=None):
        """
        Publish the content of one message through the publisher

        :param msg_obj: message object; sub_id and msg_id are logged, and data is
                        JSON-decoded when decoded_data is None
        :param decoded_data: already-decoded message content; published as-is when not None
        :raises Exception: whatever json.loads raised, re-raised after being logged
        """
        log = logger_utils.make_logger(base_logger, token=self.get_pid(), method_name="process")

        log.info("start")
        log.debug(f"sub_id={msg_obj.sub_id} ; msg_id={msg_obj.msg_id}")

        if decoded_data is not None:
            # Caller already decoded the message; use it directly
            payload = decoded_data
        else:
            # No decoded content supplied; decode the raw message body as JSON
            try:
                payload = json.loads(msg_obj.data)
            except Exception as exc:
                log.error(f"failed to parse message json {msg_obj.data} , skipped. {exc.__class__.__name__} : {exc}")
                raise

        # Hand the content over to the publisher
        self.publisher.publish_message(payload)
        log.debug(f"sent {payload}")
        log.info("done")

    def terminate(self):
        """
        Close the publisher created in initialize
        """
        self.publisher.close()