Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:38:58

0001 import json
0002 
0003 from pandacommon.pandalogger import logger_utils
0004 
0005 from pandajedi.jedimsgprocessor.base_msg_processor import BaseMsgProcPlugin
0006 from pandaserver.workflow.workflow_core import WorkflowInterface
0007 
0008 base_logger = logger_utils.setup_logger(__name__.split(".")[-1])  # shared module-level logger; passes the last dotted component of this module's __name__ to setup_logger (presumably used as the logger name)
0009 
0010 
0011 # Workflow manager message processor plugin
class WorkflowManagerMsgProcPlugin(BaseMsgProcPlugin):
    """
    Message-driven workflow manager

    Each incoming message names a workflow, a workflow step, or a workflow
    data record by ID. The plugin looks the record up through ``self.tbIF``
    and passes the resulting spec to ``WorkflowInterface`` for processing.
    """

    def initialize(self):
        """
        Initialize the plugin

        Runs the base-class initialization first, then builds the
        ``WorkflowInterface`` on top of ``self.tbIF``.
        """
        BaseMsgProcPlugin.initialize(self)
        self.workflow_interface = WorkflowInterface(self.tbIF)

    def process(self, msg_obj):
        """
        Process the message
        Typical message data looks like:
            {"msg_type":"workflow", "workflow_id": 123, "timestamp": 987654321}
            {"msg_type":"wfstep", "step_id": 456, "timestamp": 987654321}
            {"msg_type":"wfdata", "data_id": 789, "timestamp": 987654321}

        Args:
            msg_obj: message object; ``data`` holds the JSON payload, and
                ``sub_id`` / ``msg_id`` are only used for debug logging

        Returns:
            None. Returns early (after a warning) when the referenced
            workflow / step / data record is not found.

        Raises:
            Exception: whatever ``json.loads`` raises on a malformed payload,
                or whatever the ``msg_type`` lookup raises when the key is absent
                or the payload is not a mapping (logged, then re-raised).
            RuntimeError: if ``msg_type`` is not one of "workflow", "wfstep", "wfdata".
            Exception: any error raised while looking up or processing the
                record (logged, then re-raised).
        """
        tmp_log = logger_utils.make_logger(base_logger, token=self.get_pid(), method_name="process")
        # start
        tmp_log.info("start")
        tmp_log.debug(f"sub_id={msg_obj.sub_id} ; msg_id={msg_obj.msg_id}")
        # parse json
        try:
            msg_dict = json.loads(msg_obj.data)
        except Exception as e:
            err_str = f"failed to parse message json {msg_obj.data} , skipped. {e.__class__.__name__} : {e}"
            tmp_log.error(err_str)
            raise
        # sanity check
        try:
            msg_type = msg_dict["msg_type"]
        except Exception as e:
            err_str = f"failed to parse message object dict {msg_dict} , skipped. {e.__class__.__name__} : {e}"
            tmp_log.error(err_str)
            raise
        if msg_type not in ("workflow", "wfstep", "wfdata"):
            err_str = f"got unknown msg_type {msg_type} , skipped "
            tmp_log.error(err_str)
            # There is no active exception here, so a bare ``raise`` would fail with
            # "RuntimeError: No active exception to reraise". Raise the same exception
            # type explicitly so the error carries the actual reason.
            raise RuntimeError(err_str)
        # run
        try:
            tmp_log.info(f"got message {msg_dict}")
            if msg_type == "workflow":
                workflow_id = msg_dict["workflow_id"]
                workflow_spec = self.tbIF.get_workflow(workflow_id)
                if workflow_spec is None:
                    tmp_log.warning(f"workflow_id={workflow_id} not found; skipped")
                    return
                stats, workflow_spec = self.workflow_interface.process_workflow(workflow_spec, by="msg")
                tmp_log.info(f"processed workflow_id={workflow_id}")
            elif msg_type == "wfstep":
                step_id = msg_dict["step_id"]
                step_spec = self.tbIF.get_workflow_step(step_id)
                if step_spec is None:
                    tmp_log.warning(f"step_id={step_id} not found; skipped")
                    return
                stats, step_spec = self.workflow_interface.process_step(step_spec, by="msg")
                tmp_log.info(f"processed step_id={step_id}")
            elif msg_type == "wfdata":
                data_id = msg_dict["data_id"]
                data_spec = self.tbIF.get_workflow_data(data_id)
                if data_spec is None:
                    tmp_log.warning(f"data_id={data_id} not found; skipped")
                    return
                stats, data_spec = self.workflow_interface.process_data(data_spec, by="msg")
                tmp_log.info(f"processed data_id={data_id}")
        except Exception as e:
            # log the failure here, then let the caller decide how to handle it
            err_str = f"failed to run, skipped. {e.__class__.__name__} : {e}"
            tmp_log.error(err_str)
            raise
        # done
        tmp_log.info("done")