Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:38:58

0001 import copy
0002 import json
0003 import re
0004 import traceback
0005 
0006 from pandacommon.pandalogger import logger_utils
0007 
0008 from pandajedi.jediconfig import jedi_config
0009 from pandajedi.jedimsgprocessor.base_msg_processor import BaseMsgProcPlugin
0010 
# module-level logger, named after the last component of this module's dotted path
base_logger = logger_utils.setup_logger(__name__.rpartition(".")[-1])
0012 
0013 
# task statuses for which the message is returned to iDDS (order preserved)
to_return_task_status_list = (
    "defined ready scouting pending paused running "
    "prepared done finished failed broken aborted"
).split()
0029 
# job statuses for which the message is returned to iDDS (order preserved)
to_return_job_status_list = (
    "pending defined assigned activated starting running "
    "throttled finished failed cancelled closed"
).split()
0043 
0044 
0045 # status report message processing plugin
class StatusReportMsgProcPlugin(BaseMsgProcPlugin):
    """
    Messaging status of jobs and tasks
    Forward the incoming message to other msg_processor plugins (e.g. Kafka) if configured in params
    Return the processed message to send to iDDS via MQ
    """

    def initialize(self):
        """
        Initialize the base plugin and set up the forwarding plugins.

        Reads the "forwarding_plugins" entry of self.params (default: empty list).
        When it contains "kafka", a KafkaMsgProcPlugin is instantiated, initialized
        and appended to self.forwarding_plugins.
        """
        BaseMsgProcPlugin.initialize(self)
        # forwarding plugins: incoming message will be forwarded to process method of these plugins
        self.forwarding_plugins = []
        forwarding_plugin_names = self.params.get("forwarding_plugins", [])
        if "kafka" in forwarding_plugin_names:
            # Kafka; imported lazily so the module is only loaded when the plugin is configured
            from pandajedi.jedimsgprocessor.kafka_msg_processor import (
                KafkaMsgProcPlugin,
            )

            plugin_inst = KafkaMsgProcPlugin()
            plugin_inst.initialize()
            self.forwarding_plugins.append(plugin_inst)

    def _forward_to_plugins(self, msg_obj, msg_dict, db_source_name, tmp_log):
        """
        Forward a decoded message to every configured forwarding plugin.

        Each plugin receives its own deep copy of msg_dict with an extra "db_source"
        key; msg_dict itself is left untouched. A failure in one plugin is logged
        and does not prevent forwarding to the remaining plugins.

        :param msg_obj: the incoming message object, passed through to the plugins
        :param msg_dict: the decoded message
        :param db_source_name: value stored under "db_source" in the forwarded copy
        :param tmp_log: logger used to report forwarding failures
        """
        for plugin_inst in self.forwarding_plugins:
            # pre-bind so the error log below always has something to show,
            # even if the copy itself is what fails
            tmp_msg_dict = msg_dict
            try:
                tmp_msg_dict = copy.deepcopy(msg_dict)
                tmp_msg_dict["db_source"] = db_source_name
                plugin_inst.process(msg_obj, decoded_data=tmp_msg_dict)
            except Exception as exc:
                tmp_log.error(f"failed to process message object dict {tmp_msg_dict}; {exc.__class__.__name__} : {exc} ; {traceback.format_exc()}")

    def process(self, msg_obj):
        """
        Process one status report message.

        The JSON payload in msg_obj.data is decoded and its "msg_type" is inspected.
        "task_status" and "job_status" messages are forwarded to the forwarding
        plugins; other message types only produce a warning.

        :param msg_obj: message object providing the sub_id, msg_id and data attributes
        :return: msg_obj.data (the undecoded payload) when the message is a task/job
                 status message whose "status" is in the corresponding to-return list,
                 otherwise None
        :raises Exception: the original exception is re-raised, after being logged,
                 when the payload is not valid JSON or has no "msg_type"
        """
        tmp_log = logger_utils.make_logger(base_logger, token=self.get_pid(), method_name="process")
        # start
        tmp_log.info("start")
        tmp_log.debug(f"sub_id={msg_obj.sub_id} ; msg_id={msg_obj.msg_id}")
        # parse json
        try:
            msg_dict = json.loads(msg_obj.data)
        except Exception as exc:
            err_str = f"failed to parse message json {msg_obj.data} , skipped. {exc.__class__.__name__} : {exc} ; {traceback.format_exc()}"
            tmp_log.error(err_str)
            raise
        # sanity check
        try:
            msg_type = msg_dict["msg_type"]
        except Exception as exc:
            # the exception must be bound to the name used in the message; otherwise the
            # handler itself fails and hides the real error
            err_str = f"failed to parse message object dict {msg_dict} , skipped. {exc.__class__.__name__} : {exc} ; {traceback.format_exc()}"
            tmp_log.error(err_str)
            raise
        # whether to return the message
        to_return_message = False
        # DB source name from DB schema: strip "_PANDA" and lower-case
        db_source_name = re.sub("_PANDA", "", jedi_config.db.schemaJEDI).lower()
        # process according to msg_type
        if msg_type == "task_status":
            tmp_log.debug("task_status")
            self._forward_to_plugins(msg_obj, msg_dict, db_source_name, tmp_log)
            # only return certain statuses
            to_return_message = msg_dict.get("status") in to_return_task_status_list
        elif msg_type == "job_status":
            tmp_log.debug("job_status")
            self._forward_to_plugins(msg_obj, msg_dict, db_source_name, tmp_log)
            # only return certain statuses
            to_return_message = msg_dict.get("status") in to_return_job_status_list
        else:
            warn_str = f"unknown msg_type : {msg_type}"
            tmp_log.warning(warn_str)
        # done
        tmp_log.debug(f"{msg_dict}; to_return={to_return_message}")
        tmp_log.info("done")
        if to_return_message:
            return msg_obj.data
        return None