File indexing completed on 2026-04-10 08:38:58
0001 import json
0002
0003 from pandacommon.pandalogger import logger_utils
0004
0005 from pandajedi.jediconfig import jedi_config
0006 from pandajedi.jedicore.FactoryBase import FactoryBase
0007 from pandajedi.jediddm.DDMInterface import DDMInterface
0008 from pandajedi.jedimsgprocessor.base_msg_processor import BaseMsgProcPlugin
0009 from pandajedi.jediorder.PostProcessor import PostProcessorThread
0010 from pandaserver.srvcore.CoreUtils import convert_config_params, parse_init_params
0011
0012 base_logger = logger_utils.setup_logger(__name__.split(".")[-1])
0013
0014
0015
0016 class JediPostProcessorMsgProcPlugin(BaseMsgProcPlugin):
0017 """
0018 Message-driven Post-Processor
0019 """
0020
0021 def initialize(self):
0022 BaseMsgProcPlugin.initialize(self)
0023
0024 ddmIF = DDMInterface()
0025 ddmIF.setupInterface()
0026
0027 self.post_processor_thread_dict = dict()
0028 for itemStr in jedi_config.postprocessor.procConfig.split(";"):
0029 items = convert_config_params(itemStr)
0030 vos = parse_init_params(items[0])
0031 prodsourcelabels = parse_init_params(items[1])
0032 tmp_factory_base_obj = FactoryBase(vos=vos, sourceLabels=prodsourcelabels, logger=base_logger, modConfig=jedi_config.postprocessor.modConfig)
0033 tmp_post_processor_thread_obj = PostProcessorThread(
0034 taskList=None, threadPool=None, taskbufferIF=self.tbIF, ddmIF=ddmIF, implFactory=tmp_factory_base_obj
0035 )
0036 for vo in vos:
0037 for prodsourcelabel in prodsourcelabels:
0038 self.post_processor_thread_dict[(vo, prodsourcelabel)] = tmp_post_processor_thread_obj
0039
0040 def process(self, msg_obj):
0041 tmp_log = logger_utils.make_logger(base_logger, token=self.get_pid(), method_name="process")
0042
0043 tmp_log.info("start")
0044 tmp_log.debug(f"sub_id={msg_obj.sub_id} ; msg_id={msg_obj.msg_id}")
0045
0046 try:
0047 msg_dict = json.loads(msg_obj.data)
0048 except Exception as e:
0049 err_str = f"failed to parse message json {msg_obj.data} , skipped. {e.__class__.__name__} : {e}"
0050 tmp_log.error(err_str)
0051 raise
0052
0053 try:
0054 msg_type = msg_dict["msg_type"]
0055 except Exception as e:
0056 err_str = f"failed to parse message object dict {msg_dict} , skipped. {e.__class__.__name__} : {e}"
0057 tmp_log.error(err_str)
0058 raise
0059 if msg_type != "jedi_post_processor":
0060
0061 err_str = f"got unknown msg_type {msg_type} , skipped "
0062 tmp_log.error(err_str)
0063 raise
0064
0065 try:
0066 task_id = msg_dict["taskid"]
0067 vo = msg_dict["task_vo"]
0068 prodsourcelabel = msg_dict["task_prodsourcelabel"]
0069 ret_list = self.tbIF.prepareTasksToBeFinished_JEDI(vo, prodSourceLabel, jedi_config.postprocessor.nTasks, self.get_pid())
0070 task_list = self.tbIF.getTasksToBeFinished_JEDI(vo, prodSourceLabel, self.get_pid(), jedi_config.postprocessor.nTasks, target_tasks=ret_list)
0071 if task_list and task_id in [task_spec.jediTaskID for task_spec in task_list]:
0072 tmp_post_processor_thread_obj = self.post_processor_thread_dict[(vo, prodsourcelabel)]
0073 tmp_post_processor_thread_obj.post_process_tasks(task_list)
0074 tmp_log.info(f"post processed tasks {task_list} including {task_id}")
0075 else:
0076 tmp_log.debug(f"did not get task {task_id}; skip ")
0077 except Exception as e:
0078 err_str = f"failed to run, skipped. {e.__class__.__name__} : {e}"
0079 tmp_log.error(err_str)
0080 raise
0081
0082 tmp_log.info("done")