Source code for qsprpred.utils.testing.check_mixins

import logging
import os
from copy import deepcopy
from os.path import exists
from typing import Literal

import numpy as np
import pandas as pd
from sklearn.model_selection import KFold

from .path_mixins import ModelDataSetsPathMixIn
from ... import TargetTasks
from ...data import QSPRDataset
from ...data.descriptors.sets import DescriptorSet
from ...data.processing.feature_standardizers import SKLearnStandardizer
from ...models import (
    QSPRModel,
    OptunaOptimization,
    CrossValAssessor,
    EarlyStoppingMode,
    SklearnMetrics,
    GridSearchOptimization,
    TestSetAssessor,
    HyperparameterOptimizationMonitor,
    AssessorMonitor,
    FitMonitor,
    BaseMonitor,
    FileMonitor,
    HyperparameterOptimization,
)
from ...models.monitors import ListMonitor
from ...tasks import TargetProperty


class DescriptorCheckMixIn:
    """Mixin class for common descriptor checks."""

    def checkFeatures(self, ds: QSPRDataset, expected_length: int):
        """Check if the feature names and the feature matrix of a data set are
        consistent with the expected number of variables.

        Args:
            ds (QSPRDataset): The data set to check.
            expected_length (int): The expected number of features.

        Raises:
            AssertionError: If the feature names or the feature matrix are not
                consistent.
        """
        self.assertEqual(len(ds.featureNames), expected_length)
        self.assertEqual(len(ds.getFeatureNames()), expected_length)
        if expected_length > 0:
            features = ds.getFeatures(concat=True)
        else:
            self.assertRaises(ValueError, ds.getFeatures, concat=True)
            features = pd.concat([ds.X, ds.X_ind])
        self.assertEqual(features.shape[0], len(ds))
        self.assertEqual(features.shape[1], expected_length)
        self.assertEqual(ds.X.shape[1], expected_length)
        self.assertEqual(ds.X_ind.shape[1], expected_length)
        if expected_length > 0:
            for fold in ds.iterFolds(split=KFold(n_splits=5)):
                self.assertIsInstance(fold, tuple)
                self.assertEqual(fold[0].shape[1], expected_length)
                self.assertEqual(fold[1].shape[1], expected_length)
        else:
            self.assertRaises(
                ValueError, lambda: list(ds.iterFolds(split=KFold(n_splits=5)))
            )
        # check if outliers are dropped
        if "TestOutlier" in ds.df.columns:
            num_dropped = ds.df.TestOutlier.sum()
            # the expected number of test samples is the total number of samples
            # minus the number of samples in the training set and the number of
            # dropped outliers
            expected_num_samples = len(ds) - len(ds.X) - num_dropped
            X, X_ind = ds.getFeatures(concat=False)
            self.assertEqual(X_ind.shape[0], expected_num_samples)

    def checkDescriptors(
        self, dataset: QSPRDataset, target_props: list[dict | TargetProperty]
    ):
        """Check if information about descriptors is consistent in the data set.

        Checks if the calculators are consistent with the descriptors contained
        in the data set. This is tested both before and after serialization.

        Args:
            dataset (QSPRDataset): The data set to check.
            target_props (list of dicts or TargetProperty): list of target properties

        Raises:
            AssertionError: If the consistency check fails.
        """
        # test some basic consistency rules on the resulting features
        expected_length = 0
        for calc in dataset.descriptorSets:
            expected_length += len(calc.descriptors)
        self.checkFeatures(dataset, expected_length)
        # save to file, check if it can be loaded and if the features are consistent
        dataset.save()
        ds_loaded = dataset.__class__.fromFile(dataset.metaFile)
        self.assertEqual(ds_loaded.nJobs, dataset.nJobs)
        self.assertEqual(ds_loaded.chunkSize, dataset.chunkSize)
        self.assertEqual(ds_loaded.randomState, dataset.randomState)
        for ds_loaded_prop, target_prop in zip(
            ds_loaded.targetProperties, target_props
        ):
            if ds_loaded_prop.task.isClassification():
                self.assertEqual(ds_loaded_prop.name, target_prop["name"])
                self.assertEqual(ds_loaded_prop.task, target_prop["task"])
        self.assertTrue(ds_loaded.descriptorSets)
        for calc in ds_loaded.descriptors:
            calc = calc.calculator
            self.assertTrue(isinstance(calc, DescriptorSet))
        self.checkFeatures(dataset, expected_length)
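
    # Usage sketch for `DescriptorCheckMixIn` (illustrative only, not exercised
    # by the suite): a concrete test case would typically combine this mixin
    # with a data set fixture and `unittest.TestCase`, attach a descriptor set,
    # and then call `checkDescriptors`. The fixture helper
    # `createLargeTestDataSet` and the `MorganFP` descriptor set are assumptions
    # about the surrounding qsprpred test utilities, not guaranteed here:
    #
    #     class ExampleDescriptorTest(DataSetsPathMixIn, DescriptorCheckMixIn, TestCase):
    #         def testMorganDescriptors(self):
    #             ds = self.createLargeTestDataSet()
    #             ds.addDescriptors([MorganFP(radius=2, nBits=1024)])
    #             self.checkDescriptors(ds, ds.targetProperties)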


class DataPrepCheckMixIn(DescriptorCheckMixIn):
    """Mixin for testing data preparation."""

    def checkPrep(
        self,
        dataset,
        feature_calculators,
        split,
        feature_standardizer,
        feature_filter,
        data_filter,
        applicability_domain,
        expected_target_props,
    ):
        """Check the consistency of the dataset after preparation."""
        name = dataset.name
        # if a split needs a dataset, give it one
        if split and hasattr(split, "setDataSet"):
            split.setDataSet(None)
            self.assertRaises(ValueError, split.getDataSet)
            split.setDataSet(dataset)
            self.assertEqual(dataset, split.getDataSet())
        # prepare the dataset and check consistency
        dataset.prepareDataset(
            feature_calculators=feature_calculators,
            split=split if split else None,
            feature_standardizer=(
                feature_standardizer if feature_standardizer else None
            ),
            feature_filters=[feature_filter] if feature_filter else None,
            data_filters=[data_filter] if data_filter else None,
            applicability_domain=applicability_domain,
            drop_outliers=applicability_domain is not None,
        )
        expected_feature_count = len(dataset.featureNames)
        original_features = dataset.featureNames
        train, test = dataset.getFeatures()
        self.checkFeatures(dataset, expected_feature_count)
        # save the dataset
        dataset.save()
        # reload the dataset and check consistency again
        dataset = dataset.__class__.fromFile(dataset.metaFile)
        self.assertEqual(dataset.name, name)
        self.assertEqual(dataset.targetProperties[0].task, TargetTasks.REGRESSION)
        for idx, prop in enumerate(expected_target_props):
            self.assertEqual(dataset.targetProperties[idx].name, prop)
        for calc in dataset.descriptors:
            calc = calc.calculator
            self.assertIsInstance(calc, DescriptorSet)
        if feature_standardizer is not None:
            self.assertIsInstance(dataset.featureStandardizer, SKLearnStandardizer)
        else:
            self.assertIsNone(dataset.featureStandardizer)
        self.checkFeatures(dataset, expected_feature_count)
        # verify prep results are the same after reloading
        dataset.prepareDataset(
            feature_calculators=feature_calculators,
            split=split if split else None,
            feature_standardizer=(
                feature_standardizer if feature_standardizer else None
            ),
            feature_filters=[feature_filter] if feature_filter else None,
            data_filters=[data_filter] if data_filter else None,
            applicability_domain=applicability_domain,
            drop_outliers=applicability_domain is not None,
        )
        self.checkFeatures(dataset, expected_feature_count)
        self.assertListEqual(sorted(dataset.featureNames), sorted(original_features))
        train2, test2 = dataset.getFeatures()
        self.assertTrue(train.index.equals(train2.index))
        self.assertTrue(test.index.equals(test2.index))
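
    # Usage sketch for `DataPrepCheckMixIn` (illustrative only): a preparation
    # test would typically build one combination of preparation settings and
    # pass it to `checkPrep`. The names `createLargeTestDataSet`, `MorganFP`,
    # `RandomSplit` and the target property "CL" are assumptions about the
    # wider qsprpred test utilities and API, not guaranteed by this module;
    # `StandardScaler` refers to sklearn.preprocessing:
    #
    #     class ExamplePrepTest(DataSetsPathMixIn, DataPrepCheckMixIn, TestCase):
    #         def testPrepCombination(self):
    #             ds = self.createLargeTestDataSet()
    #             self.checkPrep(
    #                 ds,
    #                 feature_calculators=[MorganFP(radius=2, nBits=1024)],
    #                 split=RandomSplit(test_fraction=0.2),
    #                 feature_standardizer=StandardScaler(),
    #                 feature_filter=None,
    #                 data_filter=None,
    #                 applicability_domain=None,
    #                 expected_target_props=["CL"],
    #             )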


class DescriptorInDataCheckMixIn(DescriptorCheckMixIn):
    """Mixin for testing descriptor sets in data sets."""

    @staticmethod
    def getDatSetName(desc_set, target_props):
        """Get a unique name for a data set."""
        target_props_id = [
            f"{target_prop['name']}_{target_prop['task']}"
            for target_prop in target_props
        ]
        return f"{desc_set}_{target_props_id}"

    def checkDataSetContainsDescriptorSet(
        self, dataset, desc_set, prep_combo, target_props
    ):
        """Check if a descriptor set is in a data set."""
        # run the preparation
        logging.debug(
            f"Testing descriptor set: {desc_set} in data set: {dataset.name}"
        )
        preparation = {}
        preparation.update(prep_combo)
        preparation["feature_calculators"] = [desc_set]
        dataset.prepareDataset(**preparation)
        # test consistency
        self.checkDescriptors(dataset, target_props)
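
    # Usage sketch for `DescriptorInDataCheckMixIn` (illustrative only):
    # `checkDataSetContainsDescriptorSet` is meant to be driven by a
    # parameterized test that pairs each descriptor set with a preparation
    # combination. The helpers `createLargeTestDataSet`, `getDefaultPrep` and
    # the `MorganFP` descriptor set below are assumptions about the surrounding
    # test utilities:
    #
    #     class ExampleDescriptorInDataTest(
    #         DataSetsPathMixIn, DescriptorInDataCheckMixIn, TestCase
    #     ):
    #         def testDescriptorInData(self):
    #             target_props = [{"name": "CL", "task": TargetTasks.REGRESSION}]
    #             ds = self.createLargeTestDataSet(
    #                 name=self.getDatSetName(MorganFP(), target_props)
    #             )
    #             self.checkDataSetContainsDescriptorSet(
    #                 ds, MorganFP(), self.getDefaultPrep(), target_props
    #             )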


class ModelCheckMixIn:
    """This class holds the tests for the QSPRModel class."""

    @property
    def gridFile(self):
        return f"{os.path.dirname(__file__)}/test_files/search_space_test.json"

    def getParamGrid(self, model: QSPRModel, grid: str) -> dict:
        """Get the parameter grid for a model.

        Args:
            model (QSPRModel): The model to get the parameter grid for.
            grid (str): The grid type to get the parameter grid for.

        Returns:
            dict: The parameter grid.
        """
        mname = model.name.split("_")[0]
        grid_params = model.__class__.loadParamsGrid(self.gridFile, grid, mname)
        return grid_params[grid_params[:, 0] == mname, 1][0]

    def checkOptimization(
        self, model: QSPRModel, ds: QSPRDataset, optimizer: HyperparameterOptimization
    ):
        model_path, est_path = model.save(save_estimator=True)
        # get the last modified time stamp of the estimator file
        model_last_modified = os.path.getmtime(est_path)
        best_params = optimizer.optimize(model, ds)
        for param in best_params:
            self.assertEqual(best_params[param], model.parameters[param])
        new_time_modified = os.path.getmtime(est_path)
        self.assertTrue(model_last_modified < new_time_modified)
        optimizer.optimize(model, ds, refit_optimal=True)
        model_last_modified = new_time_modified
        new_time_modified = os.path.getmtime(est_path)
        self.assertTrue(model_last_modified < new_time_modified)
        model_new = model.__class__.fromFile(model.metaFile)
        for param in model.parameters:
            self.assertEqual(model_new.parameters[param], model.parameters[param])

    def fitTest(self, model: QSPRModel, ds: QSPRDataset):
        """Test model fitting, optimization and evaluation.

        Args:
            model (QSPRModel): The model to test.
            ds (QSPRDataset): The dataset to use for testing.
        """
        # perform Bayesian optimization
        model.initFromDataset(ds)
        score_func = "r2" if model.task.isRegression() else "roc_auc_ovr"
        search_space_bs = self.getParamGrid(model, "bayes")
        bayesoptimizer = OptunaOptimization(
            param_grid=search_space_bs,
            n_trials=1,
            model_assessor=CrossValAssessor(
                scoring=score_func, mode=EarlyStoppingMode.NOT_RECORDING
            ),
        )
        self.checkOptimization(model, ds, bayesoptimizer)
        model.cleanFiles()
        # perform grid search
        search_space_gs = self.getParamGrid(model, "grid")
        if model.task.isClassification():
            score_func = SklearnMetrics("accuracy")
        gridsearcher = GridSearchOptimization(
            param_grid=search_space_gs,
            score_aggregation=np.median,
            model_assessor=TestSetAssessor(
                scoring=score_func,
                use_proba=False,
                mode=EarlyStoppingMode.NOT_RECORDING,
            ),
        )
        self.checkOptimization(model, ds, gridsearcher)
        model.cleanFiles()
        # perform cross-validation
        score_func = "r2" if model.task.isRegression() else "roc_auc_ovr"
        n_folds = 5
        scores = CrossValAssessor(
            mode=EarlyStoppingMode.RECORDING,
            scoring=score_func,
            split_multitask_scores=model.isMultiTask,
            split=KFold(
                n_splits=n_folds, shuffle=True, random_state=model.randomState
            ),
        )(model, ds)
        if model.isMultiTask:
            self.assertEqual(scores.shape, (n_folds, len(model.targetProperties)))
        scores = TestSetAssessor(
            mode=EarlyStoppingMode.NOT_RECORDING,
            scoring=score_func,
            split_multitask_scores=model.isMultiTask,
        )(model, ds)
        if model.isMultiTask:
            self.assertEqual(scores.shape, (len(model.targetProperties),))
        self.assertTrue(exists(f"{model.outDir}/{model.name}.ind.tsv"))
        self.assertTrue(exists(f"{model.outDir}/{model.name}.cv.tsv"))
        # train the model on all data
        path = model.fitDataset(ds)
        self.assertTrue(exists(path))
        self.assertTrue(exists(model.metaFile))
        self.assertEqual(path, model.metaFile)

    def predictorTest(
        self,
        model: QSPRModel,
        dataset: QSPRDataset,
        comparison_model: QSPRModel | None = None,
        expect_equal_result=True,
        **pred_kwargs,
    ):
        """Test model predictions.

        Checks if the shape of the predictions is as expected and if the
        predictions of the `predictMols` function are consistent with the
        predictions of the `predict`/`predictProba` functions. Also checks if
        the predictions of the model are the same as the predictions of the
        comparison model, if one is given.

        Args:
            model (QSPRModel): The model to make predictions with.
            dataset (QSPRDataset): The dataset to make predictions for.
            comparison_model (QSPRModel): Another model to compare the predictions with.
            expect_equal_result (bool): Whether the result is expected to be equal
                or not equal to the predictions of the comparison model.
            **pred_kwargs: Extra keyword arguments to pass to the predictor's
                `predictMols` method.
        """

        # define checks of the shape of the predictions
        def check_shape(predictions, model, num_smiles, use_probas):
            if model.task.isClassification() and use_probas:
                # check predictions are a list of arrays of shape (n_smiles, n_classes)
                self.assertEqual(len(predictions), len(model.targetProperties))
                for i in range(len(model.targetProperties)):
                    self.assertEqual(
                        predictions[i].shape,
                        (num_smiles, model.targetProperties[i].nClasses),
                    )
            else:
                # check predictions are an array of shape (n_smiles, n_targets)
                self.assertEqual(
                    predictions.shape,
                    (num_smiles, len(model.targetProperties)),
                )

        # define check for comparing predictions with expected result
        def check_predictions(preds, expected, expect_equal):
            # check if predictions are almost equal to expected result (rtol=1e-5)
            check_outcome = self.assertTrue if expect_equal else self.assertFalse
            if isinstance(expected, list):
                for i in range(len(expected)):
                    check_outcome(np.allclose(preds[i], expected[i]))
            else:
                check_outcome(np.allclose(preds, expected))

        # Check if the predictMols function gives the same result as the
        # predict/predictProba functions.
        # get the expected result from the basic predict function
        features = dataset.getFeatures(
            concat=True, ordered=True, refit_standardizer=False
        )
        expected_result = model.predict(features)
        # make predictions with predictMols and check against the previous result
        smiles = list(dataset.smiles)
        num_smiles = len(smiles)
        predictions = model.predictMols(smiles, use_probas=False, **pred_kwargs)
        check_shape(predictions, model, num_smiles, use_probas=False)
        check_predictions(predictions, expected_result, True)
        # do the same for the predictProba function
        predictions_proba = None
        if model.task.isClassification():
            expected_result_proba = model.predictProba(features)
            predictions_proba = model.predictMols(
                smiles, use_probas=True, **pred_kwargs
            )
            check_shape(predictions_proba, model, len(smiles), use_probas=True)
            check_predictions(predictions_proba, expected_result_proba, True)
        # check if the predictions are (not) the same as those of the comparison model
        if comparison_model is not None:
            predictions_comparison = comparison_model.predictMols(
                smiles, use_probas=False, **pred_kwargs
            )
            check_predictions(predictions, predictions_comparison, expect_equal_result)
            if predictions_proba is not None:
                predictions_comparison_proba = comparison_model.predictMols(
                    smiles, use_probas=True, **pred_kwargs
                )
                check_predictions(
                    predictions_proba,
                    predictions_comparison_proba,
                    expect_equal_result,
                )
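
    # Usage sketch for `ModelCheckMixIn` (illustrative only): a model test would
    # typically initialize a model on a prepared data set, run `fitTest`, and
    # then run `predictorTest` on a predictor reloaded by name. The constructor
    # arguments of `SklearnModel` and the fixture helpers
    # `createLargeTestDataSet`, `getDefaultPrep` and `generatedModelsPath` are
    # assumptions about the wider qsprpred API, not guaranteed here:
    #
    #     class ExampleModelTest(ModelDataSetsPathMixIn, ModelCheckMixIn, TestCase):
    #         def testRandomForest(self):
    #             ds = self.createLargeTestDataSet(
    #                 preparation_settings=self.getDefaultPrep()
    #             )
    #             model = SklearnModel(
    #                 base_dir=self.generatedModelsPath,
    #                 alg=RandomForestRegressor,
    #                 name="ExampleRF",
    #             )
    #             self.fitTest(model, ds)
    #             predictor = SklearnModel(name="ExampleRF", base_dir=model.baseDir)
    #             self.predictorTest(predictor, ds)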


class MonitorsCheckMixIn(ModelDataSetsPathMixIn, ModelCheckMixIn):

    def trainModelWithMonitoring(
        self,
        model: QSPRModel,
        ds: QSPRDataset,
        hyperparam_monitor: HyperparameterOptimizationMonitor,
        crossval_monitor: AssessorMonitor,
        test_monitor: AssessorMonitor,
        fit_monitor: FitMonitor,
    ) -> tuple[
        HyperparameterOptimizationMonitor,
        AssessorMonitor,
        AssessorMonitor,
        FitMonitor,
    ]:
        score_func = (
            "r2" if ds.targetProperties[0].task.isRegression() else "roc_auc_ovr"
        )
        search_space_gs = self.getParamGrid(model, "grid")
        gridsearcher = GridSearchOptimization(
            param_grid=search_space_gs,
            model_assessor=CrossValAssessor(
                scoring=score_func,
                mode=EarlyStoppingMode.NOT_RECORDING,
            ),
            monitor=hyperparam_monitor,
        )
        best_params = gridsearcher.optimize(model, ds)
        model.setParams(best_params)
        model.save()
        # perform cross-validation
        CrossValAssessor(
            mode=EarlyStoppingMode.RECORDING,
            scoring=score_func,
            monitor=crossval_monitor,
        )(model, ds)
        TestSetAssessor(
            mode=EarlyStoppingMode.NOT_RECORDING,
            scoring=score_func,
            monitor=test_monitor,
        )(model, ds)
        # train the model on all data
        model.fitDataset(ds, monitor=fit_monitor)
        return hyperparam_monitor, crossval_monitor, test_monitor, fit_monitor

    def baseMonitorTest(
        self,
        monitor: BaseMonitor,
        monitor_type: Literal["hyperparam", "crossval", "test", "fit"],
        neural_net: bool,
    ):
        """Test the base monitor."""

        def check_fit_empty(monitor):
            self.assertEqual(len(monitor.fitLog), 0)
            self.assertEqual(len(monitor.batchLog), 0)
            self.assertIsNone(monitor.currentEpoch)
            self.assertIsNone(monitor.currentBatch)
            self.assertIsNone(monitor.bestEstimator)
            self.assertIsNone(monitor.bestEpoch)

        def check_assessment_empty(monitor):
            self.assertIsNone(monitor.assessmentModel)
            self.assertIsNone(monitor.asssessmentDataset)
            self.assertDictEqual(monitor.foldData, {})
            self.assertIsNone(monitor.predictions)
            self.assertDictEqual(monitor.estimators, {})

        def check_hyperparam_monitor(monitor):
            # calculate the number of iterations from the config
            n_iter = np.prod([len(v) for v in monitor.config["param_grid"].values()])
            self.assertGreater(n_iter, 0)
            self.assertEqual(len(monitor.assessments), n_iter)
            self.assertEqual(len(monitor.parameters), n_iter)
            self.assertEqual(monitor.scores.shape, (n_iter, 2))  # agg. score + scores
            self.assertEqual(
                max(monitor.scores.aggregated_score),
                monitor.bestScore,
            )
            self.assertDictEqual(
                monitor.bestParameters,
                monitor.parameters[monitor.scores.aggregated_score.argmax()],
            )
            check_assessment_empty(monitor)
            check_fit_empty(monitor)

        def check_assessor_monitor(monitor, n_folds, len_y):
            self.assertEqual(
                monitor.predictions.shape,
                (len_y, 3 if n_folds > 1 else 2),  # labels + preds (+ fold)
            )
            self.assertEqual(len(monitor.foldData), n_folds)
            self.assertEqual(len(monitor.fits), n_folds)
            self.assertEqual(len(monitor.estimators), n_folds)
            check_fit_empty(monitor)

        def check_fit_monitor(monitor):
            self.assertGreater(len(monitor.fitLog), 0)
            self.assertGreater(len(monitor.batchLog), 0)
            self.assertTrue(isinstance(monitor.bestEstimator, monitor.fitModel.alg))
            self.assertIsNotNone(monitor.currentEpoch)
            self.assertIsNotNone(monitor.currentBatch)

        if monitor_type == "hyperparam":
            check_hyperparam_monitor(monitor)
        elif monitor_type == "crossval":
            check_assessor_monitor(monitor, 5, len(monitor.assessmentDataset.y))
        elif monitor_type == "test":
            check_assessor_monitor(monitor, 1, len(monitor.assessmentDataset.y_ind))
        elif monitor_type == "fit":
            if neural_net:
                check_fit_monitor(monitor)
            else:
                check_fit_empty(monitor)
        else:
            raise ValueError(f"Unknown monitor type {monitor_type}")

    def fileMonitorTest(
        self,
        monitor: FileMonitor,
        monitor_type: Literal["hyperparam", "crossval", "test", "fit"],
        neural_net: bool,
    ):
        """Test if the correct files are generated."""

        def check_fit_files(path):
            self.assertTrue(os.path.exists(f"{path}/fit_log.tsv"))
            self.assertTrue(os.path.exists(f"{path}/batch_log.tsv"))

        def check_assessment_files(path, monitor):
            output_path = f"{path}/{monitor.assessmentType}"
            self.assertTrue(os.path.exists(output_path))
            self.assertTrue(
                os.path.exists(
                    f"{output_path}/{monitor.assessmentType}_predictions.tsv"
                )
            )
            if monitor.saveFits and neural_net:
                for fold in monitor.foldData:
                    check_fit_files(f"{output_path}/fold_{fold}")

        def check_hyperparam_files(path, monitor):
            output_path = f"{path}/GridSearchOptimization"
            self.assertTrue(os.path.exists(output_path))
            self.assertTrue(
                os.path.exists(f"{output_path}/GridSearchOptimization_scores.tsv")
            )
            if monitor.saveAssessments:
                for assessment in monitor.assessments:
                    check_assessment_files(
                        f"{output_path}/iteration_{assessment}", monitor
                    )

        if monitor_type == "hyperparam":
            check_hyperparam_files(monitor.outDir, monitor)
        elif monitor_type in ["crossval", "test"]:
            check_assessment_files(monitor.outDir, monitor)
        elif monitor_type == "fit" and neural_net:
            check_fit_files(monitor.outDir)

    def listMonitorTest(
        self,
        monitor: ListMonitor,
        monitor_type: Literal["hyperparam", "crossval", "test", "fit"],
        neural_net: bool,
    ):
        self.baseMonitorTest(monitor.monitors[0], monitor_type, neural_net)
        self.fileMonitorTest(monitor.monitors[1], monitor_type, neural_net)

    def runMonitorTest(
        self, model, data, monitor_type, test_method, neural_net, *args, **kwargs
    ):
        hyperparam_monitor = monitor_type(*args, **kwargs)
        crossval_monitor = deepcopy(hyperparam_monitor)
        test_monitor = deepcopy(hyperparam_monitor)
        fit_monitor = deepcopy(hyperparam_monitor)
        (
            hyperparam_monitor,
            crossval_monitor,
            test_monitor,
            fit_monitor,
        ) = self.trainModelWithMonitoring(
            model, data, hyperparam_monitor, crossval_monitor, test_monitor, fit_monitor
        )
        test_method(hyperparam_monitor, "hyperparam", neural_net)
        test_method(crossval_monitor, "crossval", neural_net)
        test_method(test_monitor, "test", neural_net)
        test_method(fit_monitor, "fit", neural_net)
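
    # Usage sketch for `MonitorsCheckMixIn` (illustrative only): `runMonitorTest`
    # wires one monitor class through hyperparameter optimization,
    # cross-validation, test-set assessment and fitting, then applies the chosen
    # check to each copy. The model and data fixtures below (`SklearnModel`,
    # `createLargeTestDataSet`, `getDefaultPrep`, `generatedModelsPath`) are
    # assumptions about the surrounding test utilities:
    #
    #     class ExampleMonitorTest(MonitorsCheckMixIn, TestCase):
    #         def testBaseMonitor(self):
    #             ds = self.createLargeTestDataSet(
    #                 preparation_settings=self.getDefaultPrep()
    #             )
    #             model = SklearnModel(
    #                 base_dir=self.generatedModelsPath,
    #                 alg=RandomForestRegressor,
    #                 name="ExampleMonitoredRF",
    #             )
    #             self.runMonitorTest(
    #                 model, ds, BaseMonitor, self.baseMonitorTest, False
    #             )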