File indexing completed on 2026-04-10 08:38:58
0001 import json
0002
0003 from pandacommon.pandalogger import logger_utils
0004
0005 from pandajedi.jediddm.DDMInterface import DDMInterface
0006 from pandajedi.jedimsgprocessor.base_msg_processor import BaseMsgProcPlugin
0007 from pandajedi.jediorder.ContentsFeeder import ContentsFeederThread
0008
0009 base_logger = logger_utils.setup_logger(__name__.split(".")[-1])
0010
0011
0012
0013 class JediContentsFeederMsgProcPlugin(BaseMsgProcPlugin):
0014 """
0015 Message-driven Contents Feeder
0016 """
0017
0018 def initialize(self):
0019 BaseMsgProcPlugin.initialize(self)
0020 ddmIF = DDMInterface()
0021 ddmIF.setupInterface()
0022 the_pid = self.get_pid()
0023 self.contents_feeder_thread_obj = ContentsFeederThread(taskDsList=None, threadPool=None, taskbufferIF=self.tbIF, ddmIF=ddmIF, pid=the_pid)
0024
0025 def process(self, msg_obj):
0026 tmp_log = logger_utils.make_logger(base_logger, token=self.get_pid(), method_name="process")
0027
0028 tmp_log.info("start")
0029 tmp_log.debug(f"sub_id={msg_obj.sub_id} ; msg_id={msg_obj.msg_id}")
0030
0031 try:
0032 msg_dict = json.loads(msg_obj.data)
0033 except Exception as e:
0034 err_str = f"failed to parse message json {msg_obj.data} , skipped. {e.__class__.__name__} : {e}"
0035 tmp_log.error(err_str)
0036 raise
0037
0038 try:
0039 msg_type = msg_dict["msg_type"]
0040 except Exception as e:
0041 err_str = f"failed to parse message object dict {msg_dict} , skipped. {e.__class__.__name__} : {e}"
0042 tmp_log.error(err_str)
0043 raise
0044 if msg_type != "jedi_contents_feeder":
0045
0046 err_str = f"got unknown msg_type {msg_type} , skipped "
0047 tmp_log.error(err_str)
0048 raise
0049
0050 try:
0051 task_id = msg_dict["taskid"]
0052 task_ds_list = self.tbIF.getDatasetsToFeedContents_JEDI(vo=None, prodSourceLabel=None, task_id=task_id)
0053 if task_ds_list:
0054 self.contents_feeder_thread_obj.feed_contents_to_tasks(task_ds_list)
0055 tmp_log.info(f"fed datasets to task {task_id}")
0056 else:
0057 tmp_log.debug(f"got empty list of datasets to feed to task {task_id}; skipped ")
0058 except Exception as e:
0059 err_str = f"failed to run, skipped. {e.__class__.__name__} : {e}"
0060 tmp_log.error(err_str)
0061 raise
0062
0063 tmp_log.info("done")