Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:38:58

0001 import json
0002 
0003 from pandacommon.pandalogger import logger_utils
0004 
0005 from pandajedi.jedimsgprocessor.base_msg_processor import BaseMsgProcPlugin
0006 
# module logger, named after the last component of this module's dotted name
base_logger = logger_utils.setup_logger(__name__.rsplit(".", 1)[-1])
0008 
0009 
# message-processing plugin for "processing" messages
# (msg_type file_processing / collection_processing / work_processing)
class ProcessingMsgProcPlugin(BaseMsgProcPlugin):
    """
    Plugin that applies processing-status messages to JEDI input files and datasets.

    Only messages whose msg_type is "file_processing" or "collection_processing" and whose
    relation_type is "input" update anything; other valid messages are logged and ignored.
    """

    def process(self, msg_obj, decoded_data=None):
        """
        Process one message.

        :param msg_obj: incoming message; its sub_id and msg_id are logged and, when
            decoded_data is None, its data attribute is decoded as JSON
        :param decoded_data: already-decoded message dict, used instead of msg_obj.data when given
        :raises Exception: any error while decoding, validating or processing the message is
            logged and re-raised; a DB update returning None raises RuntimeError

        Message keys read here: "workload_id" (int-convertible, used as jeditaskid), "msg_type",
        "files" or "collections" depending on msg_type, and optional "relation_type".
        Each target must have "name", "scope" and "status" and may carry
        "external_coll_id" / "external_content_id".
        """
        tmp_log = logger_utils.make_logger(base_logger, token=self.get_pid(), method_name="process")
        # start
        tmp_log.info("start")
        tmp_log.debug(f"sub_id={msg_obj.sub_id} ; msg_id={msg_obj.msg_id}")
        # parse
        if decoded_data is None:
            # json decode
            try:
                msg_dict = json.loads(msg_obj.data)
            except Exception as e:
                err_str = f"failed to parse message json {msg_obj.data} , skipped. {e.__class__.__name__} : {e}"
                tmp_log.error(err_str)
                raise
        else:
            msg_dict = decoded_data
        # sanity check
        # work_processing carries no target list; keep the name bound for it
        target_list = []
        try:
            jeditaskid = int(msg_dict["workload_id"])
            # message type
            msg_type = msg_dict["msg_type"]
            if msg_type == "file_processing":
                target_list = msg_dict["files"]
            elif msg_type == "collection_processing":
                target_list = msg_dict["collections"]
            elif msg_type == "work_processing":
                pass
            else:
                raise ValueError(f"invalid msg_type value: {msg_type}")
            # relation type
            relation_type = msg_dict.get("relation_type")
        except Exception as e:
            err_str = f"failed to parse message object dict {msg_dict} , skipped. {e.__class__.__name__} : {e}"
            tmp_log.error(err_str)
            raise
        # run
        try:
            # type filters: only input files/datasets are acted upon
            to_proceed = msg_type in ["file_processing", "collection_processing"] and relation_type in ["input"]
            if to_proceed:
                # {scope: {name: (datasetid, fileid)}} for targets in a good status
                scope_name_dict_map = {}
                # {name: (datasetid, fileid)} for files reported as Missing
                missing_files_dict = {}
                # sort targets by status
                for target in target_list:
                    name = target["name"]
                    scope = target["scope"]
                    datasetid = target.get("external_coll_id", None)
                    fileid = target.get("external_content_id", None)
                    if (msg_type == "file_processing" and target["status"] in ["Available"]) or (
                        msg_type == "collection_processing" and target["status"] in ["Closed"]
                    ):
                        scope_name_dict_map.setdefault(scope, {})[name] = (datasetid, fileid)
                    elif msg_type == "file_processing" and target["status"] in ["Missing"]:
                        # missing files
                        missing_files_dict[name] = (datasetid, fileid)
                    else:
                        # target in a status this plugin does not act on
                        tmp_log.debug(f"jeditaskid={jeditaskid}, scope={scope}, msg_type={msg_type}, status={target['status']}, did nothing for bad target")
                # run by each scope
                for scope, name_dict in scope_name_dict_map.items():
                    # about files or datasets in good status
                    if msg_type == "file_processing":
                        tmp_log.debug(f"jeditaskid={jeditaskid}, scope={scope}, update about files...")
                        res = self.tbIF.updateInputFilesStaged_JEDI(jeditaskid, scope, name_dict, by="iDDS")
                        if res is None:
                            # got error and rollback in dbproxy
                            err_str = f"jeditaskid={jeditaskid}, scope={scope}, failed to update files"
                            raise RuntimeError(err_str)
                        tmp_log.info(f"jeditaskid={jeditaskid}, scope={scope}, updated {res} files")
                    elif msg_type == "collection_processing":
                        tmp_log.debug(f"jeditaskid={jeditaskid}, scope={scope}, update about datasets...")
                        res = self.tbIF.updateInputDatasetsStaged_JEDI(jeditaskid, scope, name_dict, by="iDDS")
                        if res is None:
                            # got error and rollback in dbproxy
                            err_str = f"jeditaskid={jeditaskid}, scope={scope}, failed to update datasets"
                            raise RuntimeError(err_str)
                        tmp_log.info(f"jeditaskid={jeditaskid}, scope={scope}, updated {res} datasets")
                    # send message to contents feeder if new files are staged
                    if res > 0 or msg_type == "collection_processing":
                        tmp_s, task_spec = self.tbIF.getTaskWithID_JEDI(jeditaskid)
                        if tmp_s and task_spec.is_msg_driven():
                            push_ret = self.tbIF.push_task_trigger_message("jedi_contents_feeder", jeditaskid, task_spec=task_spec)
                            if push_ret:
                                tmp_log.debug(f"pushed trigger message to jedi_contents_feeder for jeditaskid={jeditaskid}")
                            else:
                                tmp_log.warning(f"failed to push trigger message to jedi_contents_feeder for jeditaskid={jeditaskid}")
                    # check if all ok: compare against the targets submitted for THIS scope,
                    # not the whole message (which also holds other scopes and skipped targets)
                    n_targets = len(name_dict)
                    if res == n_targets:
                        tmp_log.debug(f"jeditaskid={jeditaskid}, scope={scope}, all OK")
                    elif res < n_targets:
                        tmp_log.warning(f"jeditaskid={jeditaskid}, scope={scope}, only {res} out of {n_targets} done...")
                    elif res > n_targets:
                        tmp_log.warning(f"jeditaskid={jeditaskid}, scope={scope}, strangely, {res} out of {n_targets} done...")
                    else:
                        tmp_log.warning(f"jeditaskid={jeditaskid}, scope={scope}, something unwanted happened...")
                # handle missing files
                n_missing = len(missing_files_dict)
                if n_missing > 0:
                    res = self.tbIF.setMissingFilesAboutIdds_JEDI(jeditaskid=jeditaskid, filenames_dict=missing_files_dict)
                    if res is None:
                        # a None result cannot be compared with n_missing (TypeError); report it
                        # explicitly like the other DB calls above. NOTE(review): assumes None
                        # signals a DB failure, as for the update calls — confirm in dbproxy.
                        err_str = f"jeditaskid={jeditaskid}, failed to mark {n_missing} files missing"
                        raise RuntimeError(err_str)
                    if res == n_missing:
                        tmp_log.debug(f"jeditaskid={jeditaskid}, marked all {n_missing} files missing")
                    elif res < n_missing:
                        tmp_log.warning(f"jeditaskid={jeditaskid}, only {res} out of {n_missing} files marked missing...")
                    elif res > n_missing:
                        tmp_log.warning(f"jeditaskid={jeditaskid}, strangely, {res} out of {n_missing} files marked missing...")
                    else:
                        tmp_log.warning(f"jeditaskid={jeditaskid}, res={res}, something unwanted happened about missing files...")
            else:
                # do nothing
                tmp_log.debug(f"jeditaskid={jeditaskid}, msg_type={msg_type}, relation_type={relation_type}, nothing done")
        except Exception as e:
            err_str = f"failed to process the message, skipped. {e.__class__.__name__} : {e}"
            tmp_log.error(err_str)
            raise
        # done
        tmp_log.info("done")