"""This module holds the tests for functions regarding QSPR modelling."""
import os
from typing import Type
from unittest import TestCase
import numpy as np
from mlchemad.applicability_domains import KNNApplicabilityDomain
from parameterized import parameterized
from sklearn.cross_decomposition import PLSRegression
from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor
from sklearn.impute import SimpleImputer
from sklearn.metrics import (
accuracy_score,
explained_variance_score,
log_loss,
make_scorer,
mean_squared_error,
roc_auc_score,
top_k_accuracy_score
)
from sklearn.naive_bayes import GaussianNB
from sklearn.neighbors import KNeighborsClassifier, KNeighborsRegressor
from sklearn.svm import SVC, SVR
from xgboost import XGBClassifier, XGBRegressor
from . import SklearnMetrics
from .assessment.classification import create_metrics_summary
from .assessment.metrics.classification import CalibrationError, BEDROC, \
EnrichmentFactor, \
RobustInitialEnhancement, Prevalence, Sensitivity, Specificity, \
PositivePredictivity, NegativePredictivity, CohenKappa, \
BalancedPositivePredictivity, BalancedNegativePredictivity, \
BalancedMatthewsCorrcoeff, BalancedCohenKappa
from .assessment.metrics.masked import MaskedMetric
from .assessment.metrics.regression import Pearson, Kendall, KSlope, R20, KPrimeSlope, \
AverageFoldError, AbsoluteAverageFoldError, PercentageWithinFoldError, RPrime20
from .assessment.regression import create_correlation_summary
from ..data.processing.applicability_domain import MLChemADWrapper
from ..models.early_stopping import EarlyStopping, EarlyStoppingMode, early_stopping
from ..models.monitors import BaseMonitor, FileMonitor, ListMonitor
from ..models.scikit_learn import SklearnModel
from ..tasks import ModelTasks, TargetTasks
from ..utils.testing.base import QSPRTestCase
from ..utils.testing.check_mixins import ModelCheckMixIn, MonitorsCheckMixIn
from ..utils.testing.path_mixins import ModelDataSetsPathMixIn


class SklearnBaseModelTestCase(ModelDataSetsPathMixIn, ModelCheckMixIn, QSPRTestCase):
    """This class holds the tests for the SklearnModel class."""

    def setUp(self):
        super().setUp()
        self.setUpPaths()

    def getModel(
        self,
        name: str,
        alg: Type | None = None,
        parameters: dict | None = None,
        random_state: int | None = None,
    ):
        """Create a SklearnModel instance.

        Args:
            name (str): the name of the model
            alg (Type, optional): the algorithm to use. Defaults to None.
            parameters (dict, optional): the parameters to use. Defaults to None.
            random_state (int, optional):
                Random state to use for shuffling and other random operations.
                Defaults to None.

        Returns:
            SklearnModel: the model
        """
        return SklearnModel(
            base_dir=self.generatedModelsPath,
            alg=alg,
            name=name,
            parameters=parameters,
            random_state=random_state,
        )
[docs]class TestSklearnRegression(SklearnBaseModelTestCase):
"""Test the SklearnModel class for regression models."""
@parameterized.expand(
[
(alg_name, TargetTasks.REGRESSION, alg_name, alg, random_state)
for alg, alg_name in (
(RandomForestRegressor, "RFR"),
(XGBRegressor, "XGBR"),
) for random_state in ([None], [1, 42], [42, 42])
] + [
(alg_name, TargetTasks.REGRESSION, alg_name, alg, [None])
for alg, alg_name in (
(PLSRegression, "PLSR"),
(SVR, "SVR"),
(KNeighborsRegressor, "KNNR"),
)
]
)
def testRegressionBasicFit(self, _, task, model_name, model_class, random_state):
"""Test model training for regression models."""
if model_name not in ["SVR", "PLSR"]:
parameters = {"n_jobs": self.nCPU}
else:
parameters = None
# initialize dataset
dataset = self.createLargeTestDataSet(
target_props=[{
"name": "CL",
"task": task
}],
preparation_settings=self.getDefaultPrep(),
)
# initialize model for training from class
model = self.getModel(
name=f"{model_name}_{task}",
alg=model_class,
parameters=parameters,
random_state=random_state[0],
)
self.fitTest(model, dataset)
        # load the model from file
        predictor = SklearnModel(name=f"{model_name}_{task}", base_dir=model.baseDir)
        # make predictions with the model loaded from file and check that the output
        # is the same before and after saving and loading; if a second random state
        # is given, also check that the two models' predictions are equal only when
        # the seeds are identical
if random_state[0] is not None:
comparison_model = self.getModel(
name=f"{model_name}_{task}",
alg=model_class,
parameters=parameters,
random_state=random_state[1],
)
self.fitTest(comparison_model, dataset)
self.predictorTest(
predictor, # model loaded from file
dataset=dataset,
comparison_model=comparison_model, # comparison model not saved/loaded
expect_equal_result=random_state[0] == random_state[1],
)
else:
self.predictorTest(predictor, dataset=dataset)

    def testPLSRegressionSummaryWithSeed(self):
        """Test that the PLS regression correlation summary is reproducible."""
task = TargetTasks.REGRESSION
model_name = "PLSR"
model_class = PLSRegression
parameters = None
dataset = self.createLargeTestDataSet(
target_props=[{
"name": "CL",
"task": task
}],
preparation_settings=self.getDefaultPrep(),
)
model = self.getModel(
name=f"{model_name}_{task}",
alg=model_class,
parameters=parameters,
)
self.fitTest(model, dataset)
expected_summary = create_correlation_summary(model)
# Generate summary again, check that the result is identical
model = self.getModel(
name=f"{model_name}_{task}",
alg=model_class,
parameters=parameters,
)
self.fitTest(model, dataset)
summary = create_correlation_summary(model)
self.assertListEqual(summary["ModelName"], expected_summary["ModelName"])
self.assertListEqual(summary["R2"], expected_summary["R2"])
self.assertListEqual(summary["RMSE"], expected_summary["RMSE"])
self.assertListEqual(summary["Set"], expected_summary["Set"])


class TestSklearnRegressionMultiTask(SklearnBaseModelTestCase):
    """Test the SklearnModel class for multi-task regression models."""

@parameterized.expand(
[
(alg_name, alg_name, alg, random_state)
for alg, alg_name in ((RandomForestRegressor, "RFR"),)
for random_state in ([None], [1, 42], [42, 42])
] + [
(alg_name, alg_name, alg, [None])
for alg, alg_name in ((KNeighborsRegressor, "KNNR"),)
]
)
def testRegressionMultiTaskFit(self, _, model_name, model_class, random_state):
"""Test model training for multitask regression models."""
# initialize dataset
dataset = self.createLargeTestDataSet(
target_props=[
{
"name": "fu",
"task": TargetTasks.REGRESSION,
"imputer": SimpleImputer(strategy="mean"),
},
{
"name": "CL",
"task": TargetTasks.REGRESSION,
"imputer": SimpleImputer(strategy="mean"),
},
],
preparation_settings=self.getDefaultPrep(),
)
        # test regressor
# initialize model for training from class
model = self.getModel(
name=f"{model_name}_multitask_regression",
alg=model_class,
random_state=random_state[0],
)
self.fitTest(model, dataset)
        # load the model from file
        predictor = SklearnModel(
            name=f"{model_name}_multitask_regression", base_dir=model.baseDir
        )
        # make predictions with the model loaded from file and check that the output
        # is the same before and after saving and loading; if a second random state
        # is given, also check that the two models' predictions are equal only when
        # the seeds are identical
if random_state[0] is not None and model_name in ["RFR"]:
comparison_model = self.getModel(
name=f"{model_name}_multitask_regression",
alg=model_class,
random_state=random_state[1],
)
self.fitTest(comparison_model, dataset)
self.predictorTest(
predictor, # model loaded from file
dataset=dataset,
comparison_model=comparison_model, # comparison model not saved/loaded
expect_equal_result=random_state[0] == random_state[1],
)
else:
self.predictorTest(predictor, dataset=dataset)


class TestSklearnSerialization(SklearnBaseModelTestCase):
    """Test the JSON serialization of SklearnModel instances."""

    def testJSON(self):
        """Test that a SklearnModel survives a round trip through JSON."""
dataset = self.createLargeTestDataSet(
target_props=[{
"name": "CL",
"task": TargetTasks.SINGLECLASS,
"th": [6.5]
}],
preparation_settings=self.getDefaultPrep(),
)
model = self.getModel(
name="TestSerialization",
alg=RandomForestClassifier,
parameters={
"n_jobs": self.nCPU,
"n_estimators": 10
},
random_state=42,
)
model.save()
content = model.toJSON()
model2 = SklearnModel.fromJSON(content)
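        # restore the base directory manually (it is presumably not part of the
        # JSON representation) so the meta file paths can be compared below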
model2.baseDir = model.baseDir
model3 = SklearnModel.fromFile(model.metaFile)
model4 = SklearnModel(name=model.name, base_dir=model.baseDir)
self.assertEqual(model.metaFile, model2.metaFile)
self.assertEqual(model.metaFile, model3.metaFile)
self.assertEqual(model.metaFile, model4.metaFile)
self.assertEqual(model.toJSON(), model2.toJSON())
self.assertEqual(model.toJSON(), model3.toJSON())
self.assertEqual(model.toJSON(), model4.toJSON())


class TestSklearnClassification(SklearnBaseModelTestCase):
    """Test the SklearnModel class for classification models."""

@parameterized.expand(
[
(f"{alg_name}_{task}", task, th, alg_name, alg, random_state)
for alg, alg_name in (
(RandomForestClassifier, "RFC"),
(XGBClassifier, "XGBC"),
) for task, th in (
(TargetTasks.SINGLECLASS, [6.5]),
(TargetTasks.MULTICLASS, [0, 2, 10, 1100]),
) for random_state in ([None], [1, 42], [42, 42])
] + [
(f"{alg_name}_{task}", task, th, alg_name, alg, [None])
for alg, alg_name in (
(SVC, "SVC"),
(KNeighborsClassifier, "KNNC"),
(GaussianNB, "NB"),
) for task, th in (
(TargetTasks.SINGLECLASS, [6.5]),
(TargetTasks.MULTICLASS, [0, 2, 10, 1100]),
)
]
)
def testClassificationBasicFit(
self, _, task, th, model_name, model_class, random_state
):
"""Test model training for classification models."""
if model_name not in ["NB", "SVC"]:
parameters = {"n_jobs": self.nCPU}
else:
parameters = None
# special case for SVC
if model_name == "SVC":
if parameters is not None:
parameters.update({"probability": True})
else:
parameters = {"probability": True}
        # special case for XGB: set subsample below 1.0 to introduce randomness
if model_name == "XGBC":
parameters = {"subsample": 0.3}
# initialize dataset
dataset = self.createLargeTestDataSet(
target_props=[{
"name": "CL",
"task": task,
"th": th
}],
preparation_settings=self.getDefaultPrep(),
)
# test classifier
# initialize model for training from class
model = self.getModel(
name=f"{model_name}_{task}",
alg=model_class,
parameters=parameters,
random_state=random_state[0],
)
self.fitTest(model, dataset)
        # load the model from file
        predictor = SklearnModel(name=f"{model_name}_{task}", base_dir=model.baseDir)
        # make predictions with the model loaded from file and check that the output
        # is the same before and after saving and loading; if a second random state
        # is given, also check that the two models' predictions are equal only when
        # the seeds are identical
if random_state[0] is not None:
comparison_model = self.getModel(
name=f"{model_name}_{task}",
alg=model_class,
parameters=parameters,
random_state=random_state[1],
)
self.fitTest(comparison_model, dataset)
self.predictorTest(
predictor, # model loaded from file
dataset=dataset,
comparison_model=comparison_model, # comparison model not saved/loaded
expect_equal_result=random_state[0] == random_state[1],
)
else:
self.predictorTest(predictor, dataset=dataset)

    def testRandomForestClassifierFitWithSeed(self):
        """Test that the metrics summary is reproducible with a fixed seed."""
parameters = {
"n_jobs": self.nCPU,
}
# initialize dataset
dataset = self.createLargeTestDataSet(
target_props=[{
"name": "CL",
"task": TargetTasks.SINGLECLASS,
"th": [6.5]
}],
preparation_settings=self.getDefaultPrep(),
random_state=42,
)
# test classifier
# initialize model for training from class
model = self.getModel(
name=f"RFC_{TargetTasks.SINGLECLASS}",
alg=RandomForestClassifier,
parameters=parameters,
)
self.fitTest(model, dataset)
expected_summary = create_metrics_summary(model)
# Generate summary again, check that the result is identical
model = self.getModel(
name=f"RFC_{TargetTasks.SINGLECLASS}",
alg=RandomForestClassifier,
parameters=parameters,
random_state=42,
)
self.fitTest(model, dataset)
summary = create_metrics_summary(model)
self.assertListEqual(summary["Metric"], expected_summary["Metric"])
self.assertListEqual(summary["Model"], expected_summary["Model"])
self.assertListEqual(summary["TestSet"], expected_summary["TestSet"])
self.assertListEqual(summary["Value"], expected_summary["Value"])


class TestSklearnClassificationMultiTask(SklearnBaseModelTestCase):
    """Test the SklearnModel class for multi-task classification models."""

@parameterized.expand(
[
(alg_name, alg_name, alg, random_state)
for alg, alg_name in ((RandomForestClassifier, "RFC"),)
for random_state in ([None], [1, 42], [42, 42])
] + [
(alg_name, alg_name, alg, [None])
for alg, alg_name in ((KNeighborsClassifier, "KNNC"),)
]
)
def testClassificationMultiTaskFit(self, _, model_name, model_class, random_state):
"""Test model training for multitask classification models."""
if model_name not in ["NB", "SVC"]:
parameters = {"n_jobs": self.nCPU}
else:
parameters = {}
# special case for SVC
if model_name == "SVC":
parameters.update({"probability": True})
# initialize dataset
dataset = self.createLargeTestDataSet(
target_props=[
{
"name": "fu",
"task": TargetTasks.SINGLECLASS,
"th": [0.3],
"imputer": SimpleImputer(strategy="mean"),
},
{
"name": "CL",
"task": TargetTasks.SINGLECLASS,
"th": [6.5],
"imputer": SimpleImputer(strategy="mean"),
},
],
preparation_settings=self.getDefaultPrep(),
)
# test classifier
# initialize model for training from class
model = self.getModel(
name=f"{model_name}_multitask_classification",
alg=model_class,
parameters=parameters,
random_state=random_state[0],
)
self.fitTest(model, dataset)
        # load the model from file
        predictor = SklearnModel(
            name=f"{model_name}_multitask_classification", base_dir=model.baseDir
        )
        # make predictions with the model loaded from file and check that the output
        # is the same before and after saving and loading; if a second random state
        # is given, also check that the two models' predictions are equal only when
        # the seeds are identical
if random_state[0] is not None:
comparison_model = self.getModel(
name=f"{model_name}_multitask_classification",
alg=model_class,
parameters=parameters,
random_state=random_state[1],
)
self.fitTest(comparison_model, dataset)
self.predictorTest(
predictor, # model loaded from file
dataset=dataset,
comparison_model=comparison_model, # comparison model not saved/loaded
expect_equal_result=random_state[0] == random_state[1],
)
else:
self.predictorTest(predictor, dataset=dataset)


class TestMetrics(TestCase):
    """Test the metric classes from the metrics module."""

    def sample_data(self, task: ModelTasks, use_proba: bool = False):
        """Generate a small (y_true, y_pred) sample for the given task.

        Probability predictions are returned as a list with one
        (n_samples, n_classes) array per target task.
        """
if task == ModelTasks.REGRESSION:
y_true = np.array([1.2, 2.2, 3.2, 4.2, 5.2])
y_pred = np.array([[2.2], [2.2], [3.2], [4.2], [5.2]])
elif task == ModelTasks.MULTITASK_REGRESSION:
y_true = np.array(
[[1.2, 2.2], [3.2, 4.2], [5.2, 1.2], [2.2, 3.2], [4.2, 5.2]]
)
y_pred = np.array(
[[2.2, 2.2], [3.2, 4.2], [5.2, 1.2], [2.2, 3.2], [4.2, 5.2]]
)
elif task == ModelTasks.SINGLECLASS and not use_proba:
y_true = np.array([1, 0, 1, 0, 1])
y_pred = np.array([[0], [0], [1], [0], [1]])
elif task == ModelTasks.SINGLECLASS and use_proba:
y_true = np.array([1, 0, 1, 0, 1])
y_pred = [
np.array([[0.2, 0.8], [0.2, 0.8], [0.8, 0.2], [0.1, 0.9], [0.9, 0.1]])
]
elif task == ModelTasks.MULTICLASS and not use_proba:
y_true = np.array([1, 2, 1, 0, 1])
y_pred = np.array([[0], [1], [2], [1], [1]])
elif task == ModelTasks.MULTICLASS and use_proba:
y_true = np.array([1, 2, 1, 0, 1])
y_pred = [
np.array([[0.9, 0.1, 0.0],
[0.1, 0.8, 0.1],
[0.0, 0.1, 0.9],
[0.1, 0.8, 0.1],
[0.1, 0.8, 0.1]])
]
elif task == ModelTasks.MULTITASK_SINGLECLASS and not use_proba:
y_true = np.array([[1, 0], [1, 1], [1, 0], [0, 0], [1, 0]])
y_pred = np.array([[0, 0], [1, 1], [0, 0], [0, 0], [1, 0]])
elif task == ModelTasks.MULTITASK_SINGLECLASS and use_proba:
y_true = np.array([[1, 0], [1, 1], [1, 0], [0, 0], [1, 0]])
y_pred = [
np.array([[0.2, 0.8], [0.2, 0.8], [0.8, 0.2], [0.1, 0.9], [0.9, 0.1]]),
np.array([[0.2, 0.8], [0.2, 0.8], [0.8, 0.2], [0.1, 0.9], [0.9, 0.1]]),
]
else:
raise ValueError(f"Invalid task or not implemented: {task}")
return y_true, y_pred

    def test_SklearnMetrics(self):
"""Test the sklearn metrics wrapper."""
# test regression metrics
y_true, y_pred = self.sample_data(ModelTasks.REGRESSION)
## test explained variance score with scorer from metric
metric = explained_variance_score
qsprpred_scorer = SklearnMetrics(make_scorer(metric))
self.assertEqual(
qsprpred_scorer(y_true, y_pred), # 2D np array standard in QSPRpred
metric(y_true, np.squeeze(y_pred)), # 1D np array standard in sklearn
)
        ## test mean squared error with scorer from str (negated, smaller is better)
qsprpred_scorer = SklearnMetrics("neg_mean_squared_error")
self.assertEqual(
qsprpred_scorer(y_true, y_pred),
-mean_squared_error(y_true, np.squeeze(y_pred)), # negated
)
## test multitask regression
y_true, y_pred = self.sample_data(ModelTasks.MULTITASK_REGRESSION)
qsprpred_scorer = SklearnMetrics("explained_variance")
self.assertEqual(
qsprpred_scorer(y_true, y_pred), explained_variance_score(y_true, y_pred)
)
# test classification metrics
## single class discrete
y_true, y_pred = self.sample_data(ModelTasks.SINGLECLASS)
qsprpred_scorer = SklearnMetrics("accuracy")
self.assertEqual(
qsprpred_scorer(y_true, y_pred), accuracy_score(y_true, np.squeeze(y_pred))
)
## single class proba
y_true, y_pred = self.sample_data(ModelTasks.SINGLECLASS, use_proba=True)
qsprpred_scorer = SklearnMetrics("neg_log_loss")
self.assertEqual(
qsprpred_scorer(y_true, y_pred), -log_loss(y_true, np.squeeze(y_pred[0]))
)
## multi-class discrete scorer (_PredictScorer)
qsprpred_scorer = SklearnMetrics("accuracy")
self.assertEqual(
qsprpred_scorer(y_true, y_pred),
accuracy_score(y_true, np.argmax(y_pred[0], axis=1)),
)
## multi-class with threshold
y_true, y_pred = self.sample_data(ModelTasks.MULTICLASS, use_proba=True)
qsprpred_scorer = SklearnMetrics(
make_scorer(top_k_accuracy_score, needs_threshold=True, k=2)
)
self.assertEqual(
qsprpred_scorer(y_true, y_pred),
top_k_accuracy_score(y_true, y_pred[0], k=2),
)
## multi-task single class (same as multi-label in sklearn)
### proba
y_true, y_pred = self.sample_data(
ModelTasks.MULTITASK_SINGLECLASS, use_proba=True
)
qsprpred_scorer = SklearnMetrics("roc_auc_ovr")
y_pred_sklearn = np.array([y_pred[0][:, 1], y_pred[1][:, 1]]).T
self.assertEqual(
qsprpred_scorer(y_true, y_pred),
roc_auc_score(y_true, y_pred_sklearn, multi_class="ovr"),
)
### discrete
y_true, y_pred = self.sample_data(ModelTasks.MULTITASK_SINGLECLASS)
qsprpred_scorer = SklearnMetrics("accuracy")
self.assertEqual(
qsprpred_scorer(y_true, y_pred),
accuracy_score(y_true, y_pred),
)

    def test_CalibrationError(self):
"""Test the calibration error metric."""
# test calibration error with single class probabilities
y_true, y_pred = self.sample_data(ModelTasks.SINGLECLASS, use_proba=True)
cal_error = CalibrationError()
# check assertion error is raised when n_bins > n_samples
with self.assertRaises(AssertionError):
cal_error(y_true, y_pred)
cal_error = CalibrationError(n_bins=5)
self.assertIsInstance(cal_error(y_true, y_pred), float)
        # check an error is raised when using discrete predictions
with self.assertRaises(TypeError):
cal_error(self.sample_data(ModelTasks.SINGLECLASS, use_proba=False))

    def test_BEDROC(self):
        """Test the BEDROC metric."""
        y_true, y_pred = self.sample_data(ModelTasks.SINGLECLASS, use_proba=True)
        bedroc = BEDROC()
        self.assertIsInstance(bedroc(y_true, y_pred), float)

    def test_EnrichmentFactor(self):
        """Test the enrichment factor metric."""
        y_true, y_pred = self.sample_data(ModelTasks.SINGLECLASS, use_proba=True)
        enrichment = EnrichmentFactor()
        self.assertIsInstance(enrichment(y_true, y_pred), float)

    def test_RobustInitialEnhancement(self):
        """Test the robust initial enhancement metric."""
        y_true, y_pred = self.sample_data(ModelTasks.SINGLECLASS, use_proba=True)
        robust_enhancement = RobustInitialEnhancement()
        self.assertIsInstance(robust_enhancement(y_true, y_pred), float)

    def test_ConfusionMatrixMetrics(self):
y_true, y_pred = self.sample_data(ModelTasks.SINGLECLASS)
self.assertIsInstance(Prevalence()(y_true, y_pred), float)
self.assertIsInstance(Specificity()(y_true, y_pred), float)
self.assertIsInstance(Sensitivity()(y_true, y_pred), float)
self.assertIsInstance(NegativePredictivity()(y_true, y_pred), float)
self.assertIsInstance(PositivePredictivity()(y_true, y_pred), float)
self.assertIsInstance(CohenKappa()(y_true, y_pred), float)
self.assertIsInstance(BalancedNegativePredictivity()(y_true, y_pred), float)
self.assertIsInstance(BalancedPositivePredictivity()(y_true, y_pred), float)
self.assertIsInstance(BalancedMatthewsCorrcoeff()(y_true, y_pred), float)
self.assertIsInstance(BalancedCohenKappa()(y_true, y_pred), float)

    def test_Pearson(self):
        y_true, y_pred = self.sample_data(ModelTasks.REGRESSION)
        Pearson()(y_true, y_pred)

    def test_Kendall(self):
        y_true, y_pred = self.sample_data(ModelTasks.REGRESSION)
        Kendall()(y_true, y_pred)

    def test_KPrimeSlope(self):
        y_true, y_pred = self.sample_data(ModelTasks.REGRESSION)
        KPrimeSlope()(y_true, y_pred)

    def test_KSlope(self):
        y_true, y_pred = self.sample_data(ModelTasks.REGRESSION)
        KSlope()(y_true, y_pred)

    def test_R20(self):
        y_true, y_pred = self.sample_data(ModelTasks.REGRESSION)
        R20()(y_true, y_pred)

    def test_RPrime20(self):
        y_true, y_pred = self.sample_data(ModelTasks.REGRESSION)
        RPrime20()(y_true, y_pred)

    def test_FoldErrorMetrics(self):
y_true, y_pred = self.sample_data(ModelTasks.REGRESSION)
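        # fold-error metrics are based on the ratio between predicted and
        # observed values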
self.assertIsInstance(AbsoluteAverageFoldError()(y_true, y_pred), float)
self.assertIsInstance(AverageFoldError()(y_true, y_pred), float)
self.assertIsInstance(PercentageWithinFoldError()(y_true, y_pred), float)

    def test_MaskedMetric(self):
y_true, y_pred = self.sample_data(ModelTasks.REGRESSION)
y_true[1] = None
metric = MaskedMetric(SklearnMetrics("neg_mean_squared_error"))
self.assertIsInstance(metric(y_true, y_pred), float)
y_true, y_pred = self.sample_data(ModelTasks.MULTITASK_SINGLECLASS,
use_proba=False)
y_true = y_true.astype(float)
y_true[0, :] = None
y_true[1, 1] = None
y_true, y_pred = self.sample_data(ModelTasks.MULTITASK_SINGLECLASS,
use_proba=True)
y_true = y_true.astype(float)
y_true[0, :] = None
y_true[1, 1] = None


class TestEarlyStopping(ModelDataSetsPathMixIn, TestCase):
    """Test the early stopping functionality."""

    def setUp(self):
        super().setUp()
        self.setUpPaths()

    def test_earlyStoppingMode(self):
"""Test the early stopping mode enum."""
# get the enum values
recording = EarlyStoppingMode.RECORDING
not_recording = EarlyStoppingMode.NOT_RECORDING
fixed = EarlyStoppingMode.FIXED
optimal = EarlyStoppingMode.OPTIMAL
# check if the enum bools are correct
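        # RECORDING and NOT_RECORDING evaluate to True, FIXED and OPTIMAL to False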
self.assertTrue(recording and not_recording)
self.assertFalse(fixed)
self.assertFalse(optimal)
# check if the string representation is correct
self.assertTrue("RECORDING" == str(recording))
self.assertTrue("NOT_RECORDING" == str(not_recording))
self.assertTrue("FIXED" == str(fixed))
self.assertTrue("OPTIMAL" == str(optimal))

    def test_EarlyStopping(self):
"""Test the early stopping class."""
earlystopping = EarlyStopping(EarlyStoppingMode.RECORDING)
# check value error is raised when calling optimal epochs without recording
with self.assertRaises(ValueError):
print(earlystopping.optimalEpochs)
# record some epochs
earlystopping.recordEpochs(10)
earlystopping.recordEpochs(20)
earlystopping.recordEpochs(30)
earlystopping.recordEpochs(40)
earlystopping.recordEpochs(60)
# check if epochs are recorded correctly
self.assertEqual(earlystopping.trainedEpochs, [10, 20, 30, 40, 60])
# check if the optimal epochs are correct
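        # the default aggregate is assumed to be the mean:
        # np.mean([10, 20, 30, 40, 60]) == 32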
self.assertEqual(earlystopping.optimalEpochs, 32)
# check if optimal epochs are correct when using a different aggregate function
earlystopping.aggregateFunc = np.median
self.assertEqual(earlystopping.optimalEpochs, 30)
# check set num epochs manually
earlystopping.numEpochs = 100
self.assertEqual(earlystopping.numEpochs, 100)
# check correct epochs are returned when using fixed mode and optimal mode
earlystopping.mode = EarlyStoppingMode.FIXED
self.assertEqual(earlystopping.getEpochs(), 100)
earlystopping.mode = EarlyStoppingMode.OPTIMAL
self.assertEqual(earlystopping.getEpochs(), 30)
# check saving
earlystopping.toFile(f"{self.inputBasePath}/earlystopping.json")
self.assertTrue(os.path.exists(f"{self.inputBasePath}/earlystopping.json"))
# check loading
earlystopping2 = EarlyStopping.fromFile(
f"{self.inputBasePath}/earlystopping.json"
)
self.assertEqual(earlystopping2.trainedEpochs, [10, 20, 30, 40, 60])
self.assertEqual(earlystopping2.mode, EarlyStoppingMode.OPTIMAL)
self.assertEqual(earlystopping2.aggregateFunc, np.median)
self.assertEqual(earlystopping2.optimalEpochs, 30)
self.assertEqual(earlystopping.numEpochs, 100)

    def test_early_stopping_decorator(self):
"""Test the early stopping decorator."""
class test_class:
def __init__(self, support=True):
self.earlyStopping = EarlyStopping(EarlyStoppingMode.RECORDING)
self.supportsEarlyStopping = support
@early_stopping
def test_func(
self,
X,
y,
estimator=None,
mode=EarlyStoppingMode.NOT_RECORDING,
split=None,
monitor=None,
**kwargs,
):
return None, kwargs["best_epoch"]
test_obj = test_class()
recording = EarlyStoppingMode.RECORDING
not_recording = EarlyStoppingMode.NOT_RECORDING
# epochs are recorded as self.earlyStopping.mode is set to RECORDING
_ = test_obj.test_func(None, None, None, best_epoch=29)
# epochs are not recorded as mode is set to NOT_RECORDING in the decorator
_ = test_obj.test_func(None, None, None, not_recording, best_epoch=49)
self.assertEqual(test_obj.earlyStopping.mode, not_recording)
# epochs are recorded as mode is now set to RECORDING in the decorator
_ = test_obj.test_func(None, None, None, recording, best_epoch=39)
self.assertEqual(test_obj.earlyStopping.mode, recording)
# Check if the best epochs are recorded with mode RECORDING using the decorator
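        # only the two calls made in RECORDING mode (best_epoch 29 and 39) are
        # aggregated into the optimal number of epochs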
self.assertEqual(test_obj.earlyStopping.optimalEpochs, 35)
# Check if the best epochs are not recorded with other modes using the decorator
test_obj.earlyStopping.mode = EarlyStoppingMode.FIXED
_ = test_obj.test_func(None, None, None, best_epoch=49)
self.assertEqual(test_obj.earlyStopping.optimalEpochs, 35)
test_obj.earlyStopping.mode = EarlyStoppingMode.OPTIMAL
_ = test_obj.test_func(None, None, None, best_epoch=59)
self.assertEqual(test_obj.earlyStopping.optimalEpochs, 35)
test_obj.earlyStopping.mode = EarlyStoppingMode.NOT_RECORDING
_ = test_obj.test_func(None, None, None, best_epoch=69)
self.assertEqual(test_obj.earlyStopping.optimalEpochs, 35)
# check decorator raises error when early stopping is not supported
test_obj = test_class(support=False)
with self.assertRaises(AssertionError):
_ = test_obj.test_func(None, None, None, best_epoch=40)


class TestMonitors(MonitorsCheckMixIn, TestCase):
    """Test the training monitor classes."""

    def setUp(self):
        super().setUp()
        self.setUpPaths()

    def testBaseMonitor(self):
"""Test the base monitor"""
model = SklearnModel(
base_dir=self.generatedModelsPath,
alg=RandomForestRegressor,
name="RFR",
random_state=42,
)
self.runMonitorTest(
model,
self.createLargeTestDataSet(preparation_settings=self.getDefaultPrep()),
BaseMonitor,
self.baseMonitorTest,
False,
)

    def testFileMonitor(self):
"""Test the file monitor"""
model = SklearnModel(
base_dir=self.generatedModelsPath,
alg=RandomForestRegressor,
name="RFR",
random_state=42,
)
self.runMonitorTest(
model,
self.createLargeTestDataSet(preparation_settings=self.getDefaultPrep()),
FileMonitor,
self.fileMonitorTest,
False,
)

    def testListMonitor(self):
"""Test the list monitor"""
model = SklearnModel(
base_dir=self.generatedModelsPath,
alg=RandomForestRegressor,
name="RFR",
random_state=42,
)
self.runMonitorTest(
model,
self.createLargeTestDataSet(preparation_settings=self.getDefaultPrep()),
ListMonitor,
self.listMonitorTest,
False,
[BaseMonitor(), FileMonitor()],
)


class TestAttachedApplicabilityDomain(ModelDataSetsPathMixIn, QSPRTestCase):
    """Test model predictions with an attached applicability domain."""

    def setUp(self):
        super().setUp()
        self.setUpPaths()

    def testAttachedApplicabilityDomain(self):
"""Test the attached applicability domain class."""
# initialize test dataset with attached applicability domain
dataset = self.createLargeTestDataSet(
target_props=[{
"name": "CL",
"task": "REGRESSION"
}],
preparation_settings={
**self.getDefaultPrep(),
"applicability_domain":
KNNApplicabilityDomain(dist="euclidean", alpha=0.9, scaling=None),
},
)
# initialize model for training
model = SklearnModel(
base_dir=self.generatedModelsPath,
alg=RandomForestRegressor,
name="RFR_with_AD",
parameters={"n_jobs": self.nCPU},
random_state=42,
)
model.fitDataset(dataset)
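        # fitting on the dataset is also expected to fit the attached
        # applicability domain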
# check if the applicability domain is attached to the model
self.assertTrue(hasattr(model, "applicabilityDomain"))
self.assertIsInstance(model.applicabilityDomain, MLChemADWrapper)
# check if the applicability domain is saved and loaded correctly
model.save()
model2 = SklearnModel.fromFile(model.metaFile)
self.assertTrue(hasattr(model2, "applicabilityDomain"))
self.assertIsInstance(model2.applicabilityDomain, MLChemADWrapper)
        # compute applicability domain predictions directly with MLChemAD
comparison_ap = KNNApplicabilityDomain(
dist="euclidean", alpha=0.9, scaling=None
)
features = dataset.getFeatures(
concat=True, ordered=True, refit_standardizer=False
)
comparison_ap.fit(features)
ap_pred = comparison_ap.contains(features)
        # check that the applicability domain predictions from the standalone
        # MLChemAD model match the ones from the QSPRpred model
_, ap_preds_model = model.predictMols(
dataset.df["SMILES"], use_applicability_domain=True
)
self.assertTrue(np.array_equal(ap_pred, ap_preds_model))
        # check that the applicability domain prediction arrays are equal after
        # saving and loading
_, ap_preds_model2 = model2.predictMols(
dataset.df["SMILES"], use_applicability_domain=True
)
self.assertTrue(np.array_equal(ap_pred, ap_preds_model2))