import numpy as np
from parameterized import parameterized
from sklearn.model_selection import KFold, StratifiedKFold
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from ..descriptors.fingerprints import MorganFP
from ...data import (
RandomSplit,
TemporalSplit,
BootstrapSplit,
ScaffoldSplit,
ClusterSplit,
QSPRDataset,
)
from ...data.chem.clustering import (
FPSimilarityLeaderPickerClusters,
FPSimilarityMaxMinClusters,
)
from ...data.chem.scaffolds import BemisMurckoRDKit, BemisMurcko
from ...data.sampling.folds import FoldsFromDataSplit
from ...data.sampling.splits import ManualSplit
from ...utils.testing.base import QSPRTestCase
from ...utils.testing.path_mixins import DataSetsPathMixIn
class TestDataSplitters(DataSetsPathMixIn, QSPRTestCase):
    """Small tests to only check if the data splitters work on their own.

    The tests here should be used to check for all their specific parameters and edge
    cases.
    """

    def setUp(self):
        """Set up the test paths and the year used as the temporal split boundary."""
        super().setUp()
        self.setUpPaths()
        self.splitYear = 2000

    def testManualSplit(self):
        """Test the manual split function, where the split is done manually."""
        dataset = self.createLargeTestDataSet()
        dataset.nJobs = self.nCPU
        dataset.chunkSize = self.chunkSize
        # Add extra column to the data frame to use for splitting
        dataset.df["split"] = "train"
        # Seed the sampling with the data set's own random state so that the
        # rows marked as "test" are reproducible between runs.
        test_rows = dataset.df.sample(
            frac=0.1, random_state=dataset.randomState
        ).index
        dataset.df.loc[test_rows, "split"] = "test"
        split = ManualSplit(dataset.df["split"], "train", "test")
        dataset.prepareDataset(split=split)
        self.validate_split(dataset)

    @parameterized.expand(
        [
            (False,),
            (True,),
        ]
    )
    def testRandomSplit(self, multitask):
        """Test the random split function.

        Args:
            multitask (bool): if `True`, run the test on the multitask data set
                instead of the single-task one.
        """
        # NOTE: the data set must not be re-created after this branch, otherwise
        # the multitask case silently runs on the single-task data set.
        if multitask:
            dataset = self.createLargeMultitaskDataSet()
        else:
            dataset = self.createLargeTestDataSet()
        dataset.prepareDataset(split=RandomSplit(test_fraction=0.1))
        self.validate_split(dataset)

    @parameterized.expand(
        [
            (False,),
            (True,),
        ]
    )
    def testTemporalSplit(self, multitask):
        """Test the temporal split function, where the split is done based on a time
        property.

        Args:
            multitask (bool): if `True`, run the test on the multitask data set
                instead of the single-task one.
        """
        if multitask:
            dataset = self.createLargeMultitaskDataSet()
        else:
            dataset = self.createLargeTestDataSet()
        split = TemporalSplit(
            timesplit=self.splitYear,
            timeprop="Year of first disclosure",
        )
        # prepare and validate the split
        dataset.prepareDataset(split=split)
        self.validate_split(dataset)
        # test that every sample in the test set is dated after the split year
        test_set = dataset.getFeatures()[1]
        years = dataset.getDF().loc[test_set.index, "Year of first disclosure"]
        self.assertTrue(all(years > self.splitYear))
        # test bootstrapping
        if multitask:
            dataset = self.createLargeMultitaskDataSet(
                name="TemporalSplit_bootstrap_mt"
            )
        else:
            dataset = self.createLargeTestDataSet(name="TemporalSplit_bootstrap")
        split = TemporalSplit(
            timesplit=[self.splitYear - 1, self.splitYear, self.splitYear + 1],
            timeprop="Year of first disclosure",
        )
        bootstrap_split = BootstrapSplit(
            split=split,
            n_bootstraps=10,
        )
        # The folds are materialised first so that all bootstraps are generated;
        # `zip` then pairs only the first folds with the available split years.
        for time, fold_info in zip(
            split.timeSplit, list(dataset.iterFolds(bootstrap_split))
        ):
            years = dataset.getDF().loc[fold_info[1].index, "Year of first disclosure"]
            self.assertTrue(all(years > time))

    @parameterized.expand(
        [
            (False, BemisMurckoRDKit(), None),
            (
                False,
                BemisMurcko(use_csk=True),
                ["ScaffoldSplit_000", "ScaffoldSplit_001"],
            ),
            (True, BemisMurckoRDKit(), None),
        ]
    )
    def testScaffoldSplit(self, multitask, scaffold, custom_test_list):
        """Test the scaffold split function.

        Args:
            multitask (bool): if `True`, run the test on the multitask data set.
            scaffold: scaffold generator passed on to `ScaffoldSplit`.
            custom_test_list (list[str] | None): molecule ids that have to end up
                in the test set, or `None` for no such constraint.
        """
        if multitask:
            dataset = self.createLargeMultitaskDataSet(name="ScaffoldSplit")
        else:
            dataset = self.createLargeTestDataSet(name="ScaffoldSplit")
        split = ScaffoldSplit(
            scaffold=scaffold,
            custom_test_list=custom_test_list,
        )
        dataset.prepareDataset(split=split)
        self.validate_split(dataset)
        # check that the ids in custom_test_list are in the test set
        if custom_test_list:
            self.assertTrue(
                all(mol_id in dataset.X_ind.index for mol_id in custom_test_list)
            )
        # check folding by scaffold
        if multitask:
            dataset = self.createLargeMultitaskDataSet(name="ScaffoldSplit_folding_mt")
        else:
            dataset = self.createLargeTestDataSet(name="ScaffoldSplit_folding")
        n_folds = 5
        split = ScaffoldSplit(
            scaffold=scaffold,
            custom_test_list=custom_test_list,
            n_folds=n_folds,
        )
        test_index_all = []
        seen_test_ids = set()
        # Pre-initialised so that an empty fold generator fails the count
        # assertion below instead of raising a NameError.
        fold_count = 0
        for fold_count, (
            X_train,
            X_test,
            _y_train,
            _y_test,
            _train_index,
            _test_index,
        ) in enumerate(dataset.iterFolds(split), start=1):
            fold_ids = X_test.index.tolist()
            # Compare index labels with index labels: the ids collected so far
            # come from `X_test.index`, so the overlap check uses them as well.
            self.assertTrue(seen_test_ids.isdisjoint(fold_ids))
            self.assertTrue(len(X_train) > len(X_test))
            test_index_all.extend(fold_ids)
            seen_test_ids.update(fold_ids)
        self.assertEqual(fold_count, n_folds)
        # together, the test folds have to cover the whole data set
        self.assertEqual(len(test_index_all), len(dataset.getFeatures(concat=True)))

    @parameterized.expand(
        [
            (
                False,
                FPSimilarityLeaderPickerClusters(
                    fp_calculator=MorganFP(radius=2, nBits=128)
                ),
                None,
            ),
            (
                False,
                FPSimilarityMaxMinClusters(fp_calculator=MorganFP(radius=2, nBits=128)),
                ["ClusterSplit_000", "ClusterSplit_001"],
            ),
            (
                True,
                FPSimilarityMaxMinClusters(fp_calculator=MorganFP(radius=2, nBits=128)),
                None,
            ),
            (
                True,
                FPSimilarityLeaderPickerClusters(
                    fp_calculator=MorganFP(radius=2, nBits=128)
                ),
                ["ClusterSplit_000", "ClusterSplit_001"],
            ),
        ]
    )
    def testClusterSplit(self, multitask, clustering_algorithm, custom_test_list):
        """Test the cluster split function.

        Args:
            multitask (bool): if `True`, run the test on the multitask data set.
            clustering_algorithm: clustering object passed on to `ClusterSplit`.
            custom_test_list (list[str] | None): molecule ids that have to end up
                in the test set, or `None` for no such constraint.
        """
        if multitask:
            dataset = self.createLargeMultitaskDataSet(name="ClusterSplit")
        else:
            dataset = self.createLargeTestDataSet(name="ClusterSplit")
        split = ClusterSplit(
            clustering=clustering_algorithm,
            custom_test_list=custom_test_list,
            time_limit_seconds=10,
        )
        dataset.prepareDataset(split=split)
        self.validate_split(dataset)
        # check that the ids in custom_test_list are in the test set
        if custom_test_list:
            self.assertTrue(
                all(mol_id in dataset.X_ind.index for mol_id in custom_test_list)
            )

    def testSerialization(self):
        """Test the serialization of dataset with datasplit."""
        dataset = self.createLargeTestDataSet()
        split = ScaffoldSplit()
        n_bits = 128
        dataset.prepareDataset(
            split=split,
            feature_calculators=[MorganFP(radius=3, nBits=n_bits)],
            feature_standardizer=StandardScaler(),
        )
        self.validate_split(dataset)
        # Both id arrays are taken from the `*_ind` attributes: the features
        # (`X_ind`) and the targets (`y_ind`).
        # NOTE(review): `y_ind` presumably holds the targets of the same
        # independent test set as `X_ind` -- confirm against QSPRDataset.
        test_feature_ids = dataset.X_ind.index.values
        test_target_ids = dataset.y_ind.index.values
        dataset.save()
        dataset_new = QSPRDataset.fromFile(dataset.metaFile)
        self.validate_split(dataset_new)
        self.assertTrue(dataset_new.descriptorSets)
        self.assertTrue(dataset_new.featureStandardizer)
        self.assertEqual(len(dataset_new.featureNames), n_bits)
        # the reloaded data set has to contain the same samples as before saving
        self.assertTrue(
            all(mol_id in dataset_new.X_ind.index for mol_id in test_feature_ids)
        )
        self.assertTrue(
            all(mol_id in dataset_new.y_ind.index for mol_id in test_target_ids)
        )
        dataset_new.clearFiles()
class TestFoldSplitters(DataSetsPathMixIn, QSPRTestCase):
    """Small tests to only check if the fold splitters work on their own.

    The tests here should be used to check for all their specific parameters and
    edge cases.
    """

    def setUp(self):
        """Run the parent set up and initialise the test paths."""
        super().setUp()
        self.setUpPaths()

    def validateFolds(self, folds, more=None):
        """Check if the folds have the data they should have after splitting.

        Args:
            folds: iterable of 6-tuples
                `(X_train, X_test, y_train, y_test, train_index, test_index)`.
            more (callable | None): optional extra check invoked with the six
                items of every fold.

        Returns:
            tuple: the number of folds seen and the list of all index labels
                collected from the `X_test` of every fold.
        """
        n_seen = 0
        held_out_ids = []
        for fold in folds:
            X_train, X_test, y_train, y_test, train_index, test_index = fold
            n_seen += 1
            # features, targets and indices must agree in length per partition
            self.assertEqual(len(X_train), len(y_train))
            self.assertEqual(len(X_test), len(y_test))
            self.assertEqual(len(train_index), len(y_train))
            self.assertEqual(len(test_index), len(y_test))
            held_out_ids.extend(X_test.index.tolist())
            if more:
                more(X_train, X_test, y_train, y_test, train_index, test_index)
        return n_seen, held_out_ids

    def _validateFiveFolds(self, folds):
        """Validate `folds`, assert that there are exactly 5 and return the
        collected test index labels."""
        n_seen, held_out_ids = self.validateFolds(folds)
        self.assertEqual(n_seen, 5)
        return held_out_ids

    def testStandardFolds(self):
        """Test the default fold generator, which is a 5-fold cross validation."""
        # regression data set with default settings
        dataset = self.createLargeTestDataSet()
        dataset.addDescriptors([MorganFP(radius=3, nBits=128)])
        fold = KFold(5, shuffle=True, random_state=dataset.randomState)
        generator = FoldsFromDataSplit(fold)
        covered = self._validateFiveFolds(generator.iterFolds(dataset))
        self.assertFalse(set(dataset.df.index).difference(covered))
        # the same folds requested directly from the data set
        covered = self._validateFiveFolds(dataset.iterFolds(fold))
        self.assertFalse(set(dataset.df.index).difference(covered))
        # classification data set with default settings
        dataset.makeClassification("CL", th=[20])
        fold = StratifiedKFold(5, shuffle=True, random_state=dataset.randomState)
        covered = self._validateFiveFolds(dataset.iterFolds(fold))
        self.assertFalse(set(dataset.df.index).difference(covered))
        # folds with a feature standardizer attached
        lower, upper = 1, 2
        scaler = MinMaxScaler(feature_range=(lower, upper))
        dataset.prepareDataset(feature_standardizer=scaler)
        covered = self._validateFiveFolds(dataset.iterFolds(fold))
        self.assertFalse(set(dataset.df.index).difference(covered))

        def check_min_max(X_train, X_test, *args, **kwargs):
            # both partitions are expected to span the full scaler range
            for frame in (X_train, X_test):
                values = frame.values
                self.assertTrue(np.max(values) == upper)
                self.assertTrue(np.min(values) == lower)

        self.validateFolds(dataset.iterFolds(fold), more=check_min_max)
        covered = self._validateFiveFolds(dataset.iterFolds(fold))
        self.assertFalse(set(dataset.df.index).difference(covered))
        # folds of a data set that was split beforehand
        dataset.split(RandomSplit(test_fraction=0.1))
        covered = self._validateFiveFolds(dataset.iterFolds(fold))
        self.assertFalse(set(dataset.X.index).difference(covered))

    def testBootstrappedFold(self):
        """Test folds generated by a `BootstrapSplit` wrapped around a random split.

        Checks that 5 bootstraps are produced, that an unseeded run yields the same
        test indices as a run seeded with the random state of the data set, and
        that a run seeded with 42 yields different ones.
        """
        dataset = self.createLargeTestDataSet(random_state=None)
        dataset.addDescriptors([MorganFP(radius=3, nBits=128)])
        bootstrap = BootstrapSplit(RandomSplit(0.2), n_bootstraps=5)
        first_ids = self._validateFiveFolds(dataset.iterFolds(bootstrap))
        # repeating the split with the data set seed has to give the same indices
        bootstrap = BootstrapSplit(
            RandomSplit(0.2), n_bootstraps=5, seed=dataset.randomState
        )
        second_ids = self._validateFiveFolds(dataset.iterFolds(bootstrap))
        self.assertListEqual(first_ids, second_ids)
        # a different seed has to give different indices
        inner_split = RandomSplit(0.2)
        bootstrap = BootstrapSplit(inner_split, n_bootstraps=5, seed=42)
        third_ids = self._validateFiveFolds(dataset.iterFolds(bootstrap))
        # the wrapped split itself is expected to have no seed afterwards
        self.assertEqual(inner_split.getSeed(), None)
        self.assertNotEqual(first_ids, third_ids)