import numpy as np
from parameterized import parameterized
from sklearn.model_selection import KFold, StratifiedKFold
from sklearn.preprocessing import MinMaxScaler
from ..descriptors.fingerprints import MorganFP
from ..processing.pipeline import DatasetPipeline
from ...data import (
BootstrapSplit,
ClusterSplit,
QSPRTable,
RandomSplit,
ScaffoldSplit,
TemporalSplit,
)
from ...data.chem.clustering import (
FPSimilarityLeaderPickerClusters,
FPSimilarityMaxMinClusters,
)
from ...data.chem.scaffolds import BemisMurcko, BemisMurckoRDKit
from ...data.sampling.splits import ManualSplit
from ...utils.testing.base import QSPRTestCase
from ...utils.testing.check_mixins import DataPrepCheckMixIn
from ...utils.testing.path_mixins import DataSetsPathMixIn
from ..descriptors.fingerprints import MorganFP
from ..processing.pipeline import DatasetPipeline
class TestDataSplitters(DataSetsPathMixIn, QSPRTestCase, DataPrepCheckMixIn):
    """Small tests to only check if the data splitters work on their own.

    The tests here should be used to check for all their specific parameters and
    edge cases.
    """

    def setUp(self):
        """Run the parent set-up and prepare the paths used by the tests."""
        super().setUp()
        self.setUpPaths()

    def testManualSplit(self):
        """Test the manual split function, where the split is done manually."""
        dataset = self.createLargeTestDataSet()
        dataset.nJobs = self.nCPU
        dataset.chunkSize = self.chunkSize
        # Add extra column to the data frame to use for splitting
        df = dataset.getDF()
        test_ids = df.sample(frac=0.1, random_state=42).index
        train_ids = df.index.difference(test_ids)
        dataset.addProperty("split", "test", ids=test_ids)
        dataset.addProperty("split", "train", ids=train_ids)
        split = ManualSplit("split", "train", "test")
        dataset.addSplit(split, name="split")
        # check if the split is correctly stored
        self.checkSplit(dataset, "split")
        # test if the split corresponds to the manually selected ids
        train_ids_split, test_ids_split = dataset.getSplit("split", as_type="ids")[0]
        self.assertTrue(all(train_ids.sort_values() == train_ids_split))
        self.assertTrue(all(test_ids.sort_values() == test_ids_split))

        # Test if it also works for multiple splits
        # Add extra column to the data frame to use for splitting
        df = dataset.getDF()
        test_ids2 = df.sample(frac=0.1, random_state=1).index
        train_ids2 = df.index.difference(test_ids2)
        dataset.addProperty("split2", "test", ids=test_ids2)
        dataset.addProperty("split2", "train", ids=train_ids2)
        double_split = ManualSplit(["split", "split2"], "train", "test")
        dataset.addSplit(double_split, name="double_split")
        # check if the split is correctly stored
        self.checkSplit(dataset, "double_split")
        # test if the split corresponds to the manually selected ids
        double_split_iterator = dataset.getSplit("double_split", as_type="ids")
        # assertEqual reports the actual length when the check fails
        self.assertEqual(len(double_split_iterator), 2)
        train_ids_split, test_ids_split = double_split_iterator[1]
        self.assertTrue(all(train_ids2.sort_values() == train_ids_split))
        self.assertTrue(all(test_ids2.sort_values() == test_ids_split))

    @parameterized.expand([
        (False,),
        (True,),
    ])
    def testRandomSplit(self, multitask):
        """Test the random split function."""
        if multitask:
            dataset = self.createLargeMultitaskDataSet()
        else:
            dataset = self.createLargeTestDataSet()
        dataset.addSplit(RandomSplit(test_fraction=0.1), name="RandomSplit")
        self.checkSplit(dataset, "RandomSplit")

    @parameterized.expand([
        (False,),
        (True,),
    ])
    def testTemporalSplit(self, multitask):
        """Test the temporal split function, where the split is done based on a
        time property."""
        if multitask:
            dataset = self.createLargeMultitaskDataSet()
        else:
            dataset = self.createLargeTestDataSet()
        split_year = 2000
        split = TemporalSplit(
            timesplit=split_year,
            timeprop="Year of first disclosure",
        )
        dataset.addSplit(split, name="temp_split")
        self.checkSplit(dataset, "temp_split")
        # test if dates higher than 2000 are in test set
        _, test_ids = dataset.getSplit("temp_split", as_type="ids")[0]
        years = dataset.getDF().loc[test_ids, "Year of first disclosure"]
        self.assertTrue(all(years > split_year))

        # test bootstrapping
        if multitask:
            dataset = self.createLargeMultitaskDataSet(
                name="TemporalSplit_bootstrap_mt"
            )
        else:
            dataset = self.createLargeTestDataSet(name="TemporalSplit_bootstrap")
        split = TemporalSplit(
            timesplit=[split_year - 1, split_year, split_year + 1],
            timeprop="Year of first disclosure",
        )
        bootstrap_split = BootstrapSplit(
            split=split,
            n_bootstraps=10,
        )
        # All bootstraps are materialized first; zip then stops at the shorter
        # input, so only as many folds as there are entries in split.timeSplit
        # are actually checked (presumably the three years given above).
        for time, fold_info in zip(
            split.timeSplit, list(dataset.split(bootstrap_split))
        ):
            years = dataset.getDF().loc[fold_info[1], "Year of first disclosure"]
            self.assertTrue(all(years > time))

    @parameterized.expand(
        [
            (False, BemisMurckoRDKit(), None),
            (
                False,
                BemisMurcko(use_csk=True),
                [
                    "ScaffoldSplit_storage_library_000",
                    "ScaffoldSplit_storage_library_001",
                ],
            ),
            (True, BemisMurckoRDKit(), None),
        ]
    )
    def testScaffoldSplit(self, multitask, scaffold, custom_test_list):
        """Test the scaffold split function."""
        if multitask:
            dataset = self.createLargeMultitaskDataSet(name="ScaffoldSplit")
        else:
            dataset = self.createLargeTestDataSet(name="ScaffoldSplit")
        split = ScaffoldSplit(
            scaffold=scaffold,
            custom_test_list=custom_test_list,
        )
        dataset.addSplit(split, name="scaffold_split")
        self.checkSplit(dataset, "scaffold_split")
        # check that the ids in custom_test_list are in the test set
        if custom_test_list:
            test_index = dataset.getSplit("scaffold_split", as_type="ids")[0][1]
            self.assertTrue(all(mol_id in test_index for mol_id in custom_test_list))

        # check folding by scaffold
        if multitask:
            dataset = self.createLargeMultitaskDataSet(name="ScaffoldSplit_folding_mt")
        else:
            dataset = self.createLargeTestDataSet(name="ScaffoldSplit_folding")
        n_folds = 5
        split = ScaffoldSplit(
            scaffold=scaffold,
            custom_test_list=custom_test_list,
            n_folds=n_folds,
        )
        test_index_all = []
        # mirror of test_index_all for O(1) overlap checks between folds
        seen_test_ids = set()
        for k, (train_index, test_index) in enumerate(dataset.split(split)):
            fold_test_ids = test_index.tolist()
            # no test sample may appear in the test set of an earlier fold
            self.assertTrue(seen_test_ids.isdisjoint(fold_test_ids))
            self.assertTrue(len(train_index) > len(test_index))
            test_index_all.extend(fold_test_ids)
            seen_test_ids.update(fold_test_ids)
        # enumerate is zero-based, so the last fold index is n_folds - 1
        self.assertEqual(k, n_folds - 1)
        self.assertEqual(len(test_index_all), len(dataset.getDescriptors()))

    @parameterized.expand(
        [
            (
                False,
                FPSimilarityLeaderPickerClusters(
                    fp_calculator=MorganFP(radius=2, nBits=128)
                ),
                None,
            ),
            (
                False,
                FPSimilarityMaxMinClusters(
                    fp_calculator=MorganFP(radius=2, nBits=128)
                ),
                [
                    "ClusterSplit_storage_library_000",
                    "ClusterSplit_storage_library_001",
                ],
            ),
            (
                True,
                FPSimilarityMaxMinClusters(
                    fp_calculator=MorganFP(radius=2, nBits=128)
                ),
                None,
            ),
            (
                True,
                FPSimilarityLeaderPickerClusters(
                    fp_calculator=MorganFP(radius=2, nBits=128)
                ),
                [
                    "ClusterSplit_storage_library_000",
                    "ClusterSplit_storage_library_001",
                ],
            ),
        ]
    )
    def testClusterSplit(self, multitask, clustering_algorithm, custom_test_list):
        """Test the cluster split function."""
        if multitask:
            dataset = self.createLargeMultitaskDataSet(name="ClusterSplit")
        else:
            dataset = self.createLargeTestDataSet(name="ClusterSplit")
        split = ClusterSplit(
            clustering=clustering_algorithm,
            custom_test_list=custom_test_list,
            time_limit_seconds=10,
        )
        dataset.addSplit(split, name="cluster_split")
        self.checkSplit(dataset, "cluster_split")
        # check that the ids in custom_test_list are in the test set
        if custom_test_list:
            test_index = dataset.getSplit("cluster_split", as_type="ids")[0][1]
            self.assertTrue(all(mol_id in test_index for mol_id in custom_test_list))

    def testSerialization(self):
        """Test the serialization of dataset with datasplit."""
        dataset = self.createLargeTestDataSet()
        split = ScaffoldSplit()
        dataset.addSplit(split, name="scaffold_split")
        self.checkSplit(dataset, "scaffold_split")
        train_ids, test_ids = dataset.getSplit("scaffold_split", as_type="ids")[0]
        dataset.save()
        # reload from the saved meta file and compare the stored split
        dataset_new = QSPRTable.fromFile(dataset.metaFile)
        self.checkSplit(dataset_new, "scaffold_split")
        train_ids_new, test_ids_new = dataset_new.getSplit(
            "scaffold_split", as_type="ids"
        )[0]
        self.assertTrue(all(mol_id in train_ids_new for mol_id in train_ids))
        self.assertTrue(all(mol_id in test_ids_new for mol_id in test_ids))
        dataset_new.clear()
class TestFoldSplitters(DataSetsPathMixIn, QSPRTestCase):
    """Small tests to only check if the fold splitters work on their own.

    The tests here should be used to check for all their specific parameters and
    edge cases.
    """

    def setUp(self):
        """Run the parent set-up and prepare the paths used by the tests."""
        super().setUp()
        self.setUpPaths()

    def validateFolds(self, folds, more=None):
        """Check if the folds have the data they should have after splitting.

        Args:
            folds: iterable yielding 4-tuples that are unpacked as
                ``(X_train, y_train, X_test, y_test)``.
            more: optional callable invoked for every fold as
                ``more(X_train, X_test, y_train, y_test)`` to run extra checks.

        Returns:
            A tuple ``(k, tested_indices)`` with the number of folds seen and the
            concatenated index values of all ``X_test`` objects.
        """
        k = 0
        tested_indices = []
        for X_train, y_train, X_test, y_test in folds:
            k += 1
            self.assertEqual(len(X_train), len(y_train))
            self.assertEqual(len(X_test), len(y_test))
            tested_indices.extend(X_test.index.tolist())
            if more:
                # note the argument order differs from the unpacking order above
                more(X_train, X_test, y_train, y_test)
        return k, tested_indices

    def testStandardFolds(self):
        """Test the default fold generator, which is a 5-fold cross validation."""
        # test default settings with regression
        dataset = self.createLargeTestDataSet()
        dataset.addDescriptors([MorganFP(radius=3, nBits=128)])
        fold_split = KFold(5, shuffle=True, random_state=dataset.randomState)
        dataset.addSplit(fold_split, name="fold_split")
        k, indices = self.validateFolds(
            dataset.iterSplit("fold_split", as_type="pandas")
        )
        self.assertEqual(k, 5)
        df = dataset.getDF()
        # every sample has to show up in a test fold
        self.assertFalse(set(df.index) - set(indices))

        # test default settings with classification
        dataset.makeClassification("CL", th=[20])
        fold_split = StratifiedKFold(
            5, shuffle=True, random_state=dataset.randomState
        )
        dataset.addSplit(fold_split, name="fold_split")
        k, indices = self.validateFolds(
            dataset.iterSplit("fold_split", as_type="pandas")
        )
        self.assertEqual(k, 5)
        self.assertFalse(set(df.index) - set(indices))

        # test in a pipeline (with a standardizer)
        MAX_VAL = 2
        MIN_VAL = 1
        scaler = MinMaxScaler(feature_range=(MIN_VAL, MAX_VAL))
        pipeline = DatasetPipeline(steps={"standardizer": scaler})

        def check_min_max(X_train, X_test, *args, **kwargs):
            """Assert that train and test features span the scaler's range."""
            # assertEqual shows the offending value when a check fails
            self.assertEqual(np.max(X_train.values), MAX_VAL)
            self.assertEqual(np.min(X_train.values), MIN_VAL)
            self.assertEqual(np.max(X_test.values), MAX_VAL)
            self.assertEqual(np.min(X_test.values), MIN_VAL)

        self.validateFolds(
            pipeline.applyOnDataSet(dataset, "fold_split"), check_min_max
        )
        k, indices = self.validateFolds(
            dataset.iterSplit("fold_split", as_type="pandas")
        )
        self.assertEqual(k, 5)
        self.assertFalse(set(df.index) - set(indices))

        # try with a split data set
        train_ids, _ = next(dataset.split(RandomSplit(test_fraction=0.1)))
        train_set = dataset[train_ids]
        train_set.addSplit(fold_split, name="fold_split")
        k, indices = self.validateFolds(
            train_set.iterSplit("fold_split", as_type="pandas")
        )
        self.assertEqual(k, 5)
        self.assertFalse(set(train_ids) - set(indices))

    def testBootstrappedFold(self):
        """Test bootstrapped folds for fold count and seed-dependent indices."""
        dataset = self.createLargeTestDataSet(random_state=1)
        dataset.addDescriptors([MorganFP(radius=3, nBits=128)])
        split = RandomSplit(0.2)
        fold = BootstrapSplit(split, n_bootstraps=5)
        dataset.addSplit(fold, name="fold_split")
        k, indices = self.validateFolds(
            dataset.iterSplit("fold_split", as_type="pandas")
        )
        self.assertEqual(k, 5)

        # check if the indices are the same if we do the same split again
        split = RandomSplit(0.2)
        fold = BootstrapSplit(split, n_bootstraps=5, seed=dataset.randomState)
        dataset.addSplit(fold, name="fold_split2")
        k, indices_second = self.validateFolds(
            dataset.iterSplit("fold_split2", as_type="pandas")
        )
        self.assertEqual(k, 5)
        self.assertListEqual(indices, indices_second)

        # check if the indices are different if we do a different split
        split = RandomSplit(0.2)
        fold = BootstrapSplit(split, n_bootstraps=5, seed=42)
        dataset.addSplit(fold, name="fold_split3")
        k, indices_third = self.validateFolds(
            dataset.iterSplit("fold_split3", as_type="pandas")
        )
        self.assertEqual(k, 5)
        # the wrapped splitter's random state is expected to still be None here
        self.assertIsNone(split.randomState)
        self.assertNotEqual(indices, indices_third)