File indexing completed on 2026-04-10 08:38:58
0001 import json
0002
0003 from pandacommon.pandalogger import logger_utils
0004
0005 from pandajedi.jedimsgprocessor.base_msg_processor import BaseMsgProcPlugin
0006 from pandaserver.workflow.workflow_core import WorkflowInterface
0007
# Module-level logger, named after the last dotted component of this module's name
base_logger = logger_utils.setup_logger(__name__.rsplit(".", 1)[-1])
0009
0010
0011
class WorkflowManagerMsgProcPlugin(BaseMsgProcPlugin):
    """
    Message-driven workflow manager

    Each incoming message names a workflow, a workflow step, or a workflow data
    entry by ID. The plugin loads the matching spec through ``self.tbIF`` and
    hands it to ``self.workflow_interface`` for processing.
    """

    def initialize(self):
        """
        Initialize the plugin

        Runs the base-class initialization and then creates the
        ``WorkflowInterface`` (built on ``self.tbIF``) used by ``process``.
        """
        BaseMsgProcPlugin.initialize(self)
        self.workflow_interface = WorkflowInterface(self.tbIF)

    def process(self, msg_obj):
        """
        Process the message
        Typical message data looks like:
            {"msg_type":"workflow", "workflow_id": 123, "timestamp": 987654321}
            {"msg_type":"wfstep", "step_id": 456, "timestamp": 987654321}
            {"msg_type":"wfdata", "data_id": 789, "timestamp": 987654321}

        If the referenced workflow/step/data spec cannot be found, a warning is
        logged and the method returns without processing anything.

        Args:
            msg_obj: message object; its ``data`` attribute must hold the JSON
                payload, and ``sub_id`` / ``msg_id`` are used for logging

        Raises:
            RuntimeError: if ``msg_type`` is not one of "workflow", "wfstep", "wfdata"
            Exception: any error raised while decoding the JSON payload, reading
                ``msg_type``, or looking up / processing the spec is logged and re-raised
        """
        tmp_log = logger_utils.make_logger(base_logger, token=self.get_pid(), method_name="process")

        tmp_log.info("start")
        tmp_log.debug(f"sub_id={msg_obj.sub_id} ; msg_id={msg_obj.msg_id}")

        # decode the JSON payload
        try:
            msg_dict = json.loads(msg_obj.data)
        except Exception as e:
            err_str = f"failed to parse message json {msg_obj.data} , skipped. {e.__class__.__name__} : {e}"
            tmp_log.error(err_str)
            raise

        # extract and validate the message type
        try:
            msg_type = msg_dict["msg_type"]
        except Exception as e:
            err_str = f"failed to parse message object dict {msg_dict} , skipped. {e.__class__.__name__} : {e}"
            tmp_log.error(err_str)
            raise
        if msg_type not in ("workflow", "wfstep", "wfdata"):
            err_str = f"got unknown msg_type {msg_type} , skipped "
            tmp_log.error(err_str)
            # No exception is active here, so a bare ``raise`` would only produce
            # "RuntimeError: No active exception to re-raise". Raise the same
            # exception type explicitly so the caller sees the actual reason.
            raise RuntimeError(err_str)

        # look up the referenced spec and hand it to the workflow interface
        try:
            tmp_log.info(f"got message {msg_dict}")
            if msg_type == "workflow":
                workflow_id = msg_dict["workflow_id"]
                workflow_spec = self.tbIF.get_workflow(workflow_id)
                if workflow_spec is None:
                    tmp_log.warning(f"workflow_id={workflow_id} not found; skipped")
                    return
                # the returned pair is unpacked but not used further here
                stats, workflow_spec = self.workflow_interface.process_workflow(workflow_spec, by="msg")
                tmp_log.info(f"processed workflow_id={workflow_id}")
            elif msg_type == "wfstep":
                step_id = msg_dict["step_id"]
                step_spec = self.tbIF.get_workflow_step(step_id)
                if step_spec is None:
                    tmp_log.warning(f"step_id={step_id} not found; skipped")
                    return
                stats, step_spec = self.workflow_interface.process_step(step_spec, by="msg")
                tmp_log.info(f"processed step_id={step_id}")
            elif msg_type == "wfdata":
                data_id = msg_dict["data_id"]
                data_spec = self.tbIF.get_workflow_data(data_id)
                if data_spec is None:
                    tmp_log.warning(f"data_id={data_id} not found; skipped")
                    return
                stats, data_spec = self.workflow_interface.process_data(data_spec, by="msg")
                tmp_log.info(f"processed data_id={data_id}")
        except Exception as e:
            err_str = f"failed to run, skipped. {e.__class__.__name__} : {e}"
            tmp_log.error(err_str)
            raise

        tmp_log.info("done")