import json
import os
from typing import Callable, Generator
import numpy as np
import pandas as pd
from qsprpred.data.processing.target_transformers import Discretizer
from qsprpred.data.sampling.splits import DataSplit
from .interfaces.qspr_data_set import QSPRDataSet
from .mol import MoleculeTable
from ..storage.interfaces.chem_store import ChemStore
from ...logs import logger
from ...tasks import TargetSpec, TargetTasks
[docs]
class QSPRTable(QSPRDataSet, MoleculeTable):
"""Implementation of `QSPRDataSet` using a collection of `PandasDataTable` objects.
Attributes:
    targetProperties (list[TargetSpec]): specifications of the properties to be
        predicted by a QSPR model
    splits (dict): named data splits registered with `addSplit`
"""
def __init__(
    self,
    storage: ChemStore | None = None,
    name: str | None = None,
    target_props: list[TargetSpec | dict] | None = None,
    path: str = ".",
    random_state: int | None = None,
    store_format: str = "pkl",
    drop_empty_target_props: bool = True,
):
    """Construct the data set and register its target properties.

    Args:
        storage (ChemStore | None):
            storage object to use for saving the data. Defaults to `None`.
        name (str | None):
            data name, used in saving the data. If not given, a name is derived
            from the string representation of `storage`.
        target_props (list[TargetSpec | dict] | None):
            target properties, names should correspond with target column names
            in the data. If `None`, the target specifications are read from the
            meta file of a previously saved data set. Defaults to `None`.
        path (str, optional):
            path to the directory where the data set will be saved. Defaults to ".".
        random_state (int, optional): random state for splitting the data.
        store_format (str, optional):
            format to use for storing the data ('pkl' or 'csv').
        drop_empty_target_props (bool, optional):
            whether to drop entries with empty target properties. Defaults to
            `True`.

    Raises:
        ValueError: if `target_props` is `None` and no meta file exists for
            this data set.
    """
    super().__init__(
        storage=storage,
        name=name or f"{storage}_qspr_data",
        path=path,
        random_state=random_state,
        store_format=store_format,
    )
    # load target specifications if not specified and file exists
    if target_props is None:
        if not os.path.exists(self.metaFile):
            raise ValueError(
                "Target specifications must be specified for a new QSPRTable.")
        # context manager guarantees the meta file handle is closed again
        with open(self.metaFile, "r") as meta_file:
            meta = json.load(meta_file)
        target_props = [
            TargetSpec.fromJSON(json.dumps(x))
            for x in meta["py/state"]["_targetProperties"]
        ]
    # register the target specifications (this may drop empty entries)
    self._targetProperties = []
    self.setTargetProperties(target_props, drop_empty_target_props)
    logger.info(
        f"Dataset '{self.name}' created for "
        f"Targets: '{self.targetProperties}'. "
        f"Number of samples: {len(self.storage)}. "
    )
    # named splits, filled by `addSplit`
    self.splits = {}
@property
def targetProperties(self) -> list[TargetSpec]:
"""Returns the specifications of target properties of the dataset."""
return self._targetProperties
@targetProperties.setter
def targetProperties(self, target_properties: list[TargetSpec]):
"""Set the target properties of the dataset."""
raise NotImplementedError(
"targetProperties is a read-only property. Use `setTargetProperties` to set "
"the target properties."
)
[docs]
@classmethod
def fromDF(
    cls,
    name: str,
    df: pd.DataFrame,
    target_props: list[TargetSpec | dict],
    path: str = ".",
    smiles_col: str = "SMILES",
    drop_empty_target_props: bool = True,
    **kwargs,
) -> "QSPRTable":
    """Build a `QSPRTable` from a pandas DataFrame.

    The frame is first turned into a molecule table by the parent class and
    then converted with `fromMolTable`.

    Args:
        name (str): name of the data set
        df (pd.DataFrame): data frame containing the data
        target_props (list[TargetSpec | dict]): target properties to use
        path (str): path to the directory where the data set will be saved
        smiles_col (str): name of the column containing SMILES
        drop_empty_target_props (bool, optional): whether to drop rows with empty
            target property values. Defaults to `True`.
        **kwargs: additional keyword arguments forwarded to the parent `fromDF`

    Returns:
        QSPRTable: created data set
    """
    mol_table = super().fromDF(name, df, path, smiles_col, **kwargs)
    conversion_options = {
        "name": name,
        "path": path,
        "drop_empty_target_props": drop_empty_target_props,
    }
    return QSPRTable.fromMolTable(mol_table, target_props, **conversion_options)
[docs]
@classmethod
def fromTableFile(
    cls,
    name: str,
    filename: str,
    path: str,
    *args,
    sep: str = "\t",
    target_props: list[TargetSpec | dict] | None = None,
    **kwargs,
):
    r"""Build a `QSPRTable` from a table file (i.e. CSV or TSV).

    The file is first loaded by the parent class and the result is then
    converted with `fromMolTable`, keeping the name of the loaded table.

    Args:
        name (str): name of the data set
        filename (str): path to the table file
        path (str): path to the directory where the data set will be saved
        *args: additional arguments forwarded to the parent `fromTableFile`
        sep (str, optional): separator in the table file. Defaults to "\t".
        target_props (list[TargetSpec | dict], optional): target properties to
            use. Defaults to `None`.
        **kwargs: additional keyword arguments forwarded to the parent
            `fromTableFile`

    Returns:
        QSPRTable: `QSPRTable` object
    """
    mol_table = super().fromTableFile(
        name, filename, path, *args, sep=sep, **kwargs
    )
    table_name = mol_table.name
    return QSPRTable.fromMolTable(
        mol_table, target_props, name=table_name, path=path
    )
[docs]
@classmethod
def fromSDF(cls, name: str, filename: str, smiles_prop: str, *args, **kwargs):
"""Create `QSPRTable` from SDF file.
It is currently not implemented for `QSPRTable`, but you can convert from
'MoleculeTable' with the 'fromMolTable' method.
Args:
name (str): name of the data set
filename (str): path to the SDF file
smiles_prop (str): name of the property in the SDF file containing SMILES
*args: additional arguments for `QSPRTable` constructor
**kwargs: additional keyword arguments for `QSPRTable` constructor
"""
raise NotImplementedError(
f"SDF loading not implemented for {QSPRTable.__name__}, yet. You can "
"convert from 'MoleculeTable' with 'fromMolTable'."
)
[docs]
@classmethod
def fromMolTable(
    cls,
    mol_table: MoleculeTable,
    target_props: list[TargetSpec | dict],
    *args,
    path: str = ".",
    name: str | None = None,
    **kwargs,
) -> "QSPRTable":
    """Create QSPRTable from a MoleculeTable.

    The new table shares the storage of `mol_table` and takes over its
    descriptors. Unless given explicitly, the random state and store format
    are inherited from `mol_table`.

    Args:
        mol_table (MoleculeTable): `MoleculeTable` to use as the data source
        target_props (list): list of target properties to use
        *args:
            additional positional arguments to pass to the constructor of
            `QSPRTable`; they follow `path` in its signature, i.e. they map to
            `random_state`, `store_format` and `drop_empty_target_props`
        path (str): path to the directory where the data set will be saved
        name (str): name of the data set, defaults to the name of `mol_table`
        **kwargs:
            additional keyword arguments to pass to the constructor of `QSPRTable`

    Returns:
        QSPRTable: created data set
    """
    name = mol_table.name if name is None else name
    # Only fall back to the molecule table's settings when the caller did not
    # supply them, neither by keyword nor positionally. Unconditionally adding
    # the keywords made every call with extra positional arguments fail with
    # "got multiple values for argument".
    if len(args) < 1 and "random_state" not in kwargs:
        kwargs["random_state"] = mol_table.randomState
    if len(args) < 2 and "store_format" not in kwargs:
        kwargs["store_format"] = mol_table.storeFormat
    ds = QSPRTable(
        mol_table.storage,
        name,
        target_props,
        path,
        *args,
        **kwargs,
    )
    ds.descriptors = mol_table.descriptors
    return ds
[docs]
def addTargetProperty(self, target_spec: TargetSpec | dict,
                      drop_empty: bool = True):
    """Register a property of the data set as a target property.

    An already registered target with the same name is replaced. For
    classification tasks the values are converted and validated right away.

    Args:
        target_spec (TargetSpec | dict):
            target property specification to add or dictionary to initialize a
            TargetSpec
        drop_empty (bool):
            whether to drop rows with empty target property values. Defaults to
            `True`.

    Raises:
        AssertionError: if the data set has no property with the given name
    """
    logger.debug(f"Adding target property '{target_spec}' to dataset.")
    if isinstance(target_spec, dict):
        target_spec = TargetSpec.fromDict(target_spec)
    known_properties = self.getProperties()
    assert (
        target_spec.name in known_properties
    ), f"Property {target_spec.name} not found in data set."
    # start from the untransformed values (or back them up on first use)
    self.restoreTargetProperty(target_spec)
    already_set = target_spec.name in self.getTargetPropertiesNames()
    if already_set:
        logger.warning(
            f"Target property '{target_spec}' already exists in dataset. It will be overwritten."
        )
        kept_specs = []
        for existing in self.targetProperties:
            if existing.name != target_spec.name:
                kept_specs.append(existing)
        self._targetProperties = kept_specs
    self._targetProperties.append(target_spec)
    if target_spec.task.isClassification():
        self.makeClassification(target_spec.name, target_spec.th)
        self.checkClassification(target_spec.name)
    if drop_empty:
        self.dropEmptyEntries([target_spec.name])
[docs]
def getTargetSpecs(self, names: list | None) -> list[TargetSpec]:
"""Get the target specifications with the given names.
Args:
names (list[str]): name of the target properties
Returns:
(list[TargetSpec]): list of target specifications
"""
if names is None:
return self.targetProperties
if not all(name in self.getTargetPropertiesNames() for name in names):
logger.warning(
f"Some target properties {names} not found in dataset. "
f"Available target properties: {self.getTargetPropertiesNames()}"
)
return [tp for tp in self.targetProperties if tp.name in names]
[docs]
def getTargetSpec(self, name: str) -> TargetSpec:
"""Get the target specification of a single target property by its name.
Args:
name (str): name of the target property
Returns:
TargetSpec: target specification with the given name
Raises:
ValueError: if the target property with the given name is not found
"""
for tp in self.targetProperties:
if tp.name == name:
return tp
raise ValueError(f"Target property '{name}' not found in dataset.")
[docs]
def setTargetProperties(
self,
target_props: list[TargetSpec | dict],
drop_empty: bool = True,
):
"""Set list of target properties for the dataset.
Args:
target_props (list[TargetSpec | dict]):
list of target properties specifications or dictionaries to initialize
the TargetSpec objects from.
drop_empty (bool, optional):
whether to drop rows with empty target property values. Defaults to
`True`.
"""
assert isinstance(target_props, list), (
"target_props should be a list of TargetSpec objects or dictionaries to "
"initialize TargetSpec objects from. Not a %s." % type(target_props)
)
if isinstance(target_props[0], dict):
assert all(isinstance(d, dict) for d in target_props), (
"target_props should be a list of TargetSpec objects or "
"dictionaries to initialize TargetSpec objects from, not a mix."
)
target_props = TargetSpec.fromList(target_props)
else:
assert all(isinstance(d, TargetSpec) for d in target_props), (
"target_props should be a list of TargetSpec objects or "
"dictionaries to initialize TargetSpec objects from, not a mix."
)
self._targetProperties = []
for prop in target_props:
self.addTargetProperty(prop, drop_empty)
[docs]
def unsetTargetProperty(self, name: str | TargetSpec):
    """Remove a property from the list of target properties.

    The property itself stays in the data set, it is only no longer treated
    as a target.

    Args:
        name (str | TargetSpec):
            name or specification of the target property to drop

    Raises:
        AssertionError: if the property is not a target property or if it is
            the only target property left
    """
    if isinstance(name, TargetSpec):
        name = name.name
    assert (
        name in self.getTargetPropertiesNames()
    ), f"Target property '{name}' not found in dataset."
    assert (
        len(self.targetProperties) > 1
    ), "Cannot drop task from single-task dataset."
    remaining = []
    for spec in self.targetProperties:
        if spec.name != name:
            remaining.append(spec)
    self._targetProperties = remaining
[docs]
def restoreTargetProperty(self, prop: TargetSpec | str):
    """Reset a target property to its original values.

    The original values are kept in a companion property named
    `<name>_original`. If that companion does not exist yet, it is created
    from the current values instead, so that a later call can restore them.

    Args:
        prop (TargetSpec | str): target property to reset
    """
    spec = self.getTargetSpec(prop) if isinstance(prop, str) else prop
    if f"{spec.name}_original" not in self.getProperties():
        # first call: back up the current values for the next reset
        self.addProperty(f"{spec.name}_original", self.getProperty(spec.name))
        return
    # backup exists: write the original values back
    self.addProperty(spec.name, self.getProperty(f"{spec.name}_original"))
[docs]
def makeClassification(
self,
target_property: str,
th: list[float] | None = None,
):
"""Switch to classification task using the given threshold values.
Args:
target_property (str):
Name of target property to use for classification
th (list[float], optional):
list of threshold values. If not provided, it is assumed that
the target property is already discretized and can be used for
classification.
"""
assert isinstance(th, (list, type(None))), (
"Thresholds must be a list of floats or None. "
f"Got {type(th)} instead."
)
if isinstance(th, list):
assert len(th) > 0, (
"Thresholds must be a non-empty list of floats. "
)
assert len(th) == 1 or len(th) > 3, (
"Thresholds must be a single float for binary classification or "
"a list of at least 3 floats for multi-class classification."
)
assert target_property in self.getTargetPropertiesNames(), (
f"Target property '{target_property}' not found in dataset. "
f"Available target properties: {self.getTargetPropertiesNames()} "
f"To convert a regression task to classification, first add the "
f"property as a target property with the "
f"`addTargetProperty` method."
)
self.restoreTargetProperty(target_property)
target_values = self.getTarget(target_property).copy()
target_spec = self.getTargetSpec(target_property)
if target_values.isna().all():
logger.debug(
f"Target property '{target_property}' has all NaNs. This happens "
"on the initialization of a PredictionDataSet, but should not happen "
"otherwise."
)
assert target_spec.task.isClassification(), (
f"Target property '{target_property}' is not a classification task. "
" and it has no values."
)
else:
# convert target values to discrete classes if needed
if th is None:
assert all(
value is None or (type(value) in (int, bool)) or
(isinstance(value, float) and value.is_integer())
for value in target_values
), (
"Precomputed classification target must be integers or booleans."
"Set the `th` argument to a list of threshold values to convert "
"float values to discrete classes for classification."
)
else:
discretizer = Discretizer(target=target_property, th=th)
target_values = discretizer.fitTransform(None, target_values)[1][
target_property]
self.addProperty(target_property, target_values)
# update target specification
n_classes = len(target_values.dropna().unique())
task = TargetTasks.MULTICLASS if n_classes > 2 else TargetTasks.SINGLECLASS
target_spec.task = task
if th is None:
target_spec.setTh(th, n_classes=n_classes)
else:
target_spec.setTh(th)
logger.info(
f"Target property '{target_property}' converted to classification.")
[docs]
def makeRegression(self, target_property: str):
    """Turn the given target property back into a regression target.

    The original values of the property are restored, the task of its
    specification is set to regression and any thresholds are removed.

    Args:
        target_property (str): name of the target property to use for regression
    """
    spec = self.getTargetSpec(target_property)
    self.restoreTargetProperty(spec)
    spec.task = TargetTasks.REGRESSION
    # thresholds only make sense for classification
    if hasattr(spec, "th"):
        delattr(spec, "th")
    logger.info(f"Target property '{target_property}' converted to regression.")
[docs]
def checkClassification(
    self,
    target_property: str,
) -> bool:
    """Checks the validity of the target property for classification tasks.

    A warning is logged for every problem found: values that are not discrete,
    only a single class, or a number of classes that does not match the task
    of the target specification. Missing values are ignored.

    Args:
        target_property (str):
            Name of the target property to use for classification

    Returns:
        bool: `True` if the target property is correctly set up for classification,
            `False` otherwise.
    """
    target_values = self.getTarget(target_property)
    target_spec = self.getTargetSpec(target_property)
    # `pd.isna` covers `None` and NaN and, unlike `np.isnan`, does not raise
    # on non-numeric values such as strings, so those reach the warning below
    if not all(
        pd.isna(value) or (type(value) in (int, bool)) or
        (isinstance(value, float) and value.is_integer())
        for value in target_values
    ):
        logger.warning(
            f"Classification target property '{target_property}' "
            "should only contain integers or booleans. "
            "Either convert it to discrete values using "
            "`makeClassification` method with a threshold, "
            "change the property values using `addProperty`, "
            "or set the task to REGRESSION."
        )
        return False
    n_classes = len(target_values.dropna().unique())
    if n_classes == 1:
        logger.warning(
            f"Classification target property '{target_property}' task "
            f"is set to {target_spec.task}, but it contains only "
            "1 class. Perhaps you meant to set the task to REGRESSION? "
            "Training a classification model with only one class "
            "is not meaningful."
        )
        return False
    elif n_classes == 2 and target_spec.task == TargetTasks.MULTICLASS:
        logger.warning(
            f"Classification target property '{target_property}' task "
            "is set to MULTICLASS, but it contains only "
            f"2 classes. Perhaps you meant to set the task to "
            "SINGLECLASS?"
        )
        return False
    elif n_classes > 2 and target_spec.task == TargetTasks.SINGLECLASS:
        logger.warning(
            f"Classification target property '{target_property}' task "
            "is set to SINGLECLASS, but it contains more than "
            f"2 classes ({n_classes}). Perhaps you meant to set the task to "
            "MULTICLASS?"
        )
        return False
    return True
@property
def isMultiTask(self) -> bool:
"""Check if the dataset contains multiple target properties.
Returns:
(bool): `True` if the dataset contains multiple target properties
"""
return len(self.targetProperties) > 1
@property
def nTargetProperties(self) -> int:
"""Get the number of target properties in the dataset."""
return len(self.targetProperties)
[docs]
def getTargets(self) -> pd.DataFrame:
    """Get the values of all target properties.

    Returns:
        (pd.DataFrame): the target property columns of the data frame
    """
    frame = self.getDF()
    target_columns = self.getTargetPropertiesNames()
    return frame[target_columns]
[docs]
def getTarget(self, name: str | TargetSpec) -> pd.Series:
    """Get the values of a single target property.

    Args:
        name (str | TargetSpec): name or specification of the target property

    Returns:
        (pd.Series): target property values

    Raises:
        AssertionError: if the property is not a target property
    """
    column = name.name if isinstance(name, TargetSpec) else name
    assert column in self.getTargetPropertiesNames(), f"Target property '{column}' not found in dataset."
    frame = self.getDF()
    return frame[column]
[docs]
def getSubset(
    self,
    subset: list[str],
    ids: list[str] | None = None,
    name: str | None = None,
    path: str = ".",
    **kwargs,
) -> "QSPRTable":
    """Get a subset of the data set.

    The target properties are always part of the subset, and rows with empty
    target values are kept.

    Args:
        subset (list[str]): list of columns to include in the subset
        ids (list[str], optional): list of IDs to include in the subset. Defaults to
            `None`.
        name (str, optional): name of the subset. Defaults to `None`.
        path (str, optional): path to the directory where the subset will be saved.
            Defaults to ".".
        **kwargs: additional keyword arguments, passed both to the parent
            `getSubset` and to the constructor of `QSPRTable`.

    Returns:
        QSPRTable: subset of the data set
    """
    # add target properties if not already in the subset
    # as the QSPRTable requires them; `dict.fromkeys` removes duplicates while
    # keeping the column order stable (a `set` orders them arbitrarily)
    subset = list(dict.fromkeys([*subset, *self.getTargetPropertiesNames()]))
    mt = super().getSubset(subset, ids, name, path, **kwargs)
    ds = self.fromMolTable(
        mt, self.targetProperties, name=mt.name, path=path,
        drop_empty_target_props=False, **kwargs
    )
    return ds
[docs]
def addSplit(self, split: DataSplit, name: str):
    """Run a split on the dataset and remember it under a name.

    The split is performed with the `split` method of this data set. Both the
    split object and the resulting train/test IDs of every fold (as plain
    lists) are stored in `self.splits`.

    Args:
        split (DataSplit): split to add
        name (str): name of the split
    """
    fold_ids = []
    for train_idx, test_idx in self.split(split):
        fold_ids.append((train_idx.tolist(), test_idx.tolist()))
    self.splits[name] = {"split": split, "ids": fold_ids}
[docs]
def getSplit(self, name: str, as_type: str = "split"
) -> (DataSplit | list[tuple[pd.Index, pd.Index]]):
"""Get the split with the given name.
Args:
name (str): name of the split
as_type (str): Determines the type of output. Can be one of:
- "split": Returns a DataSplit object.
- "ids": Returns train and test indices.
Returns:
DataSplit: split if `as_type` is "split"
list[tuple[pd.Index, pd.Index]]:
train and test indices if `as_type` is "ids"
"""
split = self.splits[name]
if as_type == "split":
return split["split"]
if as_type == "ids":
return split["ids"]
else:
raise ValueError(
f"Unknown as_type: {as_type}, "
"should be 'split' or 'ids'."
)
[docs]
def iterSplit(self, name: str, as_type: str = "ids"
) -> (
Generator[tuple[pd.Index, pd.Index], None, None] |
Generator[
tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray], None, None] |
Generator[tuple[
pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame], None, None] |
Generator[tuple["QSPRTable", "QSPRTable"], None, None]
):
"""Get the split with the given name.
Args:
name (str): name of the split
as_type (str): Determines the type of output. Can be one of:
- "ids": yields train and test indices.
- "numpy": Yields train and test numpy arrays.
- "pandas": Yields train and test pandas DataFrames.
- "QSPRTable": Yields train and test QSPRTable objects.
Yields:
tuple[pd.Index, pd.Index]: train and test indices if `as_type` is "ids"
tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
train descriptors, train targets, test descriptors, test targets
`as_type` is "numpy"
tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame]:
train descriptors, train targets, test descriptors, test targets
`as_type` is "pandas"
tuple[QSPRTable, QSPRTable]:
train and test QSPRTable objects if `as_type` is "QSPRTable"
"""
split = self.splits[name]
if as_type == "ids":
for ids in split["ids"]:
yield ids
elif as_type == "numpy":
X = self.getDescriptors()
y = self.getTargets()
for ids in split["ids"]:
train_idx, test_idx = ids[0], ids[1]
yield (
X.loc[train_idx].values,
y.loc[train_idx].values,
X.loc[test_idx].values,
y.loc[test_idx].values
)
elif as_type == "pandas":
X = self.getDescriptors()
y = self.getTargets()
for ids in split["ids"]:
train_idx, test_idx = ids
yield (
X.loc[train_idx],
y.loc[train_idx],
X.loc[test_idx],
y.loc[test_idx]
)
elif as_type == "QSPRTable":
for ids in split["ids"]:
train = self.getSubset(self.getProperties(), ids[0])
test = self.getSubset(self.getProperties(), ids[1])
yield train, test
else:
raise ValueError(
f"Unknown as_type: {as_type}, "
"should be 'ids', 'numpy', 'pandas' or 'QSPRTable'."
)
[docs]
def split(
    self,
    split: DataSplit,
) -> Generator[
    tuple[
        pd.Index,
        pd.Index
    ],
    None,
    None,
]:
    """Create folds from the descriptors and targets of this data set.

    Can be used either for cross-validation, bootstrapping or train-test
    split. A split with a `dataSet` attribute gets this data set assigned, and
    a split whose `randomState` is `None` takes over the random state of this
    data set.

    Args:
        split (DataSplit): Split to apply to the data

    Yields:
        pd.Index, pd.Index: indices of the train and test set
    """
    if hasattr(split, "dataSet"):
        split.setDataSet(self)
    if hasattr(split, "randomState") and split.randomState is None:
        split.randomState = self.randomState
    descriptors = self.getDescriptors()
    targets = self.getTargets()
    for train_pos, test_pos in split.split(descriptors, targets):
        # translate the positions returned by the split into table indices
        train_ids = descriptors.index[train_pos]
        test_ids = descriptors.index[test_pos]
        yield train_ids, test_ids
def __getitem__(self, ids: list[str]) -> "QSPRTable":
    """Select entries of the data set by their IDs.

    Equivalent to calling `getSubset` with all properties and the given IDs.
    The subset uses the same random state as this data set.

    Args:
        ids (list[str]): list of IDs to include in the subset

    Returns:
        QSPRTable: subset of the data set
    """
    # FIXME: setting the random state here is not ideal, this should be done in the
    # getSubset method
    all_properties = self.getProperties()
    return self.getSubset(all_properties, ids, random_state=self.randomState)
[docs]
def filter(self, table_filters: list[Callable]):
    """Apply the given filters to the data set one after another.

    Each filter is run on the current descriptors and targets, and every entry
    whose ID is missing from the index of the filter result is dropped before
    the next filter runs.

    Args:
        table_filters (list[DataFilter]): list of filters to apply
    """
    for table_filter in table_filters:
        kept, _ = table_filter.transform(
            self.getDescriptors(), self.getTargets()
        )
        all_ids = pd.Series(
            self.getProperty(self.idProp), index=self.getProperty(self.idProp)
        )
        was_removed = ~all_ids.isin(kept.index)
        self.dropEntries(all_ids[was_removed].values)
def __setstate__(self, state):
    """Restore the object state and re-attach this data set to its splits.

    Args:
        state: state to restore, handed to the parent implementation
    """
    super().__setstate__(state)
    for stored in self.splits.values():
        splitter = stored["split"]
        # only splits that offer `setDataSet` get the data set assigned
        if hasattr(splitter, "setDataSet"):
            splitter.setDataSet(self)