Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:38:58

0001 import json
0002 
0003 from pandacommon.pandalogger import logger_utils
0004 
0005 from pandajedi.jedimsgprocessor.base_msg_processor import BaseMsgProcPlugin
0006 
# Module-level logger named after the last component of this module's dotted name
base_logger = logger_utils.setup_logger(__name__.rpartition(".")[2])
0008 
0009 
# Hyper-Parameter-Optimization message processing plugin
class HPOMsgProcPlugin(BaseMsgProcPlugin):
    """Message processing plugin for Hyper-Parameter-Optimization (HPO) messages.

    Three ``msg_type`` values are accepted:

    * ``file_hyperparameteropt``: insert HPO events for every entry of
      ``files`` whose ``status`` is ``"New"``.
    * ``collection_hyperparameteropt``: send a soft ``finish`` command for the task.
    * ``work_hyperparameteropt``: accepted but nothing is done.

    Any other ``msg_type`` is rejected with ``ValueError``.
    NOTE(review): messages presumably originate from iDDS (see the command
    comment string and the task-buffer method name) -- confirm with the sender.
    """

    def process(self, msg_obj, decoded_data=None):
        """Validate one HPO message and act on it.

        Args:
            msg_obj: message object; its ``sub_id`` and ``msg_id`` attributes are
                logged, and ``data`` is JSON-decoded when ``decoded_data`` is None.
            decoded_data: already-decoded message dictionary. When given,
                ``msg_obj.data`` is not parsed.

        Raises:
            Exception: any error raised while decoding, validating or executing
                the message is logged once and re-raised unchanged.
        """
        tmp_log = logger_utils.make_logger(base_logger, token=self.get_pid(), method_name="process")
        # start
        tmp_log.info("start")
        tmp_log.debug(f"sub_id={msg_obj.sub_id} ; msg_id={msg_obj.msg_id}")
        # parse
        if decoded_data is None:
            # json decode
            try:
                msg_dict = json.loads(msg_obj.data)
            except Exception as e:
                err_str = f"failed to parse message json {msg_obj.data} , skipped. {e.__class__.__name__} : {e}"
                tmp_log.error(err_str)
                raise
        else:
            msg_dict = decoded_data
        # sanity check
        target_list = None
        try:
            msg_type = msg_dict["msg_type"]
            jeditaskid = int(msg_dict["workload_id"])
            if msg_type == "file_hyperparameteropt":
                target_list = msg_dict["files"]
            elif msg_type == "collection_hyperparameteropt":
                # to finish the task
                pass
            elif msg_type == "work_hyperparameteropt":
                pass
            else:
                raise ValueError(f"invalid msg_type value: {msg_type}")
        except Exception as e:
            err_str = f"failed to parse message object dict {msg_dict} , skipped. {e.__class__.__name__} : {e}"
            tmp_log.error(err_str)
            raise
        # run
        if msg_type == "file_hyperparameteropt":
            self._insert_hpo_events(tmp_log, jeditaskid, target_list)
        elif msg_type == "collection_hyperparameteropt":
            self._finish_task(tmp_log, jeditaskid)
        else:
            # do nothing
            tmp_log.debug(f"jeditaskid={jeditaskid}, msg_type={msg_type}, did nothing")
        # done
        tmp_log.info("done")

    @staticmethod
    def _extract_model_id(target):
        """Return the model ID encoded in ``target["path"]``, or None if unavailable.

        ``path`` is JSON-decoded; when its first element is a list/tuple, that
        element's first item is the model ID.
        """
        try:
            path = json.loads(target["path"])
            if isinstance(path[0], (list, tuple)):
                return path[0][0]
        except Exception:
            # Deliberately best-effort: a missing or malformed path must not
            # prevent the event from being inserted, it just has no model ID.
            pass
        return None

    def _insert_hpo_events(self, tmp_log, jeditaskid, target_list):
        """Insert one HPO event per target in ``target_list`` whose status is "New"."""
        # event ids from the targets
        try:
            event_id_list = [(target["name"], self._extract_model_id(target)) for target in target_list if target["status"] == "New"]
        except Exception as e:
            err_str = f"failed to parse message object, skipped. {e.__class__.__name__} : {e}"
            tmp_log.error(err_str)
            raise
        if not event_id_list:
            # nothing new to insert
            return
        # insert events; a failure here is a backend problem, not a parsing one,
        # so it is reported as such
        try:
            res = self.tbIF.insertHpoEventAboutIdds_JEDI(jedi_task_id=jeditaskid, event_id_list=event_id_list)
        except Exception as e:
            err_str = f"failed to insert HPO events for jeditaskid={jeditaskid}, skipped. {e.__class__.__name__} : {e}"
            tmp_log.error(err_str)
            raise
        # check if ok
        if res:
            tmp_log.debug(f"jeditaskid={jeditaskid}, inserted {len(event_id_list)} events: {event_id_list}")
        else:
            tmp_log.warning(f"jeditaskid={jeditaskid}, failed to insert events: {event_id_list}")

    def _finish_task(self, tmp_log, jeditaskid):
        """Send a soft "finish" command for the task ``jeditaskid``."""
        # send finish command; nothing is parsed here, so a failure is reported
        # as a failure of the command itself
        try:
            retVal, retStr = self.tbIF.sendCommandTaskPanda(jeditaskid, "iDDS. HPO task finished", True, "finish", comQualifier="soft")
        except Exception as e:
            err_str = f"failed to finish the task for jeditaskid={jeditaskid}, skipped. {e.__class__.__name__} : {e}"
            tmp_log.error(err_str)
            raise
        # check if ok
        if retVal:
            tmp_log.debug(f"jeditaskid={jeditaskid}, finished the task")
        else:
            tmp_log.warning(f"jeditaskid={jeditaskid}, failed finish the task: {retStr}")