File indexing completed on 2026-04-10 08:38:58
0001 import re
0002 import traceback
0003
0004 import yaml
0005 from pandacommon.pandalogger import LogWrapper, logger_utils
0006
0007 from pandajedi.jedimsgprocessor.base_msg_processor import BaseMsgProcPlugin
0008 from pandaserver.dataservice.ddm_handler import DDMHandler
0009
# Module-level logger, named after the last component of this module's dotted path.
base_logger = logger_utils.setup_logger(__name__.rsplit(".", 1)[-1])
0011
0012
0013
class PandaCallbackMsgProcPlugin(BaseMsgProcPlugin):
    """
    Message processor plugin for callback messages.

    Each incoming message is parsed as YAML and dispatched on its ``event_type``:

    * ``datasetlock_ok``  -> :meth:`process_dataset_callback`
    * ``transfer-done``   -> :meth:`process_file_callback` (only when
      ``activities_with_file_callback`` is configured)

    In addition, when ``component_action_map`` is configured, every message is
    checked against that map by :meth:`trigger_component_action`.
    """

    def __init__(self, **params):
        """Create the plugin with empty/default settings; real values are loaded in ``initialize``."""
        super().__init__(**params)
        # activities for which file-level callbacks are processed
        self.activities_with_file_callback = []
        # list of action items, each a dict with "event_type", "component", "to_id" and optional "criteria"
        self.component_action_map = []
        self.site_mapper = None
        self.verbose = False

    def initialize(self, **params):
        """
        Initialize the base plugin and load the plugin settings.

        Reads ``activities_with_file_callback``, ``component_action_map`` and ``verbose``
        from ``self.params`` and obtains the site mapper from ``self.tbIF``.
        NOTE(review): ``self.params`` and ``self.tbIF`` are presumably set up by the
        base class initialization -- confirm against BaseMsgProcPlugin.
        """
        BaseMsgProcPlugin.initialize(self, **params)
        self.activities_with_file_callback = self.params.get("activities_with_file_callback", [])
        self.component_action_map = self.params.get("component_action_map", [])
        self.site_mapper = self.tbIF.get_site_mapper()
        self.verbose = self.params.get("verbose", False)

    def process(self, msg_obj):
        """
        Parse one message and dispatch it according to its event type.

        Args:
            msg_obj: message object; its ``data``, ``sub_id`` and ``msg_id`` attributes are read

        Raises:
            Exception: any error while parsing or handling the message is logged and re-raised
        """
        tmp_log = logger_utils.make_logger(base_logger, token=self.get_pid(), method_name="process")

        # parse the message body
        try:
            message_dict = yaml.safe_load(msg_obj.data)
        except Exception as e:
            err_str = f"failed to parse message yaml {msg_obj.data} , skipped. {e.__class__.__name__} : {e}"
            tmp_log.error(err_str)
            raise

        # run
        try:
            # anything that is not a mapping cannot carry an event type
            if not isinstance(message_dict, dict):
                err_str = f"skip due to invalid message format:{type(message_dict).__name__}. msg:{str(message_dict)}"
                tmp_log.warning(err_str)
                return

            event_type = message_dict["event_type"]
            message_ids = f"sub_id={msg_obj.sub_id} ; msg_id={msg_obj.msg_id}"

            if event_type in ["datasetlock_ok"]:
                self.process_dataset_callback(event_type, message_ids, message_dict, tmp_log)
            elif self.activities_with_file_callback and event_type in ["transfer-done"]:
                self.process_file_callback(event_type, message_ids, message_dict, tmp_log)
            elif self.verbose:
                tmp_log.debug(f"skip event_type={event_type}")

            # component actions are evaluated for every event type, independently of the dispatch above
            if self.component_action_map:
                self.trigger_component_action(event_type, message_ids, message_dict, tmp_log)
        except Exception as e:
            err_str = f"failed to run, skipped. {e.__class__.__name__} : {e}\n{traceback.format_exc()}"
            tmp_log.error(err_str)
            raise

    def process_dataset_callback(self, event_type: str, message_ids: str, message_dict: dict, tmp_log: LogWrapper.LogWrapper) -> None:
        """
        Process a dataset callback

        Only datasets whose name ends with ``_dis<digits>`` or ``_sub<digits>`` are handled;
        for those a DDMHandler is run synchronously in the calling thread.

        Args:
            event_type: message event type
            message_ids: subscription and message IDs
            message_dict: message dictionary
            tmp_log: logger instance
        """
        message_payload = message_dict["payload"]

        # only dispatch (_dis) and sub (_sub) datasets are of interest
        dsn = message_payload["name"]
        if re.search(r"_dis\d+$", dsn) is None and re.search(r"_sub\d+$", dsn) is None:
            return
        tmp_log.debug(message_ids)
        tmp_log.debug(f"{event_type} start")

        scope = message_payload["scope"]
        site = message_payload["rse"]
        tmp_log.debug(f"{dsn} site={site} type={event_type}")

        # run() is invoked directly, i.e. in the current thread
        handler = DDMHandler(task_buffer=self.tbIF, vuid=None, site=site, dataset=dsn, scope=scope)
        handler.run()
        tmp_log.debug(f"done {dsn}")

    def process_file_callback(self, event_type: str, message_ids: str, message_dict: dict, tmp_log: LogWrapper.LogWrapper) -> None:
        """
        Process a file callback

        Messages whose activity is not listed in ``activities_with_file_callback`` are ignored.
        Otherwise the input file is updated at the sites associated with the destination
        endpoint and the jobs returned for it are activated.

        Args:
            event_type: message event type
            message_ids: subscription and message IDs
            message_dict: message dictionary
            tmp_log: logger instance
        """
        message_payload = message_dict["payload"]

        # only configured activities trigger file callbacks
        activity = message_payload["activity"]
        if activity not in self.activities_with_file_callback:
            return

        filename = message_payload["name"]
        endpoint = message_payload["dst-rse"]
        tmp_log.debug(message_ids)
        tmp_log.debug(f"{event_type} start for lfn={filename} activity={activity}")

        # resolve the sites using the endpoint as input storage, then the jobs waiting for the file
        sites = self.site_mapper.get_sites_for_endpoint(endpoint, "input")
        panda_ids = self.tbIF.update_input_files_at_sites_and_get_panda_ids(filename, sites)
        jobs = self.tbIF.peekJobs(panda_ids, fromActive=False, fromArchived=False, fromWaiting=False)

        self.tbIF.activateJobs(jobs)
        tmp_log.debug("done")

    @staticmethod
    def _criteria_matched(criteria: dict, payload: dict) -> bool:
        """
        Check whether a message payload satisfies all criteria of an action item.

        A criterion is satisfied when the payload value equals the expected value, or when
        the expected value is a string that matches (``re.match``) the stringified payload value.
        A missing (or ``None``) payload value never matches.

        Args:
            criteria: mapping of payload key to expected value or regular-expression pattern
            payload: message payload

        Returns:
            True if every criterion is satisfied (including when there are none)
        """
        for key, expected in criteria.items():
            actual = payload.get(key)
            if actual is None:
                return False
            if actual == expected:
                continue
            # only strings can be used as regex patterns; a non-string expected value that is
            # not equal to the payload value is a plain mismatch rather than a TypeError
            if not isinstance(expected, str) or not re.match(expected, str(actual)):
                return False
        return True

    def trigger_component_action(self, event_type: str, message_ids: str, message_dict: dict, tmp_log: LogWrapper.LogWrapper) -> None:
        """
        Trigger component action based on the event type

        For every item in ``component_action_map`` whose event type and criteria match the
        message, the target tasks are resolved, released from hold, and a trigger message
        is pushed to the configured component for each of them.

        Args:
            event_type: message event type
            message_ids: subscription and message IDs
            message_dict: message dictionary
            tmp_log: logger instance
        """
        message_payload = message_dict["payload"]
        for action_item in self.component_action_map:
            # check event type
            if action_item["event_type"] != event_type:
                continue

            # check criteria; a missing or empty "criteria" entry matches everything
            if not self._criteria_matched(action_item.get("criteria") or {}, message_payload):
                continue

            component_name = action_item["component"]
            to_id = action_item["to_id"]

            # resolve the task IDs
            if to_id != "from_input_dataset":
                tmp_log.warning(f"unknown to_id={to_id} for action_item={action_item} ; skipped")
                continue
            # drop the "scope:" prefix from the dataset name if present
            dataset_name = message_payload["name"].split(":")[-1]
            jedi_task_ids = self.tbIF.get_task_ids_with_dataset_attributes({"datasetName": dataset_name, "type": "input"})
            if jedi_task_ids is None:
                tmp_log.warning(f"failed to extract jediTaskID for action_item={action_item} ; skipped")
                continue
            if not jedi_task_ids:
                tmp_log.debug(f"no jediTaskID found for action_item={action_item} ; skipped")
                continue

            for jedi_task_id in jedi_task_ids:
                # release the task
                self.tbIF.release_task_on_hold(jedi_task_id)
                # push the trigger message for this task
                # NOTE(review): the previous code passed the whole jedi_task_ids list here;
                # push_task_trigger_message is assumed to take a single jediTaskID -- confirm
                push_ret = self.tbIF.push_task_trigger_message(component_name, jedi_task_id)
                if push_ret:
                    tmp_log.debug(f"pushed trigger message to {component_name} for jediTaskID={jedi_task_id}")
                else:
                    tmp_log.warning(f"failed to push trigger to {component_name} for jediTaskID={jedi_task_id}")