Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:38:58

0001 import json
0002 
0003 from pandacommon.pandalogger import logger_utils
0004 
0005 from pandajedi.jediconfig import jedi_config
0006 from pandajedi.jedicore.FactoryBase import FactoryBase
0007 from pandajedi.jediddm.DDMInterface import DDMInterface
0008 from pandajedi.jedimsgprocessor.base_msg_processor import BaseMsgProcPlugin
0009 from pandajedi.jediorder.PostProcessor import PostProcessorThread
0010 from pandaserver.srvcore.CoreUtils import convert_config_params, parse_init_params
0011 
0012 base_logger = logger_utils.setup_logger(__name__.split(".")[-1])
0013 
0014 
0015 # Jedi Post-Processor message processor plugin
0016 class JediPostProcessorMsgProcPlugin(BaseMsgProcPlugin):
0017     """
0018     Message-driven Post-Processor
0019     """
0020 
0021     def initialize(self):
0022         BaseMsgProcPlugin.initialize(self)
0023         # DDM interface
0024         ddmIF = DDMInterface()
0025         ddmIF.setupInterface()
0026         # factory bases and post processor thread object for all vos and prodsourcelabels
0027         self.post_processor_thread_dict = dict()
0028         for itemStr in jedi_config.postprocessor.procConfig.split(";"):
0029             items = convert_config_params(itemStr)
0030             vos = parse_init_params(items[0])
0031             prodsourcelabels = parse_init_params(items[1])
0032             tmp_factory_base_obj = FactoryBase(vos=vos, sourceLabels=prodsourcelabels, logger=base_logger, modConfig=jedi_config.postprocessor.modConfig)
0033             tmp_post_processor_thread_obj = PostProcessorThread(
0034                 taskList=None, threadPool=None, taskbufferIF=self.tbIF, ddmIF=ddmIF, implFactory=tmp_factory_base_obj
0035             )
0036             for vo in vos:
0037                 for prodsourcelabel in prodsourcelabels:
0038                     self.post_processor_thread_dict[(vo, prodsourcelabel)] = tmp_post_processor_thread_obj
0039 
0040     def process(self, msg_obj):
0041         tmp_log = logger_utils.make_logger(base_logger, token=self.get_pid(), method_name="process")
0042         # start
0043         tmp_log.info("start")
0044         tmp_log.debug(f"sub_id={msg_obj.sub_id} ; msg_id={msg_obj.msg_id}")
0045         # parse json
0046         try:
0047             msg_dict = json.loads(msg_obj.data)
0048         except Exception as e:
0049             err_str = f"failed to parse message json {msg_obj.data} , skipped. {e.__class__.__name__} : {e}"
0050             tmp_log.error(err_str)
0051             raise
0052         # sanity check
0053         try:
0054             msg_type = msg_dict["msg_type"]
0055         except Exception as e:
0056             err_str = f"failed to parse message object dict {msg_dict} , skipped. {e.__class__.__name__} : {e}"
0057             tmp_log.error(err_str)
0058             raise
0059         if msg_type != "jedi_post_processor":
0060             # FIXME
0061             err_str = f"got unknown msg_type {msg_type} , skipped "
0062             tmp_log.error(err_str)
0063             raise
0064         # run
0065         try:
0066             task_id = msg_dict["taskid"]
0067             vo = msg_dict["task_vo"]
0068             prodsourcelabel = msg_dict["task_prodsourcelabel"]
0069             ret_list = self.tbIF.prepareTasksToBeFinished_JEDI(vo, prodSourceLabel, jedi_config.postprocessor.nTasks, self.get_pid())
0070             task_list = self.tbIF.getTasksToBeFinished_JEDI(vo, prodSourceLabel, self.get_pid(), jedi_config.postprocessor.nTasks, target_tasks=ret_list)
0071             if task_list and task_id in [task_spec.jediTaskID for task_spec in task_list]:
0072                 tmp_post_processor_thread_obj = self.post_processor_thread_dict[(vo, prodsourcelabel)]
0073                 tmp_post_processor_thread_obj.post_process_tasks(task_list)
0074                 tmp_log.info(f"post processed tasks {task_list} including {task_id}")
0075             else:
0076                 tmp_log.debug(f"did not get task {task_id}; skip ")
0077         except Exception as e:
0078             err_str = f"failed to run, skipped. {e.__class__.__name__} : {e}"
0079             tmp_log.error(err_str)
0080             raise
0081         # done
0082         tmp_log.info("done")