import copy
import itertools
from unittest import skipIf
import numpy as np
import pandas as pd
from mlchemad.applicability_domains import KNNApplicabilityDomain as KNNAD
from parameterized import parameterized
from rdkit.Chem import Mol
from sklearn.preprocessing import StandardScaler
from .mol_processor import MolProcessor
from ..descriptors.fingerprints import MorganFP
from ..descriptors.sets import DataFrameDescriptorSet
from ... import TargetTasks
from ...data import QSPRDataset
from ...data.processing.applicability_domain import MLChemADWrapper, KNNApplicabilityDomain
from ...data.processing.data_filters import CategoryFilter, RepeatsFilter
from ...data.processing.feature_filters import (
BorutaFilter,
HighCorrelationFilter,
LowVarianceFilter,
)
from ...data.processing.feature_standardizers import SKLearnStandardizer
from ...utils.testing.base import QSPRTestCase
from ...utils.testing.path_mixins import DataSetsPathMixIn, PathMixIn
class TestDataFilters(DataSetsPathMixIn, QSPRTestCase):
"""Small tests to only check if the data filters work on their own.
The tests here should be used to check for all their specific parameters and
edge cases."""
    def setUp(self):
super().setUp()
self.setUpPaths()
    def testCategoryFilter(self):
"""Test the category filter, which drops specific values from dataset
properties."""
remove_cation = CategoryFilter(name="moka_ionState7.4", values=["cationic"])
df_anion = remove_cation(self.getBigDF())
self.assertTrue((df_anion["moka_ionState7.4"] == "cationic").sum() == 0)
only_cation = CategoryFilter(
name="moka_ionState7.4", values=["cationic"], keep=True
)
df_cation = only_cation(self.getBigDF())
self.assertTrue((df_cation["moka_ionState7.4"] != "cationic").sum() == 0)
    def testRepeatsFilter(self):
"""Test the duplicate filter, which drops rows with identical descriptors
from dataset."""
descriptor_names = [f"Descriptor_{i}" for i in range(3)]
df = pd.DataFrame(
data=np.array(
[
["C", 1, 2, 1, 1],
["CC", 1, 2, 2, 2],
["CCC", 1, 2, 3, 3],
["C", 1, 2, 1, 4],
["C", 1, 2, 1, 5],
["CC", 1, 2, 2, 6], # 3rd "descriptor" is length of SMILES
]
),
columns=["SMILES", *descriptor_names, "Year"],
)
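        # note: np.array on this mixed list coerces every value to a string,
        # so the "descriptor" columns are compared as strings by the filter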
# only warnings
df_copy = copy.deepcopy(df)
dup_filter1 = RepeatsFilter(keep=True)
df_copy = dup_filter1(df_copy, df_copy[descriptor_names])
self.assertEqual(len(df_copy), len(df))
self.assertTrue(df_copy.equals(df))
# drop duplicates
df_copy = copy.deepcopy(df)
dup_filter2 = RepeatsFilter(keep=False)
df_copy = dup_filter2(df_copy, df_copy[descriptor_names])
        self.assertEqual(len(df_copy), 1)  # only CCC has a single occurrence
self.assertTrue(df_copy.equals(df.iloc[[2]]))
# keep first, by year
df_copy = copy.deepcopy(df)
dup_filter3 = RepeatsFilter(keep="first", timecol="Year")
df_copy = dup_filter3(df_copy, df_copy[descriptor_names])
self.assertEqual(len(df_copy), 3) # three unique SMILES
self.assertTrue(df_copy.equals(df.iloc[[0, 1, 2]])) # keep first by year
# check with additional columns
df_copy = copy.deepcopy(df)
df_copy["proteinid"] = ["A", "B", "B", "B", "B", "B"]
dup_filter4 = RepeatsFilter(additional_cols=["proteinid"])
df_copy = dup_filter4(df_copy, df_copy[descriptor_names])
self.assertEqual(len(df_copy), 2) # C (protein A, idx 0) and CCC are unique,
# but C (protein B, idx 3) is a duplicate
# of C (protein B, idx 4) and is dropped
    def testConsistency(self):
dataset = self.createLargeTestDataSet()
remove_cation = CategoryFilter(name="moka_ionState7.4", values=["cationic"])
self.assertTrue((dataset.getDF()["moka_ionState7.4"] == "cationic").sum() > 0)
dataset.filter([remove_cation])
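        # after filtering, the property table and the feature matrix must stay aligned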
self.assertEqual(len(dataset.getDF()), len(dataset.getFeatures(concat=True)))
self.assertTrue((dataset.getDF()["moka_ionState7.4"] == "cationic").sum() == 0)
class TestFeatureFilters(PathMixIn, QSPRTestCase):
"""Tests to check if the feature filters work on their own.
    Note: This also tests the `DataFrameDescriptorSet`,
as it is used to add test descriptors.
"""
    def setUp(self):
"""Set up the small test Dataframe."""
super().setUp()
self.nCPU = 2 # just to test parallel processing
self.chunkSize = 2
self.setUpPaths()
descriptors = [
"Descriptor_F1",
"Descriptor_F2",
"Descriptor_F3",
"Descriptor_F4",
"Descriptor_F5",
]
self.df_descriptors = pd.DataFrame(
data=np.array(
[
[1, 4, 2, 6, 2],
[1, 8, 4, 2, 4],
[1, 4, 3, 2, 5],
[1, 8, 4, 9, 8],
[1, 4, 2, 3, 9],
[1, 8, 4, 7, 12],
]
),
columns=descriptors,
)
self.df = pd.DataFrame(
data=np.array([["C", 1], ["C", 2], ["C", 3], ["C", 4], ["C", 5], ["C", 6]]),
columns=["SMILES", "y"],
)
self.dataset = QSPRDataset(
"TestFeatureFilters",
target_props=[{"name": "y", "task": TargetTasks.REGRESSION}],
df=self.df,
store_dir=self.generatedPath,
n_jobs=self.nCPU,
chunk_size=self.chunkSize,
)
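        # index the descriptor table by the dataset's generated QSPRID values
        # so it can be joined to the molecules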
self.df_descriptors["QSPRID"] = self.dataset.getProperty(
self.dataset.idProp
).values
self.df_descriptors.set_index("QSPRID", inplace=True, drop=True)
self.dataset.addDescriptors([DataFrameDescriptorSet(self.df_descriptors)])
self.descriptors = self.dataset.featureNames
    def recalculateWithMultiIndex(self):
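        """Re-attach the descriptor table using a two-column index
        (split out of the QSPRID values) instead of the single QSPRID index."""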
self.dataset.dropDescriptorSets(self.dataset.descriptorSets, full_removal=True)
self.df_descriptors["ID_COL1"] = (
self.dataset.getProperty(self.dataset.idProp)
.apply(lambda x: x.split("_")[0])
.to_list()
)
self.df_descriptors["ID_COL2"] = (
self.dataset.getProperty(self.dataset.idProp)
.apply(lambda x: x.split("_")[1])
.to_list()
)
self.dataset.addProperty("ID_COL1", self.df_descriptors["ID_COL1"].values)
self.dataset.addProperty("ID_COL2", self.df_descriptors["ID_COL2"].values)
self.dataset.addDescriptors(
[
DataFrameDescriptorSet(
self.df_descriptors,
["ID_COL1", "ID_COL2"],
)
]
)
    def testDefaultDescriptorAdd(self):
"""Test adding without index columns."""
self.dataset.nJobs = 1
df_new = self.dataset.getFeatures(concat=True).copy()
calc = DataFrameDescriptorSet(df_new, suffix="new_df_desc")
self.dataset.addDescriptors([calc])
@parameterized.expand(
[
(True,),
(False,),
]
)
def testLowVarianceFilter(self, use_index_cols):
"""Test the low variance filter, which drops features with a variance below
a threshold."""
if use_index_cols:
self.recalculateWithMultiIndex()
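        # Descriptor_F1 is constant (all ones), so it is the only feature
        # with variance below the 0.01 threshold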
self.dataset.filterFeatures([LowVarianceFilter(0.01)])
# check if correct columns selected and values still original
self.assertListEqual(list(self.dataset.featureNames), self.descriptors[1:])
self.assertListEqual(list(self.dataset.X.columns), self.descriptors[1:])
@parameterized.expand(
[
(True,),
(False,),
]
)
def testHighCorrelationFilter(self, use_index_cols):
"""Test the high correlation filter, which drops features with a correlation
above a threshold."""
if use_index_cols:
self.recalculateWithMultiIndex()
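        # Descriptor_F2 and Descriptor_F3 correlate at roughly 0.93, above the
        # 0.8 cutoff, so Descriptor_F3 (index 2) is dropped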
self.dataset.filterFeatures([HighCorrelationFilter(0.8)])
# check if correct columns selected and values still original
self.descriptors.pop(2)
self.assertListEqual(list(self.dataset.featureNames), self.descriptors)
self.assertListEqual(list(self.dataset.X.columns), self.descriptors)
@parameterized.expand(
[
(True,),
(False,),
]
)
    @skipIf(
        tuple(int(v) for v in np.__version__.split(".")[:2]) >= (1, 24),
        "numpy >= 1.24 is not compatible with boruta",
    )
def testBorutaFilter(self, use_index_cols):
"""Test the Boruta filter, which removes the features which are statistically as
relevant as random features."""
if use_index_cols:
self.recalculateWithMultiIndex()
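        # on this tiny dataset only the last descriptor is expected to beat
        # Boruta's random shadow features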
self.dataset.filterFeatures([BorutaFilter()])
# check if correct columns selected and values still original
self.assertListEqual(list(self.dataset.featureNames), self.descriptors[-1:])
self.assertListEqual(list(self.dataset.X.columns), self.descriptors[-1:])
class TestFeatureStandardizer(DataSetsPathMixIn, QSPRTestCase):
"""Test the feature standardizer."""
    def setUp(self):
"""Create a small test dataset with MorganFP descriptors."""
super().setUp()
self.setUpPaths()
self.dataset = self.createSmallTestDataSet(self.__class__.__name__)
self.dataset.addDescriptors([MorganFP(radius=3, nBits=128)])
    def testFeaturesStandardizer(self):
"""Test the feature standardizer fitting, transforming and serialization."""
scaler = SKLearnStandardizer.fromFit(self.dataset.X, StandardScaler())
scaled_features = scaler(self.dataset.X)
scaler.toFile(f"{self.generatedPath}/test_scaler.json")
scaler_fromfile = SKLearnStandardizer.fromFile(
f"{self.generatedPath}/test_scaler.json"
)
scaled_features_fromfile = scaler_fromfile(self.dataset.X)
self.assertIsInstance(scaled_features, np.ndarray)
self.assertEqual(scaled_features.shape, (len(self.dataset), 128))
        self.assertTrue(np.array_equal(scaled_features, scaled_features_fromfile))
def getCombos():
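    """Return all combinations of (n_jobs, chunk_size, add_props, as_rdkit,
    args, kwargs) settings used to parameterize ``testMolProcess``."""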
return list(
itertools.product(
[1, None],
[50, None],
[None, ["fu", "CL"], ["SMILES"]],
[True, False],
[None, [1, 2]],
[None, {"a": 1}],
)
)
class TestMolProcessor(DataSetsPathMixIn, QSPRTestCase):
    def setUp(self):
super().setUp()
self.setUpPaths()
    class TestingProcessor(MolProcessor):
def __call__(self, mols, props, *args, **kwargs):
assert "QSPRID" in props, "QSPRID not in props"
result = []
for mol in mols:
result.append((mol, *props.keys(), *args, *kwargs.keys()))
return np.array(result)
@property
def supportsParallel(self):
return True
@property
def requiredProps(self) -> list[str]:
return ["QSPRID"]
@parameterized.expand([["_".join([str(i) for i in x]), *x] for x in getCombos()])
def testMolProcess(self, _, n_jobs, chunk_size, props, add_rdkit, args, kwargs):
dataset = self.createLargeTestDataSet()
dataset.nJobs = n_jobs
dataset.chunkSize = chunk_size
        self.assertIsNotNone(dataset.nJobs)
        self.assertIsNotNone(dataset.chunkSize)
        self.assertGreater(dataset.nJobs, 0)
        self.assertGreater(dataset.chunkSize, 0)
result = dataset.processMols(
self.TestingProcessor(),
add_props=props,
as_rdkit=add_rdkit,
proc_args=args,
proc_kwargs=kwargs,
)
expected_props = (
[*props, "QSPRID", "SMILES"]
if props is not None
else dataset.getProperties()
)
expected_props = set(expected_props)
expected_args = set(args) if args is not None else set()
expected_kwargs = set(kwargs) if kwargs is not None else set()
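        # plus one column for the molecule itself, which TestingProcessor
        # prepends to each row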
expected_cols = (
len(expected_props) + len(expected_args) + len(expected_kwargs) + 1
)
for item in result:
            if dataset.nJobs > 1:
                self.assertLessEqual(item.shape[0], dataset.chunkSize)
            else:
                self.assertEqual(item.shape[0], len(dataset))
self.assertEqual(
item.shape[1],
expected_cols,
)
for prop in expected_props:
self.assertIn(prop, item)
if add_rdkit:
self.assertIsInstance(item[0, 0], Mol)
else:
self.assertIsInstance(item[0, 0], str)
class TestApplicabilityDomain(DataSetsPathMixIn, QSPRTestCase):
"""Test the applicability domain."""
    def setUp(self):
"""Create a small test dataset with MorganFP descriptors."""
super().setUp()
self.setUpPaths()
self.dataset = self.createSmallTestDataSet(self.__class__.__name__)
self.dataset.addDescriptors([MorganFP(radius=3, nBits=1000)])
    def testApplicabilityDomain(self):
"""Test the applicability domain fitting, transforming and serialization."""
ad = MLChemADWrapper(
KNNAD(dist="jaccard", scaling=None, alpha=0.95)
)
ad.fit(self.dataset.X)
self.assertIsInstance(ad.contains(self.dataset.X), pd.Series)
ad.toFile(f"{self.generatedPath}/test_ad.json")
ad_fromfile = MLChemADWrapper.fromFile(f"{self.generatedPath}/test_ad.json")
self.assertIsInstance(ad_fromfile.contains(self.dataset.X), pd.Series)
    def testContinuousAD(self):
        """Test the applicability domain for continuous data."""
ad = KNNApplicabilityDomain(dist="euclidean", scaling="standard", alpha=0.95)
ad.fit(self.dataset.X)
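        # no threshold/direction is set yet (they are assigned below),
        # so contains() raises for continuous distance scores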
        with self.assertRaises(ValueError):
            ad.contains(self.dataset.X)
self.assertIsInstance(ad.transform(self.dataset.X), pd.Series)
ad.threshold = 0.3
ad.direction = "<"
self.assertIsInstance(ad.contains(self.dataset.X), pd.Series)
ad.toFile(f"{self.generatedPath}/test_ad.json")
        ad_fromfile = MLChemADWrapper.fromFile(f"{self.generatedPath}/test_ad.json")
        self.assertIsInstance(ad_fromfile.contains(self.dataset.X), pd.Series)