File indexing completed on 2026-04-10 08:38:58
0001 import json
0002
0003 from pandacommon.pandalogger import logger_utils
0004
0005 from pandajedi.jedimsgprocessor.base_msg_processor import BaseMsgProcPlugin
0006
# Module-level logger, named after the last dotted component of this module's name.
base_logger = logger_utils.setup_logger(__name__.rpartition(".")[2])
0008
0009
0010
class TapeCarouselMsgProcPlugin(BaseMsgProcPlugin):
    """Message processor plugin that handles stage-in notification messages.

    Messages of type ``file_stagein`` or ``collection_stagein`` with
    ``relation_type == "output"`` are turned into task-buffer updates marking
    the corresponding files or datasets as staged; every other valid message
    is acknowledged and ignored.
    """

    def process(self, msg_obj, decoded_data=None):
        """Process one stage-in message.

        Args:
            msg_obj: message object; ``sub_id``, ``msg_id`` and ``data`` are read.
                ``data`` is parsed as JSON only when ``decoded_data`` is None.
            decoded_data: already-decoded message dict, used instead of
                ``msg_obj.data`` when given.

        Raises:
            Exception: any error while parsing or processing is logged and
                re-raised unchanged. This includes ``ValueError`` for an
                unknown ``msg_type`` and ``RuntimeError`` when a task-buffer
                update returns None.
        """
        tmp_log = logger_utils.make_logger(base_logger, token=self.get_pid(), method_name="process")
        tmp_log.info("start")
        tmp_log.debug(f"sub_id={msg_obj.sub_id} ; msg_id={msg_obj.msg_id}")

        # Obtain the message dict, decoding the raw JSON only when the caller has not done so.
        if decoded_data is None:
            try:
                msg_dict = json.loads(msg_obj.data)
            except Exception as e:
                err_str = f"failed to parse message json {msg_obj.data} , skipped. {e.__class__.__name__} : {e}"
                tmp_log.error(err_str)
                raise
        else:
            msg_dict = decoded_data

        # Extract and validate the fields needed below.
        try:
            jeditaskid = int(msg_dict["workload_id"])
            msg_type = msg_dict["msg_type"]
            if msg_type == "file_stagein":
                target_list = msg_dict["files"]
            elif msg_type == "collection_stagein":
                target_list = msg_dict["collections"]
            elif msg_type == "work_stagein":
                # Valid type that carries nothing to process here; keep the
                # name bound so later code can never hit an unbound local.
                target_list = []
            else:
                raise ValueError(f"invalid msg_type value: {msg_type}")
            relation_type = msg_dict.get("relation_type")
        except Exception as e:
            err_str = f"failed to parse message object dict {msg_dict} , skipped. {e.__class__.__name__} : {e}"
            tmp_log.error(err_str)
            raise

        # Target status that counts as "staged" for each processable message type.
        done_status_map = {"file_stagein": "Available", "collection_stagein": "Closed"}

        try:
            if msg_type in done_status_map and relation_type == "output":
                # Group the staged targets by scope: {scope: {name: (datasetid, fileid)}}.
                scope_name_dict_map = {}
                for target in target_list:
                    name = target["name"]
                    scope = target["scope"]
                    datasetid = target.get("external_coll_id", None)
                    fileid = target.get("external_content_id", None)
                    if target["status"] == done_status_map[msg_type]:
                        scope_name_dict_map.setdefault(scope, {})[name] = (datasetid, fileid)
                    else:
                        # Target is not in the staged status; leave it alone.
                        tmp_log.debug(f"jeditaskid={jeditaskid}, scope={scope}, msg_type={msg_type}, status={target['status']}, did nothing for bad target")

                for scope, name_dict in scope_name_dict_map.items():
                    # Update the staged files or datasets of this scope.
                    if msg_type == "file_stagein":
                        tmp_log.debug(f"jeditaskid={jeditaskid}, scope={scope}, update about files...")
                        res = self.tbIF.updateInputFilesStaged_JEDI(jeditaskid, scope, name_dict, by="iDDS")
                        if res is None:
                            # None signals a failed update
                            err_str = f"jeditaskid={jeditaskid}, scope={scope}, failed to update files"
                            raise RuntimeError(err_str)
                        tmp_log.info(f"jeditaskid={jeditaskid}, scope={scope}, updated {res} files")
                    else:
                        # msg_type == "collection_stagein"
                        tmp_log.debug(f"jeditaskid={jeditaskid}, scope={scope}, update about datasets...")
                        res = self.tbIF.updateInputDatasetsStaged_JEDI(jeditaskid, scope, name_dict, by="iDDS")
                        if res is None:
                            # None signals a failed update
                            err_str = f"jeditaskid={jeditaskid}, scope={scope}, failed to update datasets"
                            raise RuntimeError(err_str)
                        tmp_log.info(f"jeditaskid={jeditaskid}, scope={scope}, updated {res} datasets")

                    # Trigger the contents feeder when something was updated
                    # (always for collection messages), for message-driven tasks only.
                    if res > 0 or msg_type == "collection_stagein":
                        tmp_s, task_spec = self.tbIF.getTaskWithID_JEDI(jeditaskid)
                        if tmp_s and task_spec.is_msg_driven():
                            push_ret = self.tbIF.push_task_trigger_message("jedi_contents_feeder", jeditaskid, task_spec=task_spec)
                            if push_ret:
                                tmp_log.debug(f"pushed trigger message to jedi_contents_feeder for jeditaskid={jeditaskid}")
                            else:
                                tmp_log.warning(f"failed to push trigger message to jedi_contents_feeder for jeditaskid={jeditaskid}")

                    # Compare the update count with the number of targets submitted
                    # for THIS scope. Using the size of the whole message would warn
                    # spuriously when the message spans several scopes or contains
                    # targets that are not staged yet.
                    n_targets = len(name_dict)
                    if res == n_targets:
                        tmp_log.debug(f"jeditaskid={jeditaskid}, scope={scope}, all OK")
                    elif res < n_targets:
                        tmp_log.warning(f"jeditaskid={jeditaskid}, scope={scope}, only {res} out of {n_targets} done...")
                    else:
                        tmp_log.warning(f"jeditaskid={jeditaskid}, scope={scope}, strangely, {res} out of {n_targets} done...")
            else:
                # Message type / relation type combination that needs no action.
                tmp_log.debug(f"jeditaskid={jeditaskid}, msg_type={msg_type}, relation_type={relation_type}, nothing done")
        except Exception as e:
            err_str = f"failed to process the message, skipped. {e.__class__.__name__} : {e}"
            tmp_log.error(err_str)
            raise

        tmp_log.info("done")