Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-02-23 09:22:36

0001 from dataclasses import dataclass
0002 from typing import Tuple, Dict, Any, List
0003 
0004 import numpy as np
0005 from optuna import Trial, create_study, get_all_study_summaries, load_study
0006 from optuna.pruners import MedianPruner
0007 from optuna.samplers import TPESampler
0008 from optuna.trial import TrialState
0009 
0010 from core.constants import LEARNING_RATE, BATCH_SIZE_PER_REPLICA, ACTIVATION, OUT_ACTIVATION, \
0011     OPTIMIZER_TYPE, KERNEL_INITIALIZER, BIAS_INITIALIZER, N_TRIALS, LATENT_DIM, \
0012     INTERMEDIATE_DIMS, MAX_HIDDEN_LAYER_DIM, GLOBAL_CHECKPOINT_DIR
0013 from core.model import VAEHandler
0014 from utils.preprocess import preprocess
0015 
0016 
0017 @dataclass
0018 class HyperparameterTuner:
0019     """Tuner which looks for the best hyperparameters of a Variational Autoencoder specified in model.py.
0020 
0021     Currently, supported hyperparameters are: dimension of latent space, number of hidden layers, learning rate,
0022     activation function, activation function after the final layer, optimizer type, kernel initializer,
0023     bias initializer, batch size.
0024 
0025     Attributes:
0026         _discrete_parameters: A dictionary of hyperparameters taking discrete values in the range [low, high].
0027         _continuous_parameters: A dictionary of hyperparameters taking continuous values in the range [low, high].
0028         _categorical_parameters: A dictionary of hyperparameters taking values specified by the list of them.
0029         _storage: A string representing URL to a database required for a distributed training
0030         _study_name: A string, a name of study.
0031 
0032     """
0033     _discrete_parameters: Dict[str, Tuple[int, int]]
0034     _continuous_parameters: Dict[str, Tuple[float, float]]
0035     _categorical_parameters: Dict[str, List[Any]]
0036     _storage: str = None
0037     _study_name: str = None
0038 
0039     def _check_hyperparameters(self):
0040         available_hyperparameters = ["latent_dim", "nb_hidden_layers", "learning_rate", "activation", "out_activation",
0041                                      "optimizer_type", "kernel_initializer", "bias_initializer",
0042                                      "batch_size_per_replica"]
0043         hyperparameters_to_be_optimized = list(self._discrete_parameters.keys()) + list(
0044             self._continuous_parameters.keys()) + list(self._categorical_parameters.keys())
0045         for hyperparameter_name in hyperparameters_to_be_optimized:
0046             if hyperparameter_name not in available_hyperparameters:
0047                 raise Exception(f"Unknown hyperparameter: {hyperparameter_name}")
0048 
0049     def __post_init__(self):
0050         self._check_hyperparameters()
0051         self._energies_train, self._cond_e_train, self._cond_angle_train, self._cond_geo_train = preprocess()
0052 
0053         if self._storage is not None and self._study_name is not None:
0054             # Parallel optimization
0055             study_summaries = get_all_study_summaries(self._storage)
0056             if any(self._study_name == study_summary.study_name for study_summary in study_summaries):
0057                 # The study is already created in the database. Load it.
0058                 self._study = load_study(self._study_name, self._storage)
0059             else:
0060                 # The study does not exist in the database. Create a new one.
0061                 self._study = create_study(storage=self._storage, sampler=TPESampler(), pruner=MedianPruner(),
0062                                            study_name=self._study_name, direction="minimize")
0063         else:
0064             # Single optimization
0065             self._study = create_study(sampler=TPESampler(), pruner=MedianPruner(), direction="minimize")
0066 
0067     def _create_model_handler(self, trial: Trial) -> VAEHandler:
0068         """For a given trail builds the model.
0069 
0070         Optuna suggests parameters like dimensions of particular layers of the model, learning rate, optimizer, etc.
0071 
0072         Args:
0073             trial: Optuna's trial
0074 
0075         Returns:
0076             Variational Autoencoder (VAE)
0077         """
0078 
0079         # Discrete parameters
0080         if "latent_dim" in self._discrete_parameters.keys():
0081             latent_dim = trial.suggest_int(name="latent_dim",
0082                                            low=self._discrete_parameters["latent_dim"][0],
0083                                            high=self._discrete_parameters["latent_dim"][1])
0084         else:
0085             latent_dim = LATENT_DIM
0086 
0087         if "nb_hidden_layers" in self._discrete_parameters.keys():
0088             nb_hidden_layers = trial.suggest_int(name="nb_hidden_layers",
0089                                                  low=self._discrete_parameters["nb_hidden_layers"][0],
0090                                                  high=self._discrete_parameters["nb_hidden_layers"][1])
0091 
0092             all_possible = np.arange(start=latent_dim + 5, stop=MAX_HIDDEN_LAYER_DIM)
0093             chunks = np.array_split(all_possible, nb_hidden_layers)
0094             ranges = [(chunk[0], chunk[-1]) for chunk in chunks]
0095             ranges = reversed(ranges)
0096 
0097             # Cast from np.int to int allows to become JSON serializable.
0098             intermediate_dims = [trial.suggest_int(name=f"intermediate_dim_{i}", low=int(low), high=int(high)) for
0099                                  i, (low, high)
0100                                  in enumerate(ranges)]
0101         else:
0102             intermediate_dims = INTERMEDIATE_DIMS
0103 
0104         if "batch_size_per_replica" in self._discrete_parameters.keys():
0105             batch_size_per_replica = trial.suggest_int(name="batch_size_per_replica",
0106                                                        low=self._discrete_parameters["batch_size_per_replica"][0],
0107                                                        high=self._discrete_parameters["batch_size_per_replica"][1])
0108         else:
0109             batch_size_per_replica = BATCH_SIZE_PER_REPLICA
0110 
0111         # Continuous parameters
0112         if "learning_rate" in self._continuous_parameters.keys():
0113             learning_rate = trial.suggest_float(name="learning_rate",
0114                                                 low=self._continuous_parameters["learning_rate"][0],
0115                                                 high=self._continuous_parameters["learning_rate"][1])
0116         else:
0117             learning_rate = LEARNING_RATE
0118 
0119         # Categorical parameters
0120         if "activation" in self._categorical_parameters.keys():
0121             activation = trial.suggest_categorical(name="activation",
0122                                                    choices=self._categorical_parameters["activation"])
0123         else:
0124             activation = ACTIVATION
0125 
0126         if "out_activation" in self._categorical_parameters.keys():
0127             out_activation = trial.suggest_categorical(name="out_activation",
0128                                                        choices=self._categorical_parameters["out_activation"])
0129         else:
0130             out_activation = OUT_ACTIVATION
0131 
0132         if "optimizer_type" in self._categorical_parameters.keys():
0133             optimizer_type = trial.suggest_categorical(name="optimizer_type",
0134                                                        choices=self._categorical_parameters["optimizer_type"])
0135         else:
0136             optimizer_type = OPTIMIZER_TYPE
0137 
0138         if "kernel_initializer" in self._categorical_parameters.keys():
0139             kernel_initializer = trial.suggest_categorical(name="kernel_initializer",
0140                                                            choices=self._categorical_parameters["kernel_initializer"])
0141         else:
0142             kernel_initializer = KERNEL_INITIALIZER
0143 
0144         if "bias_initializer" in self._categorical_parameters.keys():
0145             bias_initializer = trial.suggest_categorical(name="bias_initializer",
0146                                                          choices=self._categorical_parameters["bias_initializer"])
0147         else:
0148             bias_initializer = BIAS_INITIALIZER
0149 
0150         checkpoint_dir = f"{GLOBAL_CHECKPOINT_DIR}/{self._study_name}/trial_{trial.number:03d}"
0151 
0152         return VAEHandler(_wandb_project_name=self._study_name,
0153                           _wandb_tags=["hyperparameter tuning", f"trial {trial.number}"],
0154                           _batch_size_per_replica=batch_size_per_replica,
0155                           _intermediate_dims=intermediate_dims,
0156                           latent_dim=latent_dim,
0157                           _learning_rate=learning_rate,
0158                           _activation=activation,
0159                           _out_activation=out_activation,
0160                           _optimizer_type=optimizer_type,
0161                           _kernel_initializer=kernel_initializer,
0162                           _bias_initializer=bias_initializer,
0163                           _checkpoint_dir=checkpoint_dir,
0164                           _early_stop=True,
0165                           _save_model_every_epoch=False,
0166                           _save_best_model=True,
0167                           )
0168 
0169     def _objective(self, trial: Trial) -> float:
0170         """For a given trial trains the model and returns an average validation loss.
0171 
0172         Args:
0173             trial: Optuna's trial
0174 
0175         Returns: One float numer which is a validation loss. It can be either calculated as an average of k trainings
0176         performed in cross validation mode or is one number obtained from  validation on unseen before, some fraction
0177         of the dataset.
0178         """
0179 
0180         # Generate the trial model.
0181         model_handler = self._create_model_handler(trial)
0182 
0183         # Train the model.
0184         verbose = True
0185         histories = model_handler.train(self._energies_train, self._cond_e_train, self._cond_angle_train,
0186                                         self._cond_geo_train, verbose)
0187 
0188         # Return validation loss (currently it is treated as an objective goal). Notice that we take into account the
0189         # best model according to the validation loss.
0190         final_validation_losses = [np.min(history.history["val_loss"]) for history in histories]
0191         avg_validation_loss = np.mean(final_validation_losses).item()
0192         return avg_validation_loss
0193 
0194     def tune(self) -> None:
0195         """Main tuning function.
0196 
0197         Based on a given study, tunes the model and prints detailed information about the best trial (value of the
0198         objective function and adjusted parameters).
0199         """
0200 
0201         self._study.optimize(func=self._objective, n_trials=N_TRIALS, gc_after_trial=True)
0202         pruned_trials = self._study.get_trials(deepcopy=False, states=(TrialState.PRUNED,))
0203         complete_trials = self._study.get_trials(deepcopy=False, states=(TrialState.COMPLETE,))
0204         print("Study statistics: ")
0205         print("  Number of finished trials: ", len(self._study.trials))
0206         print("  Number of pruned trials: ", len(pruned_trials))
0207         print("  Number of complete trials: ", len(complete_trials))
0208 
0209         print("Best trial:")
0210         trial = self._study.best_trial
0211 
0212         print("  Value: ", trial.value)
0213 
0214         print("  Params: ")
0215         for key, value in trial.params.items():
0216             print(f"    {key}: {value}")