File indexing completed on 2026-04-10 08:38:58
0001 import json
0002
0003 from pandacommon.pandalogger import logger_utils
0004
0005 from pandajedi.jedimsgprocessor.base_msg_processor import BaseMsgProcPlugin
0006
0007 base_logger = logger_utils.setup_logger(__name__.split(".")[-1])
0008
0009
0010
0011 class HPOMsgProcPlugin(BaseMsgProcPlugin):
0012 def process(self, msg_obj, decoded_data=None):
0013 tmp_log = logger_utils.make_logger(base_logger, token=self.get_pid(), method_name="process")
0014
0015 tmp_log.info("start")
0016 tmp_log.debug(f"sub_id={msg_obj.sub_id} ; msg_id={msg_obj.msg_id}")
0017
0018 if decoded_data is None:
0019
0020 try:
0021 msg_dict = json.loads(msg_obj.data)
0022 except Exception as e:
0023 err_str = f"failed to parse message json {msg_obj.data} , skipped. {e.__class__.__name__} : {e}"
0024 tmp_log.error(err_str)
0025 raise
0026 else:
0027 msg_dict = decoded_data
0028
0029 try:
0030 msg_type = msg_dict["msg_type"]
0031 jeditaskid = int(msg_dict["workload_id"])
0032 if msg_type == "file_hyperparameteropt":
0033 target_list = msg_dict["files"]
0034 elif msg_type == "collection_hyperparameteropt":
0035
0036 pass
0037 elif msg_type == "work_hyperparameteropt":
0038 pass
0039 else:
0040 raise ValueError(f"invalid msg_type value: {msg_type}")
0041 except Exception as e:
0042 err_str = f"failed to parse message object dict {msg_dict} , skipped. {e.__class__.__name__} : {e}"
0043 tmp_log.error(err_str)
0044 raise
0045
0046 if msg_type == "file_hyperparameteropt":
0047
0048 try:
0049
0050 event_id_list = []
0051 for target in target_list:
0052 if target["status"] != "New":
0053 continue
0054 model_id = None
0055 try:
0056 path = json.loads(target["path"])
0057 if isinstance(path[0], (list, tuple)):
0058 model_id = path[0][0]
0059 except Exception:
0060 pass
0061 event_id_list.append((target["name"], model_id))
0062 if event_id_list:
0063 n_events = len(event_id_list)
0064
0065 res = self.tbIF.insertHpoEventAboutIdds_JEDI(jedi_task_id=jeditaskid, event_id_list=event_id_list)
0066
0067 if res:
0068 tmp_log.debug(f"jeditaskid={jeditaskid}, inserted {n_events} events: {event_id_list}")
0069 else:
0070 tmp_log.warning(f"jeditaskid={jeditaskid}, failed to insert events: {event_id_list}")
0071 except Exception as e:
0072 err_str = f"failed to parse message object, skipped. {e.__class__.__name__} : {e}"
0073 tmp_log.error(err_str)
0074 raise
0075 elif msg_type == "collection_hyperparameteropt":
0076
0077 try:
0078
0079 retVal, retStr = self.tbIF.sendCommandTaskPanda(jeditaskid, "iDDS. HPO task finished", True, "finish", comQualifier="soft")
0080
0081 if retVal:
0082 tmp_log.debug(f"jeditaskid={jeditaskid}, finished the task")
0083 else:
0084 tmp_log.warning(f"jeditaskid={jeditaskid}, failed finish the task: {retStr}")
0085 except Exception as e:
0086 err_str = f"failed to parse message object, skipped. {e.__class__.__name__} : {e}"
0087 tmp_log.error(err_str)
0088 raise
0089 else:
0090
0091 tmp_log.debug(f"jeditaskid={jeditaskid}, msg_type={msg_type}, did nothing")
0092
0093 tmp_log.info("done")