Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:38:58

0001 import re
0002 import traceback
0003 
0004 import yaml
0005 from pandacommon.pandalogger import LogWrapper, logger_utils
0006 
0007 from pandajedi.jedimsgprocessor.base_msg_processor import BaseMsgProcPlugin
0008 from pandaserver.dataservice.ddm_handler import DDMHandler
0009 
# module-level logger named after the last component of the dotted module path
base_logger = logger_utils.setup_logger(__name__.rpartition(".")[2])
0011 
0012 
0013 # panda dataset callback message processing plugin
class PandaCallbackMsgProcPlugin(BaseMsgProcPlugin):
    """
    Message processing plugin for PanDA dataset and file callbacks.

    Each incoming message is parsed as YAML and dispatched on its ``event_type``:

    * ``datasetlock_ok``  -> dataset callback (runs a ``DDMHandler`` for ``_dis``/``_sub`` datasets)
    * ``transfer-done``   -> file callback (only when ``activities_with_file_callback`` is configured)

    In addition, when ``component_action_map`` is configured, matching messages
    release the related tasks and push trigger messages to the mapped component.
    """

    def __init__(self, **params):
        super().__init__(**params)
        # activities for which file-level callbacks are processed
        self.activities_with_file_callback = []
        # list of action items mapping event types to components
        self.component_action_map = []
        # site mapper obtained from the task buffer interface in initialize()
        self.site_mapper = None
        # whether to log skipped event types
        self.verbose = False

    def initialize(self, **params):
        """
        Initialize the base plugin and read the plugin configuration from ``self.params``.

        Recognized parameters:
            activities_with_file_callback: list of activities to use file callback (default ``[]``)
            component_action_map: list of action items of the form
                ``{"event_type": "<event_type>", "component": "<component_name>",
                "criteria": {"key": <value>, ...}, "to_id": "<how_to_get_task_id>"}`` (default ``[]``)
            verbose: enable logging of skipped event types (default ``False``)
        """
        super().initialize(**params)
        # activity list to use file callback
        self.activities_with_file_callback = self.params.get("activities_with_file_callback", [])
        # component action map
        self.component_action_map = self.params.get("component_action_map", [])
        # site mapper
        self.site_mapper = self.tbIF.get_site_mapper()
        # verbose logging
        self.verbose = self.params.get("verbose", False)

    def process(self, msg_obj):
        """
        Process one message.

        Args:
            msg_obj: message object; ``data`` (YAML text), ``sub_id`` and ``msg_id`` are read from it

        Raises:
            Exception: any error raised while parsing the YAML or while handling the message
                is logged and then re-raised to the caller
        """
        tmp_log = logger_utils.make_logger(base_logger, token=self.get_pid(), method_name="process")
        # parse yaml
        try:
            message_dict = yaml.safe_load(msg_obj.data)
        except Exception as e:
            err_str = f"failed to parse message yaml {msg_obj.data} , skipped. {e.__class__.__name__} : {e}"
            tmp_log.error(err_str)
            raise
        # run
        try:
            # anything other than a mapping cannot carry an event type ; skip it
            if not isinstance(message_dict, dict):
                err_str = f"skip due to invalid message format:{type(message_dict).__name__}. msg:{str(message_dict)}"
                tmp_log.warning(err_str)
                return
            event_type = message_dict["event_type"]
            message_ids = f"sub_id={msg_obj.sub_id} ; msg_id={msg_obj.msg_id}"
            # dispatch on event type
            if event_type in ["datasetlock_ok"]:
                self.process_dataset_callback(event_type, message_ids, message_dict, tmp_log)
            elif self.activities_with_file_callback and event_type in ["transfer-done"]:
                self.process_file_callback(event_type, message_ids, message_dict, tmp_log)
            elif self.verbose:
                tmp_log.debug(f"skip event_type={event_type}")
            # trigger component actions ; done for every event type, independently of the callbacks above
            if self.component_action_map:
                self.trigger_component_action(event_type, message_ids, message_dict, tmp_log)
        except Exception as e:
            err_str = f"failed to run, skipped. {e.__class__.__name__} : {e}\n{traceback.format_exc()}"
            tmp_log.error(err_str)
            raise

    def process_dataset_callback(self, event_type: str, message_ids: str, message_dict: dict, tmp_log: LogWrapper.LogWrapper) -> None:
        """
        Process a dataset callback

        Only datasets whose name ends with ``_dis<digits>`` or ``_sub<digits>`` are handled;
        other datasets are ignored silently.

        Args:
            event_type: message event type
            message_ids: subscription and message IDs
            message_dict: message dictionary; ``payload`` must contain ``name``, ``scope`` and ``rse``
            tmp_log: logger instance
        """
        message_payload = message_dict["payload"]
        # only for _dis or _sub
        dsn = message_payload["name"]
        if re.search(r"_(?:dis|sub)\d+$", dsn) is None:
            return
        tmp_log.debug(message_ids)
        tmp_log.debug(f"{event_type} start")
        # take action
        scope = message_payload["scope"]
        site = message_payload["rse"]
        tmp_log.debug(f"{dsn} site={site} type={event_type}")
        handler = DDMHandler(task_buffer=self.tbIF, vuid=None, site=site, dataset=dsn, scope=scope)
        # just call run rather than start+join, to run it in main thread instead of spawning new thread
        handler.run()
        tmp_log.debug(f"done {dsn}")

    def process_file_callback(self, event_type: str, message_ids: str, message_dict: dict, tmp_log: LogWrapper.LogWrapper) -> None:
        """
        Process a file callback

        Messages whose activity is not listed in ``activities_with_file_callback`` are ignored.

        Args:
            event_type: message event type
            message_ids: subscription and message IDs
            message_dict: message dictionary; ``payload`` must contain ``activity``, ``name`` and ``dst-rse``
            tmp_log: logger instance
        """
        message_payload = message_dict["payload"]
        # only for activities with file callback
        activity = message_payload["activity"]
        if activity not in self.activities_with_file_callback:
            return
        # update file status and get corresponding PandaIDs
        filename = message_payload["name"]
        endpoint = message_payload["dst-rse"]
        tmp_log.debug(message_ids)
        tmp_log.debug(f"{event_type} start for lfn={filename} activity={activity}")
        sites = self.site_mapper.get_sites_for_endpoint(endpoint, "input")
        panda_ids = self.tbIF.update_input_files_at_sites_and_get_panda_ids(filename, sites)
        jobs = self.tbIF.peekJobs(panda_ids, fromActive=False, fromArchived=False, fromWaiting=False)
        # activate jobs
        self.tbIF.activateJobs(jobs)
        tmp_log.debug("done")

    @staticmethod
    def _criteria_matched(criteria: dict, message_payload: dict) -> bool:
        """
        Check whether a message payload satisfies all criteria of an action item

        A criterion is satisfied when the payload value equals the expected value, or when
        the expected value is a string that matches, as a regular expression anchored at the
        start, the string form of the payload value. A key that is missing from the payload
        (or whose value is None) never matches.

        Args:
            criteria: mapping of payload key to expected value or regular expression
            message_payload: payload of the message

        Returns:
            True if every criterion is satisfied (including when there are none), False otherwise
        """
        for key, expected in criteria.items():
            actual = message_payload.get(key)
            if actual is None:
                return False
            if actual == expected:
                continue
            # only strings can be used as regular expressions ; a non-equal non-string value is a mismatch
            if not (isinstance(expected, str) and re.match(expected, str(actual))):
                return False
        return True

    def trigger_component_action(self, event_type: str, message_ids: str, message_dict: dict, tmp_log: LogWrapper.LogWrapper) -> None:
        """
        Trigger component action based on the event type

        For every action item whose event type and criteria match the message, the task IDs
        are resolved according to ``to_id``; each task is released from hold and a trigger
        message for that task is pushed to the configured component.

        Args:
            event_type: message event type
            message_ids: subscription and message IDs
            message_dict: message dictionary; ``payload`` is read from it
            tmp_log: logger instance
        """
        message_payload = message_dict["payload"]
        for action_item in self.component_action_map:
            # check event type
            if action_item["event_type"] != event_type:
                continue
            # check criteria ; a missing or null "criteria" entry means no constraint
            if not self._criteria_matched(action_item.get("criteria") or {}, message_payload):
                continue
            component_name = action_item["component"]
            to_id = action_item["to_id"]
            # extract task IDs based on to_id
            if to_id == "from_input_dataset":
                # drop the "<scope>:" prefix if the name carries one
                dataset_name = message_payload["name"].split(":")[-1]
                jedi_task_ids = self.tbIF.get_task_ids_with_dataset_attributes({"datasetName": dataset_name, "type": "input"})
            else:
                tmp_log.warning(f"unknown to_id={to_id} for action_item={action_item} ; skipped")
                continue
            if jedi_task_ids is None:
                tmp_log.warning(f"failed to extract jediTaskID for action_item={action_item} ; skipped")
                continue
            if not jedi_task_ids:
                tmp_log.debug(f"no jediTaskID found for action_item={action_item} ; skipped")
                continue
            # loop over task IDs
            for jedi_task_id in jedi_task_ids:
                # release task just in case
                self.tbIF.release_task_on_hold(jedi_task_id)
                # push trigger message for this task only (not for the whole collection)
                push_ret = self.tbIF.push_task_trigger_message(component_name, jedi_task_id)
                if push_ret:
                    tmp_log.debug(f"pushed trigger message to {component_name} for jediTaskID={jedi_task_id}")
                else:
                    tmp_log.warning(f"failed to push trigger to {component_name} for jediTaskID={jedi_task_id}")