Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:38:58

0001 import json
0002 
0003 from pandacommon.pandalogger import logger_utils
0004 
0005 from pandajedi.jediddm.DDMInterface import DDMInterface
0006 from pandajedi.jedimsgprocessor.base_msg_processor import BaseMsgProcPlugin
0007 from pandajedi.jediorder.ContentsFeeder import ContentsFeederThread
0008 
0009 base_logger = logger_utils.setup_logger(__name__.split(".")[-1])
0010 
0011 
0012 # Jedi Contents Feeder message processor plugin
class JediContentsFeederMsgProcPlugin(BaseMsgProcPlugin):
    """
    Message-driven Contents Feeder

    Handles messages whose ``msg_type`` is ``jedi_contents_feeder``: for the task
    named in the message, it asks the task buffer interface for the datasets to
    feed and hands them to a ContentsFeederThread object.
    """

    def initialize(self):
        """
        Set up the plugin.

        Runs the base-class initialization, sets up a DDM interface and creates the
        ContentsFeederThread object that process() uses to feed contents.
        """
        BaseMsgProcPlugin.initialize(self)
        ddmIF = DDMInterface()
        ddmIF.setupInterface()
        the_pid = self.get_pid()
        # No dataset list or thread pool is passed here; process() supplies the
        # datasets per message via feed_contents_to_tasks().
        # NOTE(review): self.tbIF is presumably provided by BaseMsgProcPlugin.initialize() - confirm.
        self.contents_feeder_thread_obj = ContentsFeederThread(taskDsList=None, threadPool=None, taskbufferIF=self.tbIF, ddmIF=ddmIF, pid=the_pid)

    def process(self, msg_obj):
        """
        Process one message and feed contents for the task it refers to.

        :param msg_obj: message object; ``msg_obj.data`` is parsed as JSON and must contain
                        the keys ``msg_type`` and ``taskid``; ``sub_id`` and ``msg_id`` are only logged
        :raises RuntimeError: if ``msg_type`` is not ``jedi_contents_feeder``
        :raises Exception: any error raised while parsing the JSON, reading ``msg_type`` or
                           feeding the datasets is logged and re-raised unchanged
        """
        tmp_log = logger_utils.make_logger(base_logger, token=self.get_pid(), method_name="process")
        # start
        tmp_log.info("start")
        tmp_log.debug(f"sub_id={msg_obj.sub_id} ; msg_id={msg_obj.msg_id}")
        # parse json
        try:
            msg_dict = json.loads(msg_obj.data)
        except Exception as e:
            err_str = f"failed to parse message json {msg_obj.data} , skipped. {e.__class__.__name__} : {e}"
            tmp_log.error(err_str)
            raise
        # sanity check
        try:
            msg_type = msg_dict["msg_type"]
        except Exception as e:
            err_str = f"failed to parse message object dict {msg_dict} , skipped. {e.__class__.__name__} : {e}"
            tmp_log.error(err_str)
            raise
        if msg_type != "jedi_contents_feeder":
            err_str = f"got unknown msg_type {msg_type} , skipped "
            tmp_log.error(err_str)
            # There is no active exception in this branch, so a bare "raise" would only
            # produce "RuntimeError: No active exception to reraise". Raise the same
            # exception type explicitly so that the caller gets the actual reason.
            raise RuntimeError(err_str)
        # run
        try:
            task_id = msg_dict["taskid"]
            task_ds_list = self.tbIF.getDatasetsToFeedContents_JEDI(vo=None, prodSourceLabel=None, task_id=task_id)
            if task_ds_list:
                self.contents_feeder_thread_obj.feed_contents_to_tasks(task_ds_list)
                tmp_log.info(f"fed datasets to task {task_id}")
            else:
                # nothing to feed for this task
                tmp_log.debug(f"got empty list of datasets to feed to task {task_id}; skipped ")
        except Exception as e:
            err_str = f"failed to run, skipped. {e.__class__.__name__} : {e}"
            tmp_log.error(err_str)
            raise
        # done
        tmp_log.info("done")