File indexing completed on 2026-04-10 08:38:58
0001 import json
0002
0003 from pandacommon.pandalogger import logger_utils
0004
0005 from pandajedi.jedimsgprocessor.base_msg_processor import BaseMsgProcPlugin
0006
# module-level logger, named after the last dotted component of this module's name
base_logger = logger_utils.setup_logger(__name__.rpartition(".")[-1])
0008
0009
0010
class ProcessingMsgProcPlugin(BaseMsgProcPlugin):
    """
    Message processor plugin handling "processing" messages.

    A message describes files or collections (datasets) of a JEDI task; for
    targets related as "input" the plugin records their staged / missing
    state through the task buffer interface (``self.tbIF``), tagging the
    updates with ``by="iDDS"``.
    """

    def process(self, msg_obj, decoded_data=None):
        """
        Process one message.

        The message dict must contain ``workload_id`` (converted to the
        jeditaskid) and ``msg_type``, which is one of ``file_processing``,
        ``collection_processing`` or ``work_processing``. File and collection
        messages carry their targets under ``files`` / ``collections``; each
        target needs ``name``, ``scope`` and ``status`` and may provide
        ``external_coll_id`` and ``external_content_id``. Only messages whose
        ``relation_type`` is ``input`` trigger any update.

        Args:
            msg_obj: message object; ``sub_id``, ``msg_id`` and ``data`` are read from it.
            decoded_data: already decoded message dict; when None, ``msg_obj.data``
                is parsed as JSON instead.

        Returns:
            None.

        Raises:
            ValueError: if ``msg_type`` is not one of the supported values.
            RuntimeError: if updating files or datasets returns None.
            Exception: any other error raised while parsing or processing is
                logged and re-raised unchanged.
        """
        tmp_log = logger_utils.make_logger(base_logger, token=self.get_pid(), method_name="process")
        tmp_log.info("start")
        tmp_log.debug(f"sub_id={msg_obj.sub_id} ; msg_id={msg_obj.msg_id}")

        # decode the raw payload only when the caller did not pass a decoded dict
        if decoded_data is None:
            try:
                msg_dict = json.loads(msg_obj.data)
            except Exception as e:
                err_str = f"failed to parse message json {msg_obj.data} , skipped. {e.__class__.__name__} : {e}"
                tmp_log.error(err_str)
                raise
        else:
            msg_dict = decoded_data

        # extract the attributes needed from the message dict
        try:
            jeditaskid = int(msg_dict["workload_id"])
            msg_type = msg_dict["msg_type"]
            if msg_type == "file_processing":
                target_list = msg_dict["files"]
            elif msg_type == "collection_processing":
                target_list = msg_dict["collections"]
            elif msg_type == "work_processing":
                # no targets are read for this type; keep the name bound anyway
                target_list = []
            else:
                raise ValueError(f"invalid msg_type value: {msg_type}")
            relation_type = msg_dict.get("relation_type")
        except Exception as e:
            err_str = f"failed to parse message object dict {msg_dict} , skipped. {e.__class__.__name__} : {e}"
            tmp_log.error(err_str)
            raise

        # run the updates
        try:
            # only input files/collections are handled here
            to_proceed = msg_type in ("file_processing", "collection_processing") and relation_type in ("input",)
            if to_proceed:
                # scope -> {name: (datasetid, fileid)} for targets that are ready
                scope_name_dict_map = {}
                # name -> (datasetid, fileid) for files reported missing
                missing_files_dict = {}
                for target in target_list:
                    name = target["name"]
                    scope = target["scope"]
                    datasetid = target.get("external_coll_id", None)
                    fileid = target.get("external_content_id", None)
                    status = target["status"]
                    if (msg_type == "file_processing" and status in ["Available"]) or (
                        msg_type == "collection_processing" and status in ["Closed"]
                    ):
                        scope_name_dict_map.setdefault(scope, {})
                        scope_name_dict_map[scope][name] = (datasetid, fileid)
                    elif msg_type == "file_processing" and status in ["Missing"]:
                        missing_files_dict[name] = (datasetid, fileid)
                    else:
                        # any other status is ignored
                        tmp_log.debug(f"jeditaskid={jeditaskid}, scope={scope}, msg_type={msg_type}, status={target['status']}, did nothing for bad target")

                # update the ready targets, one call per scope
                for scope, name_dict in scope_name_dict_map.items():
                    if msg_type == "file_processing":
                        tmp_log.debug(f"jeditaskid={jeditaskid}, scope={scope}, update about files...")
                        res = self.tbIF.updateInputFilesStaged_JEDI(jeditaskid, scope, name_dict, by="iDDS")
                        if res is None:
                            # None is treated as a failed update
                            err_str = f"jeditaskid={jeditaskid}, scope={scope}, failed to update files"
                            raise RuntimeError(err_str)
                        tmp_log.info(f"jeditaskid={jeditaskid}, scope={scope}, updated {res} files")
                    elif msg_type == "collection_processing":
                        tmp_log.debug(f"jeditaskid={jeditaskid}, scope={scope}, update about datasets...")
                        res = self.tbIF.updateInputDatasetsStaged_JEDI(jeditaskid, scope, name_dict, by="iDDS")
                        if res is None:
                            # None is treated as a failed update
                            err_str = f"jeditaskid={jeditaskid}, scope={scope}, failed to update datasets"
                            raise RuntimeError(err_str)
                        tmp_log.info(f"jeditaskid={jeditaskid}, scope={scope}, updated {res} datasets")

                    # push a trigger message to the contents feeder for message-driven tasks
                    if res > 0 or msg_type == "collection_processing":
                        tmp_s, task_spec = self.tbIF.getTaskWithID_JEDI(jeditaskid)
                        if tmp_s and task_spec.is_msg_driven():
                            push_ret = self.tbIF.push_task_trigger_message("jedi_contents_feeder", jeditaskid, task_spec=task_spec)
                            if push_ret:
                                tmp_log.debug(f"pushed trigger message to jedi_contents_feeder for jeditaskid={jeditaskid}")
                            else:
                                tmp_log.warning(f"failed to push trigger message to jedi_contents_feeder for jeditaskid={jeditaskid}")

                    # report how the update went
                    # NOTE(review): res is a per-scope result but is compared with the number of
                    # all targets in the message (every scope and status) - confirm this is intended
                    n_targets = len(target_list)
                    if res == n_targets:
                        tmp_log.debug(f"jeditaskid={jeditaskid}, scope={scope}, all OK")
                    elif res < n_targets:
                        tmp_log.warning(f"jeditaskid={jeditaskid}, scope={scope}, only {res} out of {len(target_list)} done...")
                    elif res > n_targets:
                        tmp_log.warning(f"jeditaskid={jeditaskid}, scope={scope}, strangely, {res} out of {len(target_list)} done...")
                    else:
                        tmp_log.warning(f"jeditaskid={jeditaskid}, scope={scope}, something unwanted happened...")

                # record the files reported missing
                n_missing = len(missing_files_dict)
                if n_missing > 0:
                    res = self.tbIF.setMissingFilesAboutIdds_JEDI(jeditaskid=jeditaskid, filenames_dict=missing_files_dict)
                    if res is None:
                        # must be tested first: ordering None against an int raises TypeError,
                        # which would make this warning unreachable
                        tmp_log.warning(f"jeditaskid={jeditaskid}, res={res}, something unwanted happened about missing files...")
                    elif res == n_missing:
                        tmp_log.debug(f"jeditaskid={jeditaskid}, marked all {n_missing} files missing")
                    elif res < n_missing:
                        tmp_log.warning(f"jeditaskid={jeditaskid}, only {res} out of {n_missing} files marked missing...")
                    elif res > n_missing:
                        tmp_log.warning(f"jeditaskid={jeditaskid}, strangely, {res} out of {n_missing} files marked missing...")
                    else:
                        tmp_log.warning(f"jeditaskid={jeditaskid}, res={res}, something unwanted happened about missing files...")
            else:
                # message type / relation type not handled by this plugin
                tmp_log.debug(f"jeditaskid={jeditaskid}, msg_type={msg_type}, relation_type={relation_type}, nothing done")
        except Exception as e:
            err_str = f"failed to process the message, skipped. {e.__class__.__name__} : {e}"
            tmp_log.error(err_str)
            raise

        tmp_log.info("done")