File indexing completed on 2026-04-10 08:38:58
0001 import copy
0002 import json
0003 import re
0004 import traceback
0005
0006 from pandacommon.pandalogger import logger_utils
0007
0008 from pandajedi.jediconfig import jedi_config
0009 from pandajedi.jedimsgprocessor.base_msg_processor import BaseMsgProcPlugin
0010
# Module-level logger, named after the last component of this module's dotted name.
base_logger = logger_utils.setup_logger(__name__.rpartition(".")[2])
0012
0013
0014
# Task statuses for which StatusReportMsgProcPlugin.process() hands the raw
# message back to its caller; a task_status message carrying any other status
# is still forwarded to the configured plugins but is not returned.
to_return_task_status_list = [
    "defined", "ready", "scouting", "pending",
    "paused", "running", "prepared", "done",
    "finished", "failed", "broken", "aborted",
]
0029
# Job statuses for which StatusReportMsgProcPlugin.process() hands the raw
# message back to its caller; a job_status message carrying any other status
# is still forwarded to the configured plugins but is not returned.
to_return_job_status_list = [
    "pending", "defined", "assigned", "activated",
    "starting", "running", "throttled", "finished",
    "failed", "cancelled", "closed",
]
0043
0044
0045
class StatusReportMsgProcPlugin(BaseMsgProcPlugin):
    """
    Messaging status of jobs and tasks
    Forward the incoming message to other msg_processors plugin (e.g. Kafka) if configured in params
    Return the processed message to send to iDDS via MQ
    """

    def initialize(self):
        """
        Run the base-class initialization and instantiate the forwarding plugins.

        The plugin names are read from ``self.params["forwarding_plugins"]``
        (defaults to an empty list). Only the name ``"kafka"`` is recognized.
        """
        BaseMsgProcPlugin.initialize(self)
        # forwarding plugins
        self.forwarding_plugins = []
        forwarding_plugin_names = self.params.get("forwarding_plugins", [])
        if "kafka" in forwarding_plugin_names:
            # The Kafka plugin module is imported only when it is requested.
            from pandajedi.jedimsgprocessor.kafka_msg_processor import (
                KafkaMsgProcPlugin,
            )

            plugin_inst = KafkaMsgProcPlugin()
            plugin_inst.initialize()
            self.forwarding_plugins.append(plugin_inst)

    def _forward_to_plugins(self, msg_obj, msg_dict, db_source_name, tmp_log):
        """
        Hand a private copy of the decoded message to every forwarding plugin.

        Each plugin gets its own deep copy of ``msg_dict`` with an extra
        ``"db_source"`` key. A failure in one plugin is logged and does not
        stop the remaining plugins or the caller.
        """
        for plugin_inst in self.forwarding_plugins:
            # Bound before the try so the error log below can always format it,
            # even if the copy itself is what failed.
            tmp_msg_dict = None
            try:
                tmp_msg_dict = copy.deepcopy(msg_dict)
                tmp_msg_dict["db_source"] = db_source_name
                plugin_inst.process(msg_obj, decoded_data=tmp_msg_dict)
            except Exception as exc:
                tmp_log.error(f"failed to process message object dict {tmp_msg_dict}; {exc.__class__.__name__} : {exc} ; {traceback.format_exc()}")

    def process(self, msg_obj):
        """
        Process one status message: forward it to the plugins and decide whether to return it.

        ``msg_obj.data`` is parsed as JSON and must contain a ``"msg_type"`` key.
        Messages of type ``"task_status"`` or ``"job_status"`` are forwarded to
        every configured forwarding plugin; any other type only produces a warning.

        Returns:
            ``msg_obj.data`` (the raw, unparsed payload) when the message status
            is listed in ``to_return_task_status_list`` / ``to_return_job_status_list``
            for its type; ``None`` otherwise.

        Raises:
            Exception: whatever JSON parsing or the ``"msg_type"`` lookup raised,
                re-raised after being logged.
        """
        # logger
        tmp_log = logger_utils.make_logger(base_logger, token=self.get_pid(), method_name="process")
        # start
        tmp_log.info("start")
        tmp_log.debug(f"sub_id={msg_obj.sub_id} ; msg_id={msg_obj.msg_id}")

        # parse json
        try:
            msg_dict = json.loads(msg_obj.data)
        except Exception as exc:
            err_str = f"failed to parse message json {msg_obj.data} , skipped. {exc.__class__.__name__} : {exc} ; {traceback.format_exc()}"
            tmp_log.error(err_str)
            raise

        # sanity check
        try:
            msg_type = msg_dict["msg_type"]
        except Exception as exc:
            # The exception must be bound as ``exc`` because the message below
            # formats it; binding it under another name made this handler fail
            # with UnboundLocalError and hid the real error.
            err_str = f"failed to parse message object dict {msg_dict} , skipped. {exc.__class__.__name__} : {exc} ; {traceback.format_exc()}"
            tmp_log.error(err_str)
            raise

        # DB source name derived from the JEDI schema name: "_PANDA" removed, lower-cased
        db_source_name = re.sub("_PANDA", "", jedi_config.db.schemaJEDI).lower()

        # pick the status list matching the message type
        if msg_type == "task_status":
            to_return_status_list = to_return_task_status_list
        elif msg_type == "job_status":
            to_return_status_list = to_return_job_status_list
        else:
            to_return_status_list = None

        # whether to return the message
        to_return_message = False
        if to_return_status_list is None:
            warn_str = f"unknown msg_type : {msg_type}"
            tmp_log.warning(warn_str)
        else:
            tmp_log.debug(msg_type)
            # forwarding
            self._forward_to_plugins(msg_obj, msg_dict, db_source_name, tmp_log)
            # only return certain statuses
            if msg_dict.get("status") in to_return_status_list:
                to_return_message = True

        # done
        tmp_log.debug(f"{msg_dict}; to_return={to_return_message}")
        tmp_log.info("done")
        if to_return_message:
            return msg_obj.data
        return None