import json
import os
from abc import ABC, abstractmethod
from copy import deepcopy
from typing import Any
import numpy as np
import pandas as pd
from rdkit import Chem
from rdkit.Chem import Draw
from .model import QSPRModel
from ..data.tables.qspr import QSPRDataset
from ..utils.serialization import JSONSerializable
[docs]class FitMonitor(JSONSerializable, ABC):
"""Base class for monitoring the fitting of a model."""
[docs] @abstractmethod
def onFitStart(
self,
model: QSPRModel,
X_train: np.ndarray,
y_train: np.ndarray,
X_val: np.ndarray | None = None,
y_val: np.ndarray | None = None,
):
"""Called before the training has started.
Args:
model (QSPRModel): model to be fitted
X_train (np.ndarray): training data
y_train (np.ndarray): training targets
X_val (np.ndarray | None): validation data, used for early stopping
y_val (np.ndarray | None): validation targets, used for early stopping
"""
[docs] @abstractmethod
def onFitEnd(self, estimator: Any, best_epoch: int | None = None):
"""Called after the training has finished.
Args:
estimator (Any): estimator that was fitted
best_epoch (int | None): index of the best epoch
"""
[docs] @abstractmethod
def onEpochStart(self, epoch: int):
"""Called before each epoch of the training.
Args:
epoch (int): index of the current epoch
"""
[docs] @abstractmethod
def onEpochEnd(self, epoch: int, train_loss: float, val_loss: float | None = None):
"""Called after each epoch of the training.
Args:
epoch (int): index of the current epoch
train_loss (float): loss of the current epoch
val_loss (float | None): validation loss of the current epoch
"""
[docs] @abstractmethod
def onBatchStart(self, batch: int):
"""Called before each batch of the training.
Args:
batch (int): index of the current batch
"""
[docs] @abstractmethod
def onBatchEnd(self, batch: int, loss: float):
"""Called after each batch of the training.
Args:
batch (int): index of the current batch
loss (float): loss of the current batch
"""
[docs]class AssessorMonitor(FitMonitor):
"""Base class for monitoring the assessment of a model."""
[docs] @abstractmethod
def onAssessmentStart(
self, model: QSPRModel, data: QSPRDataset, assesment_type: str
):
"""Called before the assessment has started.
Args:
model (QSPRModel): model to assess
data (QSPRDataset): data set used in assessment
assesment_type (str): type of assessment
"""
[docs] @abstractmethod
def onAssessmentEnd(self, predictions: pd.DataFrame):
"""Called after the assessment has finished.
Args:
predictions (pd.DataFrame): predictions of the assessment
"""
[docs] @abstractmethod
def onFoldStart(
self,
fold: int,
X_train: np.ndarray,
y_train: np.ndarray,
X_test: np.ndarray,
y_test: np.ndarray,
):
"""Called before each fold of the assessment.
Args:
fold (int): index of the current fold
X_train (np.ndarray): training data of the current fold
y_train (np.ndarray): training targets of the current fold
X_test (np.ndarray): test data of the current fold
y_test (np.ndarray): test targets of the current fold
"""
[docs] @abstractmethod
def onFoldEnd(
self, model_fit: Any | tuple[Any, int], fold_predictions: pd.DataFrame
):
"""Called after each fold of the assessment.
Args:
model_fit (Any | tuple[Any, int]): fitted estimator of the current fold, or
tuple containing the fitted estimator and
the number of epochs it was trained for
fold_predictions (pd.DataFrame): predictions of the current fold
"""
[docs]class HyperparameterOptimizationMonitor(AssessorMonitor):
"""Base class for monitoring the hyperparameter optimization of a model."""
[docs] @abstractmethod
def onOptimizationStart(
self, model: QSPRModel, data: QSPRDataset, config: dict,
optimization_type: str
):
"""Called before the hyperparameter optimization has started.
Args:
model (QSPRModel): model to optimize
data (QSPRDataset): data set used in optimization
config (dict): configuration of the hyperparameter optimization
optimization_type (str): type of hyperparameter optimization
"""
[docs] @abstractmethod
def onOptimizationEnd(self, best_score: float, best_parameters: dict):
"""Called after the hyperparameter optimization has finished.
Args:
best_score (float): best score found during optimization
best_parameters (dict): best parameters found during optimization
"""
[docs] @abstractmethod
def onIterationStart(self, params: dict):
"""Called before each iteration of the hyperparameter optimization.
Args:
params (dict): parameters used for the current iteration
"""
[docs] @abstractmethod
def onIterationEnd(self, score: float, scores: list[float]):
"""Called after each iteration of the hyperparameter optimization.
Args:
score (float): (aggregated) score of the current iteration
scores (list[float]): scores of the current iteration
(e.g. for cross-validation)
"""
[docs]class NullMonitor(HyperparameterOptimizationMonitor):
"""Monitor that does nothing."""
[docs] def onFitStart(
self,
model: QSPRModel,
X_train: np.ndarray,
y_train: np.ndarray,
X_val: np.ndarray | None = None,
y_val: np.ndarray | None = None,
):
"""Called before the training has started.
Args:
model (QSPRModel): model to be fitted
X_train (np.ndarray): training data
y_train (np.ndarray): training targets
X_val (np.ndarray | None): validation data, used for early stopping
y_val (np.ndarray | None): validation targets, used for early stopping
"""
[docs] def onFitEnd(self, estimator: Any, best_epoch: int | None = None):
"""Called after the training has finished.
Args:
estimator (Any): estimator that was fitted
best_epoch (int | None): index of the best epoch
"""
[docs] def onEpochStart(self, epoch: int):
"""Called before each epoch of the training.
Args:
epoch (int): index of the current epoch
"""
[docs] def onEpochEnd(self, epoch: int, train_loss: float, val_loss: float | None = None):
"""Called after each epoch of the training.
Args:
epoch (int): index of the current epoch
train_loss (float): loss of the current epoch
val_loss (float | None): validation loss of the current epoch
"""
[docs] def onBatchStart(self, batch: int):
"""Called before each batch of the training.
Args:
batch (int): index of the current batch
"""
[docs] def onBatchEnd(self, batch: int, loss: float):
"""Called after each batch of the training.
Args:
batch (int): index of the current batch
loss (float): loss of the current batch
"""
[docs] def onAssessmentStart(
self, model: QSPRModel, data: QSPRDataset, assesment_type: str
):
"""Called before the assessment has started.
Args:
model (QSPRModel): model to assess
data (QSPRDataset): data set used in assessment
assesment_type (str): type of assessment
"""
[docs] def onAssessmentEnd(self, predictions: pd.DataFrame):
"""Called after the assessment has finished.
Args:
predictions (pd.DataFrame): predictions of the assessment
"""
[docs] def onFoldStart(
self,
fold: int,
X_train: np.ndarray,
y_train: np.ndarray,
X_test: np.ndarray,
y_test: np.ndarray,
):
"""Called before each fold of the assessment.
Args:
fold (int): index of the current fold
X_train (np.ndarray): training data of the current fold
y_train (np.ndarray): training targets of the current fold
X_test (np.ndarray): test data of the current fold
y_test (np.ndarray): test targets of the current fold
"""
[docs] def onFoldEnd(
self, model_fit: Any | tuple[Any, int], fold_predictions: pd.DataFrame
):
"""Called after each fold of the assessment.
Args:
model_fit (Any | tuple[Any, int]): fitted estimator of the current fold, or
tuple containing the fitted estimator and
the number of epochs it was trained for
fold_predictions (pd.DataFrame): predictions of the current fold
"""
[docs] def onOptimizationStart(
self, model: QSPRModel, data: QSPRDataset, config: dict,
optimization_type: str
):
"""Called before the hyperparameter optimization has started.
Args:
model (QSPRModel): model to optimize
data (QSPRDataset): data set used in optimization
config (dict): configuration of the hyperparameter optimization
optimization_type (str): type of hyperparameter optimization
"""
[docs] def onOptimizationEnd(self, best_score: float, best_parameters: dict):
"""Called after the hyperparameter optimization has finished.
Args:
best_score (float): best score found during optimization
best_parameters (dict): best parameters found during optimization
"""
[docs] def onIterationStart(self, params: dict):
"""Called before each iteration of the hyperparameter optimization.
Args:
params (dict): parameters used for the current iteration
"""
[docs] def onIterationEnd(self, score: float, scores: list[float]):
"""Called after each iteration of the hyperparameter optimization.
Args:
score (float): (aggregated) score of the current iteration
scores (list[float]): scores of the current iteration
(e.g. for cross-validation)
"""
[docs]class ListMonitor(HyperparameterOptimizationMonitor):
"""Monitor that combines multiple monitors.
Attributes:
monitors (list[HyperparameterOptimizationMonitor]): list of monitors
"""
def __init__(self, monitors: list[HyperparameterOptimizationMonitor]):
"""Initialize the monitor.
Args:
monitors (list[HyperparameterOptimizationMonitor]): list of monitors
"""
self.monitors = monitors
[docs] def onFitStart(
self,
model: QSPRModel,
X_train: np.ndarray,
y_train: np.ndarray,
X_val: np.ndarray | None = None,
y_val: np.ndarray | None = None,
):
"""Called before the training has started.
Args:
model (QSPRModel): model to be fitted
X_train (np.ndarray): training data
y_train (np.ndarray): training targets
X_val (np.ndarray | None): validation data, used for early stopping
y_val (np.ndarray | None): validation targets, used for early stopping
"""
for monitor in self.monitors:
monitor.onFitStart(model, X_train, y_train, X_val, y_val)
[docs] def onFitEnd(self, estimator: Any, best_epoch: int | None = None):
"""Called after the training has finished.
Args:
estimator (Any): estimator that was fitted
best_epoch (int | None): index of the best epoch
"""
for monitor in self.monitors:
monitor.onFitEnd(estimator, best_epoch)
[docs] def onEpochStart(self, epoch: int):
"""Called before each epoch of the training.
Args:
epoch (int): index of the current epoch
"""
for monitor in self.monitors:
monitor.onEpochStart(epoch)
[docs] def onEpochEnd(self, epoch: int, train_loss: float, val_loss: float | None = None):
"""Called after each epoch of the training.
Args:
epoch (int): index of the current epoch
train_loss (float): loss of the current epoch
val_loss (float | None): validation loss of the current epoch
"""
for monitor in self.monitors:
monitor.onEpochEnd(epoch, train_loss, val_loss)
[docs] def onBatchStart(self, batch: int):
"""Called before each batch of the training.
Args:
batch (int): index of the current batch
"""
for monitor in self.monitors:
monitor.onBatchStart(batch)
[docs] def onBatchEnd(self, batch: int, loss: float):
"""Called after each batch of the training.
Args:
batch (int): index of the current batch
loss (float): loss of the current batch
"""
for monitor in self.monitors:
monitor.onBatchEnd(batch, loss)
[docs] def onAssessmentStart(
self, model: QSPRModel, data: QSPRDataset, assesment_type: str
):
"""Called before the assessment has started.
Args:
model (QSPRModel): model to assess
data (QSPRDataset): data set used in assessment
assesment_type (str): type of assessment
"""
for monitor in self.monitors:
monitor.onAssessmentStart(model, data, assesment_type)
[docs] def onAssessmentEnd(self, predictions: pd.DataFrame):
"""Called after the assessment has finished.
Args:
predictions (pd.DataFrame): predictions of the assessment
"""
for monitor in self.monitors:
monitor.onAssessmentEnd(predictions)
[docs] def onFoldStart(
self,
fold: int,
X_train: np.ndarray,
y_train: np.ndarray,
X_test: np.ndarray,
y_test: np.ndarray,
):
"""Called before each fold of the assessment.
Args:
fold (int): index of the current fold
X_train (np.ndarray): training data of the current fold
y_train (np.ndarray): training targets of the current fold
X_test (np.ndarray): test data of the current fold
y_test (np.ndarray): test targets of the current fold
"""
for monitor in self.monitors:
monitor.onFoldStart(fold, X_train, y_train, X_test, y_test)
[docs] def onFoldEnd(
self, model_fit: Any | tuple[Any, int], fold_predictions: pd.DataFrame
):
"""Called after each fold of the assessment.
Args:
model_fit (Any | tuple[Any, int]): fitted estimator of the current fold, or
tuple containing the fitted estimator and
the number of epochs it was trained for
fold_predictions (pd.DataFrame): predictions of the current fold
"""
for monitor in self.monitors:
monitor.onFoldEnd(model_fit, fold_predictions)
[docs] def onOptimizationStart(
self, model: QSPRModel, data: QSPRDataset, config: dict,
optimization_type: str
):
"""Called before the hyperparameter optimization has started.
Args:
model (QSPRModel): model to optimize
data (QSPRDataset): data set used in optimization
config (dict): configuration of the hyperparameter optimization
optimization_type (str): type of hyperparameter optimization
"""
for monitor in self.monitors:
monitor.onOptimizationStart(model, data, config, optimization_type)
[docs] def onOptimizationEnd(self, best_score: float, best_parameters: dict):
"""Called after the hyperparameter optimization has finished.
Args:
best_score (float): best score found during optimization
best_parameters (dict): best parameters found during optimization
"""
for monitor in self.monitors:
monitor.onOptimizationEnd(best_score, best_parameters)
[docs] def onIterationStart(self, params: dict):
"""Called before each iteration of the hyperparameter optimization.
Args:
params (dict): parameters used for the current iteration
"""
for monitor in self.monitors:
monitor.onIterationStart(params)
[docs] def onIterationEnd(self, score: float, scores: list[float]):
"""Called after each iteration of the hyperparameter optimization.
Args:
score (float): (aggregated) score of the current iteration
scores (list[float]): scores of the current iteration
(e.g. for cross-validation)
"""
for monitor in self.monitors:
monitor.onIterationEnd(score, scores)
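# A minimal usage sketch of ListMonitor: every callback is forwarded to each
# wrapped monitor, so in-memory tracking and file logging can be combined.
# The monitor instances below are illustrative choices, not a prescription.
def _example_list_monitor():
    combined = ListMonitor([BaseMonitor(), FileMonitor(save_fits=False)])
    combined.onEpochEnd(0, train_loss=0.5, val_loss=0.6)  # reaches both monitors
    return combined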
[docs]class BaseMonitor(HyperparameterOptimizationMonitor):
"""Base monitoring the fitting, training and optimization of a model.
Information about the fitting, training and optimization process is stored
internally, but not logged. This class can be used as a base class for other
other monitors that do log the information elsewhere.
If used to monitor hyperparameter optimization, the information about the
underlying assessments and fits is stored in the assessments and fits
attributes, respectively. If used to monitor assessment, the information
about the fits is stored in the fits attribute.
Attributes:
config (dict): configuration of the hyperparameter optimization
bestScore (float): best score found during optimization
bestParameters (dict): best parameters found during optimization
assessments (dict): dictionary of assessments, keyed by the iteration number
(each assessment includes: assessmentModel, assessmentDataset, foldData,
predictions, estimators, fits)
scores (pd.DataFrame): scores for each hyperparameter search iteration
model (QSPRModel): model to optimize
data (QSPRDataset): dataset used in optimization
assessmentType (str): type of current assessment
assessmentModel (QSPRModel): model to assess in current assessment
assessmentDataset (QSPRDataset): data set used in current assessment
foldData (dict): dictionary of input data, keyed by the fold index, of the
current assessment
predictions (pd.DataFrame): predictions for the dataset of the current assessment
estimators (dict): dictionary of fitted estimators, keyed by the fold index of
the current assessment
currentFold (int): index of the current fold of the
current assessment
fits (dict): dictionary of fit data, keyed by the fold index of the current
assessment (each fit includes: fitData, fitLog, batchLog, bestEstimator,
bestEpoch)
fitData (dict): dictionary of input data of the current fit of the current
assessment
fitModel (QSPRModel): model to fit in current fit of the current assessment
fitLog (pd.DataFrame): log of the training process of the current fit of the
current assessment
batchLog (pd.DataFrame): log of the training process per batch of the current
fit of the current assessment
currentEpoch (int): index of the current epoch of the current fit of the current
assessment
currentBatch (int): index of the current batch of the current fit of the current
assessment
bestEstimator (Any): best estimator of the current fit of the current
assessment
bestEpoch (int): index of the best epoch of the current fit of the current
assessment
"""
def __init__(self):
self.data = None
self.model = None
self.optimizationType = None
# hyperparameter optimization data
self.config = None
self.bestScore = None
self.bestParameters = None
self.parameters = {}
self.assessments = {}
self.scores = pd.DataFrame(
columns=["aggregated_score", "fold_scores"]
).rename_axis("Iteration")
self.iteration = None
# assessment data
self.assessmentModel = None
self.assessmentDataset = None
self.foldData = {}
self.predictions = None
self.estimators = {}
self.currentFold = None
self.fits = {}
# fit data
self.fitModel = None
self.fitData = None
self.fitLog = pd.DataFrame(columns=["epoch", "train_loss", "val_loss"])
self.batchLog = pd.DataFrame(columns=["epoch", "batch", "loss"])
self.currentEpoch = None
self.currentBatch = None
self.bestEstimator = None
self.bestEpoch = None
def __getstate__(self):
o_dict = super().__getstate__()
# convert all data frames to dicts
for key, value in o_dict.items():
if isinstance(value, pd.DataFrame):
o_dict[key] = {"pd.DataFrame": value.to_dict()}
return o_dict
def __setstate__(self, state):
# convert all dicts to data frames
for key, value in state.items():
if isinstance(value, dict) and "pd.DataFrame" in value:
state[key] = pd.DataFrame.from_dict(value["pd.DataFrame"])
super().__setstate__(state)
[docs] def onOptimizationStart(
self, model: QSPRModel, data: QSPRDataset, config: dict,
optimization_type: str
):
"""Called before the hyperparameter optimization has started.
Args:
model (QSPRModel): model to optimize
data (QSPRDataset): data set used in optimization
config (dict): configuration of the hyperparameter optimization
optimization_type (str): type of hyperparameter optimization
"""
self.optimizationType = optimization_type
self.iteration = 0
self.model = model
self.data = data
self.config = config
[docs] def onOptimizationEnd(self, best_score: float, best_parameters: dict):
"""Called after the hyperparameter optimization has finished.
Args:
best_score (float): best score found during optimization
best_parameters (dict): best parameters found during optimization
"""
self.bestScore = best_score
self.bestParameters = best_parameters
[docs] def onIterationStart(self, params: dict):
"""Called before each iteration of the hyperparameter optimization.
Args:
params (dict): parameters used for the current iteration
"""
self.parameters[self.iteration] = params
[docs] def onIterationEnd(self, score: float, scores: list[float]):
"""Called after each iteration of the hyperparameter optimization.
Args:
score (float): (aggregated) score of the current iteration
scores (list[float]): scores of the current iteration
(e.g. for cross-validation)
"""
self.scores.loc[self.iteration, :] = [score, scores]
self.assessments[self.iteration] = self._get_assessment()
self._clear_assessment()
self.iteration += 1
[docs] def onAssessmentStart(
self, model: QSPRModel, data: QSPRDataset, assesment_type: str
):
"""Called before the assessment has started.
Args:
model (QSPRModel): model to assess
data (QSPRDataset): data set used in assessment
assesment_type (str): type of assessment
"""
self.assessmentModel = model
self.assessmentDataset = data
self.assessmentType = assesment_type
[docs] def onAssessmentEnd(self, predictions: pd.DataFrame):
"""Called after the assessment has finished.
Args:
predictions (pd.DataFrame): predictions of the assessment
"""
self.predictions = predictions
[docs] def onFoldStart(
self,
fold: int,
X_train: np.ndarray,
y_train: np.ndarray,
X_test: np.ndarray,
y_test: np.ndarray,
):
"""Called before each fold of the assessment.
Args:
fold (int): index of the current fold
X_train (np.ndarray): training data of the current fold
y_train (np.ndarray): training targets of the current fold
X_test (np.ndarray): test data of the current fold
y_test (np.ndarray): test targets of the current fold
"""
self.currentFold = fold
self.foldData[fold] = {
"X_train": X_train,
"y_train": y_train,
"X_test": X_test,
"y_test": y_test,
}
[docs] def onFoldEnd(
self, model_fit: Any | tuple[Any, int], fold_predictions: pd.DataFrame
):
"""Called after each fold of the assessment.
Args:
model_fit (Any|tuple[Any, int]): fitted estimator of the current fold, or
tuple containing the fitted estimator and
the number of epochs it was trained for
fold_predictions (pd.DataFrame): predictions of the current fold
"""
self.estimators[self.currentFold] = model_fit
self.fits[self.currentFold] = self._getFit()
self._clearFit()
def _clear_assessment(self):
"""Clear the assessment data."""
self.assessmentModel = None
self.assessmentDataset = None
self.foldData = {}
self.predictions = None
self.estimators = {}
self.fits = {}
def _get_assessment(self) -> dict:
"""Return the assessment data."""
return {
"assessmentModel": self.assessmentModel,
"assessmentDataset": self.assessmentDataset,
"foldData": self.foldData,
"predictions": self.predictions,
"estimators": self.estimators,
"fits": self.fits,
}
[docs] def onFitStart(
self,
model: QSPRModel,
X_train: np.ndarray,
y_train: np.ndarray,
X_val: np.ndarray | None = None,
y_val: np.ndarray | None = None,
):
"""Called before the training has started.
Args:
model (QSPRModel): model to be fitted
X_train (np.ndarray): training data
y_train (np.ndarray): training targets
X_val (np.ndarray | None): validation data, used for early stopping
y_val (np.ndarray | None): validation targets, used for early stopping
"""
self.fitModel = model
self.fitData = {
"X_train": X_train,
"y_train": y_train,
"X_val": X_val,
"y_val": y_val,
}
self.currentEpoch = 0
self.currentBatch = 0
[docs] def onFitEnd(self, estimator: Any, best_epoch: int | None = None):
"""Called after the training has finished.
Args:
estimator (Any): estimator that was fitted
best_epoch (int | None): index of the best epoch
"""
self.bestEstimator = estimator
self.bestEpoch = best_epoch
[docs] def onEpochStart(self, epoch: int):
"""Called before each epoch of the training.
Args:
epoch (int): index of the current epoch
"""
self.currentEpoch = epoch
[docs] def onEpochEnd(self, epoch: int, train_loss: float, val_loss: float | None = None):
"""Called after each epoch of the training.
Args:
epoch (int): index of the current epoch
train_loss (float): loss of the current epoch
val_loss (float | None): validation loss of the current epoch
"""
self.fitLog.loc[epoch, :] = [epoch, train_loss, val_loss]
[docs] def onBatchStart(self, batch: int):
"""Called before each batch of the training.
Args:
batch (int): index of the current batch
"""
self.currentBatch = batch
[docs] def onBatchEnd(self, batch: int, loss: float):
"""Called after each batch of the training.
Args:
batch (int): index of the current batch
loss (float): loss of the current batch
"""
self.batchLog.loc[len(self.batchLog), :] = [self.currentEpoch, batch, loss]
def _clearFit(self):
self.fitLog = pd.DataFrame(columns=["epoch", "train_loss", "val_loss"])
self.batchLog = pd.DataFrame(columns=["epoch", "batch", "loss"])
self.fitData = None
self.currentEpoch = None
self.currentBatch = None
self.bestEstimator = None
self.bestEpoch = None
def _getFit(self) -> dict:
return {
"fitData": self.fitData,
"fitLog": self.fitLog,
"batchLog": self.batchLog,
"bestEstimator": self.bestEstimator,
"bestEpoch": self.bestEpoch,
}
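# A minimal sketch of inspecting a BaseMonitor after training and of the
# DataFrame-aware (de)serialization implemented in __getstate__/__setstate__
# above; the callbacks are driven by hand here purely for illustration.
def _example_base_monitor_roundtrip():
    monitor = BaseMonitor()
    monitor.onEpochEnd(0, train_loss=0.42, val_loss=0.37)
    print(monitor.fitLog)  # one row per recorded epoch
    state = monitor.__getstate__()  # DataFrames stored as {"pd.DataFrame": dict}
    restored = BaseMonitor()
    restored.__setstate__(state)  # dicts converted back into DataFrames
    assert isinstance(restored.fitLog, pd.DataFrame)
    return restored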
[docs]class FileMonitor(BaseMonitor):
def __init__(
self,
save_optimization: bool = True,
save_assessments: bool = True,
save_fits: bool = True,
):
"""Monitor hyperparameter optimization, assessment and fitting to files.
Args:
save_optimization (bool): whether to save the hyperparameter optimization
scores
save_assessments (bool): whether to save assessment predictions
save_fits (bool): whether to save the fit log and batch log
"""
super().__init__()
self.saveOptimization = save_optimization
self.saveAssessments = save_assessments
self.saveFits = save_fits
self.outDir = None
[docs] def onOptimizationStart(
self, model: QSPRModel, data: QSPRDataset, config: dict,
optimization_type: str
):
"""Called before the hyperparameter optimization has started.
Args:
model (QSPRModel): model to optimize
data (QSPRDataset): data set used in optimization
config (dict): configuration of the hyperparameter optimization
optimization_type (str): type of hyperparameter optimization
"""
super().onOptimizationStart(model, data, config, optimization_type)
self.outDir = self.outDir or model.outDir
self.optimizationPath = f"{self.outDir}/{self.optimizationType}"
[docs] def onIterationStart(self, params: dict):
"""Called before each iteration of the hyperparameter optimization.
Args:
params (dict): parameters used for the current iteration
"""
super().onIterationStart(params)
self.optimizationItPath = f"{self.optimizationPath}/iteration_{self.iteration}"
[docs] def onIterationEnd(self, score: float, scores: list[float]):
"""Called after each iteration of the hyperparameter optimization.
Args:
score (float): (aggregated) score of the current iteration
scores (list[float]): scores of the current iteration
(e.g. for cross-validation)
"""
if self.saveAssessments:
# save parameters to json
with open(f"{self.optimizationItPath}/parameters.json", "w") as f:
json.dump(self.parameters[self.iteration], f)
super().onIterationEnd(score, scores)
if self.saveOptimization:
# add parameters to scores with separate columns
savescores = pd.concat(
[self.scores, pd.DataFrame(self.parameters).T], axis=1
)
savescores.to_csv(
f"{self.optimizationPath}/{self.optimizationType}_scores.tsv",
sep="\t",
index=False,
)
[docs] def onAssessmentStart(
self, model: QSPRModel, data: QSPRDataset, assesment_type: str
):
"""Called before the assessment has started.
Args:
model (QSPRModel): model to assess
data (QSPRDataset): data set used in assessment
assesment_type (str): type of assessment
"""
super().onAssessmentStart(model, data, assesment_type)
self.outDir = self.outDir or model.outDir
if self.saveAssessments:
if self.iteration is not None:
self.assessmentPath = f"{self.optimizationItPath}/{self.assessmentType}"
else:
self.assessmentPath = f"{self.outDir}/{self.assessmentType}"
os.makedirs(self.assessmentPath, exist_ok=True)
[docs] def onAssessmentEnd(self, predictions: pd.DataFrame):
"""Called after the assessment has finished.
Args:
predictions (pd.DataFrame): predictions of the assessment
"""
super().onAssessmentEnd(predictions)
if self.saveAssessments:
predictions.to_csv(
f"{self.assessmentPath}/{self.assessmentType}_predictions.tsv", sep="\t"
)
[docs] def onFitStart(
self,
model: QSPRModel,
X_train: np.ndarray,
y_train: np.ndarray,
X_val: np.ndarray | None = None,
y_val: np.ndarray | None = None,
):
"""Called before the training has started.
Args:
model (QSPRModel): model to be fitted
X_train (np.ndarray): training data
y_train (np.ndarray): training targets
X_val (np.ndarray | None): validation data, used for early stopping
y_val (np.ndarray | None): validation targets, used for early stopping
"""
super().onFitStart(model, X_train, y_train, X_val, y_val)
self.outDir = self.outDir or model.outDir
self.fitPath = self.outDir
if self.saveFits:
if self.iteration is not None:
self.fitPath = f"{self.optimizationItPath}"
if self.currentFold is not None:
self.fitPath = (
f"{self.fitPath}/{self.assessmentType}/fold_{self.currentFold}"
)
os.makedirs(self.fitPath, exist_ok=True)
[docs] def onFitEnd(self, estimator: Any, best_epoch: int | None = None):
"""Called after the training has finished.
Args:
estimator (Any): estimator that was fitted
best_epoch (int | None): index of the best epoch
"""
super().onFitEnd(estimator, best_epoch)
if self.saveFits:
self.fitLog.to_csv(f"{self.fitPath}/fit_log.tsv", sep="\t")
self.batchLog.to_csv(f"{self.fitPath}/batch_log.tsv", sep="\t")
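# A minimal sketch of where FileMonitor writes its output when monitoring a
# plain assessment (no hyperparameter optimization). The assessment label "cv"
# and the arrays are placeholders; `model` is assumed to expose the `outDir`
# attribute used above, and None stands in for the fitted estimator.
def _example_file_monitor_layout(model, dataset, X, y):
    monitor = FileMonitor(save_fits=False)
    monitor.onAssessmentStart(model, dataset, "cv")  # creates {model.outDir}/cv/
    monitor.onFoldStart(0, X, y, X, y)
    monitor.onFoldEnd(None, pd.DataFrame())
    monitor.onAssessmentEnd(pd.DataFrame())  # writes {model.outDir}/cv/cv_predictions.tsv
    return monitor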
[docs]class WandBMonitor(BaseMonitor):
"""Monitor hyperparameter optimization to weights and biases."""
def __init__(self, project_name: str, **kwargs):
"""Monitor assessment to weights and biases.
Args:
project_name (str): name of the project to log to
kwargs: additional keyword arguments for wandb.init
"""
super().__init__()
try:
import wandb
except ImportError:
raise ImportError("WandBMonitor requires wandb to be installed.")
self.wandb = wandb
wandb.login()
self.projectName = project_name
self.kwargs = kwargs
[docs] def onFoldStart(
self,
fold: int,
X_train: np.ndarray,
y_train: np.ndarray,
X_test: np.ndarray,
y_test: np.ndarray,
):
"""Called before each fold of the assessment.
Args:
fold (int): index of the current fold
X_train (np.ndarray): training data of the current fold
y_train (np.ndarray): training targets of the current fold
X_test (np.ndarray): test data of the current fold
y_test (np.ndarray): test targets of the current fold
"""
super().onFoldStart(fold, X_train, y_train, X_test, y_test)
config = {
"fold": fold,
"model": self.assessmentModel.name,
"assessmentType": self.assessmentType,
}
# add hyperparameter optimization parameters if available
if hasattr(self, "optimizationType"):
config["optimizationType"] = self.optimizationType
config.update(self.parameters[self.iteration])
config["hyperParamOpt_iteration"] = self.iteration
else:
config["optimizationType"] = None
group = (
f"{self.model.name}_{self.optimizationType}_{self.iteration}"
if self.optimizationType is not None
else f"{self.assessmentModel.name}"
)
name = f"{group}_{self.assessmentType}_{fold}"
self.wandb.init(
project=self.projectName,
config=config,
name=name,
group=group,
dir=f"{self.assessmentModel.outDir}",
**self.kwargs,
)
[docs] def onFoldEnd(
self, model_fit: Any | tuple[Any, int], fold_predictions: pd.DataFrame
):
"""Called after each fold of the assessment.
Args:
model_fit (Any | tuple[Any, int]): fitted estimator of the current fold, or
tuple containing the fitted estimator and
the number of epochs it was trained for
fold_predictions (pd.DataFrame): predictions of the current fold
"""
super().onFoldEnd(model_fit, fold_predictions)
fold_predictions_copy = deepcopy(fold_predictions)
# add smiles to fold predictions by merging on index
dataset_smiles = self.assessmentDataset.getDF()[
self.assessmentDataset.smilesCol
]
fold_predictions_copy = fold_predictions_copy.merge(
dataset_smiles, left_index=True, right_index=True
)
fold_predictions_copy["molecule"] = None
for index, row in fold_predictions_copy.iterrows():
mol = Chem.MolFromSmiles(row[self.assessmentDataset.smilesCol])
if mol is not None:
fold_predictions_copy.at[index, "molecule"] = self.wandb.Image(
Draw.MolToImage(mol, size=(200, 200))
)
wandbTable = self.wandb.Table(data=fold_predictions_copy)
self.wandb.log({"Test Results": wandbTable})
self.wandb.finish()
[docs] def onFitStart(
self,
model: QSPRModel,
X_train: np.ndarray,
y_train: np.ndarray,
X_val: np.ndarray | None = None,
y_val: np.ndarray | None = None,
):
"""Called before the training has started.
Args:
model (QSPRModel): model to be fitted
X_train (np.ndarray): training data
y_train (np.ndarray): training targets
X_val (np.ndarray | None): validation data, used for early stopping
y_val (np.ndarray | None): validation targets, used for early stopping
"""
super().onFitStart(model, X_train, y_train, X_val, y_val)
# initialize wandb run if not already initialized
if not self.wandb.run:
self.wandb.init(
project=self.projectName,
config={"model": self.fitModel.name},
name=f"{self.fitModel.name}_fit",
group=self.fitModel.name,
dir=f"{self.fitModel.outDir}",
**self.kwargs,
)
[docs] def onFitEnd(self, estimator: Any, best_epoch: int | None = None):
"""Called after the training has finished.
Args:
estimator (Any): estimator that was fitted
best_epoch (int | None): index of the best epoch
"""
super().onFitEnd(estimator, best_epoch)
self.wandb.log({"best_epoch": best_epoch})
# finish wandb run if not already finished
if not hasattr(self, "assessmentType"):
self.wandb.finish()
[docs] def onEpochEnd(self, epoch: int, train_loss: float, val_loss: float | None = None):
"""Called after each epoch of the training.
Args:
epoch (int): index of the current epoch
train_loss (float): loss of the current epoch
val_loss (float | None): validation loss of the current epoch
"""
super().onEpochEnd(epoch, train_loss, val_loss)
self.wandb.log({"epoch": epoch, "train_loss": train_loss, "val_loss": val_loss})
[docs] def onBatchEnd(self, batch: int, loss: float):
"""Called after each batch of the training.
Args:
batch (int): index of the current batch
loss (float): loss of the current batch
"""
super().onBatchEnd(batch, loss)
self.wandb.log({"batch": batch, "loss": loss})