from typing import Generator
import pandas as pd
from sklearn.base import BaseEstimator
from qsprpred.data.sampling.splits import DataSplit
from qsprpred.data.tables.qspr import QSPRTable
from .feature_transformers import SklearnStep
from .step import Step
from ..descriptors.sets import DescriptorSet
from ...utils.interfaces.randomized import Randomized
from ...utils.serialization import JSONSerializable
class Pipeline(Randomized, JSONSerializable):
    """Pipeline class for sequentially applying data preprocessing steps.

    Attributes:
        steps (dict[str, Step | BaseEstimator]): Dictionary of named steps in the
            pipeline, if the step is a scikit-learn transformer, it will be wrapped
            in a SklearnStep.
        fixed (list[str]): List of step names that should not be fitted, only
            transformed
        fitOn (dict[str, str]): Settings for which data a step should be fitted on.
            Either 'train', 'test' or 'both', if not specified the step is fitted on
            the training data.
        applyTo (dict[str, str]): Settings for which data a step should be applied
            to. Either 'train', 'test' or 'both', if not specified the step is
            applied to both.
        randomState (int | None): Random state for the pipeline
        skip (list[str]): List of step names to skip
        fitted (bool): Whether the pipeline is fitted
        featureNames (list[str] | None): Column names of the training data after
            the last call to `apply`, `None` before the first call.
    """

    def __init__(
        self,
        steps: dict[str, Step | BaseEstimator] | None = None,
        fixed: list[str] | None = None,
        fit_on: dict[str, str] | None = None,
        apply_to: dict[str, str] | None = None,
        skip: list[str] | None = None,
        seed: int | None = None,
    ):
        """Initialize the Pipeline

        The containers passed in are shallow-copied, so wrapping steps or calling
        `addStep`/`addSkip` later never modifies objects owned by the caller.

        Args:
            steps (dict[str, Step | BaseEstimator]): Dictionary of named steps in the
                pipeline, if the step is a scikit-learn transformer, it will be
                wrapped in a SklearnStep.
            fixed (list[str]): List of step names that should not be fitted, only
                transformed
            fit_on (dict[str, str]): Settings for which data a step should be fitted
                on. Either 'train', 'test' or 'both', if not specified the step is
                fitted on the training data.
            apply_to (dict[str, str]): Settings for which data a step should be
                applied to. Either 'train', 'test' or 'both', if not specified the
                step is applied to both.
            skip (list[str]): List of step names to skip
            seed (int | None): Random state for the pipeline
        """
        self.steps = {} if steps is None else {
            name: self._wrapStep(step) for name, step in steps.items()
        }
        self.fixed = [] if fixed is None else list(fixed)
        self.fitOn = {} if fit_on is None else dict(fit_on)
        self.applyTo = {} if apply_to is None else dict(apply_to)
        self.randomState = seed
        self._skip = [] if skip is None else list(skip)
        self._fitted = False
        # FIXME: featureNames is only set in the apply method, as only after applying
        # the pipeline we know which features are present in the data (i.e. filters
        # may remove some features). Some models like Neural Net model, require the
        # number of features on initialization. However, if removing, adding or
        # changing steps, the feature names may change, so this is not a reliable
        # way to get the feature names. This should be fixed in the future.
        self.featureNames = None

    @staticmethod
    def _wrapStep(step):
        """Wrap an object exposing `fit_transform` in a SklearnStep.

        Objects that already are a `Step`, or that do not expose `fit_transform`,
        are returned unchanged.
        """
        if not isinstance(step, Step) and hasattr(step, 'fit_transform'):
            return SklearnStep(step)
        return step

    @staticmethod
    def _concatAvailable(*frames):
        """Concatenate the frames that are not `None`; return `None` if all are."""
        available = [frame for frame in frames if frame is not None]
        return pd.concat(available) if available else None

    @property
    def randomState(self) -> int | None:
        """Get the random state for the object."""
        return self.seed

    @randomState.setter
    def randomState(self, seed: int | None):
        """Set the random state for the object.

        Args:
            seed (int | None):
                The seed to use to randomize the action. If `None`,
                a random seed is used instead of a fixed one.
        """
        self.seed = seed

    def apply(
        self,
        X_train: pd.DataFrame,
        y_train: pd.DataFrame | None = None,
        X_test: pd.DataFrame | None = None,
        y_test: pd.DataFrame | None = None,
        fit: bool = True,
    ) -> tuple[
        pd.DataFrame, pd.DataFrame | None, pd.DataFrame | None, pd.DataFrame | None
    ]:
        """Apply the pipeline to the data

        If fit is True, the pipeline is fitted to the training data and
        then applied to the train and test data. If fit is False, the pipeline is
        only applied to the data.

        Args:
            X_train (pd.DataFrame): training data to apply the pipeline to
            y_train (pd.DataFrame | None): training target data to apply the
                pipeline to
            X_test (pd.DataFrame | None): test data to apply the pipeline to
            y_test (pd.DataFrame | None): test target data to apply the pipeline to
            fit (bool): whether to fit the pipeline

        Returns:
            X_train (pd.DataFrame): transformed training data
            y_train (pd.DataFrame | None): transformed training targets
            X_test (pd.DataFrame | None): transformed test data
            y_test (pd.DataFrame | None): transformed test targets

        Raises:
            ValueError: if the pipeline has steps, was never fitted and `fit` is
                False, or if a step has an unknown `fit_on`/`apply_to` setting.
            AssertionError: if the training and test features are no longer
                consistent after a step.
        """
        if not self.fitted and not fit and len(self.steps) > 0:
            raise ValueError("Pipeline must be fitted before transforming data")
        valid_targets = ('train', 'test', 'both')
        for name, step in self.steps.items():
            if name in self.skip:
                continue
            fit_on = self.fitOn.get(name, 'train')
            apply_to = self.applyTo.get(name, 'both')
            needs_fit = fit and name not in self.fixed
            # Validate the settings up front so a misconfigured step fails before
            # anything has been fitted or transformed.
            if needs_fit and fit_on not in valid_targets:
                raise ValueError(f"Unknown value for {name} fit_on: {fit_on}")
            if apply_to not in valid_targets:
                raise ValueError(f"Unknown value for {name} apply_to: {apply_to}")
            if fit:
                if hasattr(step, 'randomState'):
                    # Steps without their own seed inherit the pipeline's seed
                    step.randomState = (
                        self.randomState
                        if step.randomState is None else step.randomState
                    )
                # Fit step on the specified data
                if needs_fit:
                    if fit_on == 'train':
                        step.fit(X_train, y_train)
                    elif fit_on == 'test':
                        step.fit(X_test, y_test)
                    else:
                        # 'both': missing frames are left out, so this also works
                        # without targets and/or without a test set.
                        step.fit(
                            self._concatAvailable(X_train, X_test),
                            self._concatAvailable(y_train, y_test),
                        )
                self._fitted = True
            # Apply step on the specified data
            if apply_to in ('train', 'both'):
                X_train, y_train = step.transform(X_train, y_train)
            if apply_to in ('test', 'both') and X_test is not None:
                X_test, y_test = step.transform(X_test, y_test)
            # Check number of features is still consistent between training and
            # test data. Raised explicitly (instead of `assert`) so the check is
            # not stripped when Python runs with -O.
            if X_test is not None:
                if X_train.shape[1] != X_test.shape[1]:
                    raise AssertionError(
                        "Number of features in training and test data is not "
                        f"consistent after step {name}"
                    )
                if not all(X_train.columns == X_test.columns):
                    raise AssertionError(
                        "Feature names in training and test data are not "
                        f"consistent after step {name}"
                    )
        self.featureNames = X_train.columns.tolist()
        return X_train, y_train, X_test, y_test

    def removeStep(self, name: str):
        """Remove a step from the pipeline

        The step's `fit_on`, `apply_to` and `fixed` settings are removed with it,
        so a step added later under the same name does not inherit them. The skip
        list is left untouched.

        Args:
            name (str): name of the step to remove

        Raises:
            KeyError: if there is no step with the given name
        """
        self.steps.pop(name)
        self.fitOn.pop(name, None)
        self.applyTo.pop(name, None)
        self.fixed = [fixed_name for fixed_name in self.fixed if fixed_name != name]

    def addStep(self, name: str, step: Step, fit_on: str = 'train',
                apply_to: str = 'both', fixed: bool = False):
        """Add a step to the pipeline

        Args:
            name (str): name of the step
            step (Step): step to add to the pipeline, objects exposing
                `fit_transform` that are not a Step are wrapped in a SklearnStep
            fit_on (str): whether to fit the step on 'train', 'test' or 'both'
            apply_to (str): whether to apply the step on 'train', 'test' or 'both'
            fixed (bool): whether the step should be fixed and not fitted
        """
        self.steps[name] = self._wrapStep(step)
        self.fitOn[name] = fit_on
        self.applyTo[name] = apply_to
        # Avoid duplicate entries when the same name is registered more than once
        if fixed and name not in self.fixed:
            self.fixed.append(name)

    def orderSteps(self, order: list[str]):
        """Order the steps in the pipeline

        Args:
            order (list[str]): list of step names in the desired order

        Raises:
            AssertionError: if `order` does not contain exactly the step names of
                the pipeline
        """
        # Raised explicitly so the check also runs under `python -O`
        if set(order) != set(self.steps.keys()):
            raise AssertionError("Order must contain all step names")
        self.steps = {name: self.steps[name] for name in order}

    @property
    def fitted(self) -> bool:
        """Check if the pipeline is fitted"""
        return self._fitted

    @property
    def skip(self) -> list[str]:
        """Get the steps to skip

        The steps to skip are not fitted or transformed, but
        are still present in the pipeline.

        Returns:
            list[str]: list of step names to skip
        """
        return self._skip

    def addSkip(self, name: str):
        """Add a step to the skip list

        Adding a name that is already skipped has no effect, so a single
        `removeSkip` call always re-enables the step.

        Args:
            name (str): name of the step to skip
        """
        if name not in self._skip:
            self._skip.append(name)

    def removeSkip(self, name: str):
        """Remove a step from the skip list

        Args:
            name (str): name of the step to remove from the skip list

        Raises:
            ValueError: if the name is not in the skip list
        """
        self._skip.remove(name)

    def __str__(self):
        """Return a multi-line summary of the steps, seed and fitted state."""
        steps = []
        for name, obj in self.steps.items():
            step = f"{name} ({obj.__class__.__name__}): "
            step += f"fit_on={self.fitOn.get(name, 'train')}, "
            step += f"apply_to={self.applyTo.get(name, 'both')}, "
            step += f"fixed={name in self.fixed}"
            step += f", skip={name in self.skip}"
            if hasattr(obj, 'fitted'):
                step += f", fitted={obj.fitted}"
            if hasattr(obj, 'randomState'):
                step += f", randomState={obj.randomState}"
            steps.append(step)
        return (
            f"{self.__class__.__name__}\n"
            "steps:\n " +
            "\n ".join(steps) +
            f"\nseed: {self.seed}"
            f"\nfitted: {self.fitted}"
        )
class DatasetPipeline(Pipeline):
    """Pipeline class for applying data preprocessing steps to a QSPRDataset.

    Attributes:
        feature_calculators (list[DescriptorSet] | None): List of feature calculators
            to apply to the dataset. If None, no feature calculators are applied.
        originalfeatureNames (list[str] | None): Original feature names in the dataset
            before applying the pipeline.
    """

    def __init__(
        self,
        feature_calculators: list[DescriptorSet] | None = None,
        steps: dict[str, Step | BaseEstimator] | None = None,
        fixed: list[str] | None = None,
        fit_on: dict[str, str] | None = None,
        apply_to: dict[str, str] | None = None,
        skip: list[str] | None = None,
        seed: int | None = None,
    ):
        """Initialize the DatasetPipeline

        Args:
            feature_calculators (list[DescriptorSet] | None): List of feature
                calculators to apply to the dataset.
            steps (dict[str, Step | BaseEstimator]): Dictionary of named steps in the
                pipeline, if the step is a scikit-learn transformer, it will be
                wrapped in a SklearnStep.
            fixed (list[str]): List of step names that should not be fitted, only
                transformed
            fit_on (dict[str, str]): Settings for which data a step should be fitted
                on. Either 'train', 'test' or 'both', if not specified the step is
                fitted on the training data.
            apply_to (dict[str, str]): Settings for which data a step should be
                applied to. Either 'train', 'test' or 'both', if not specified the
                step is applied to both.
            skip (list[str]): List of step names to skip
            seed (int | None): Random state for the pipeline
        """
        super().__init__(steps, fixed, fit_on, apply_to, skip, seed)
        self.originalfeatureNames = None
        self.feature_calculators = feature_calculators

    def applyOnDataSet(
        self,
        dataset: QSPRTable,
        split: DataSplit | str | None = None,
        fit: bool = True,
        seed: int | None = None,
    ) -> Generator[
        tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame]
        | tuple[pd.DataFrame, pd.DataFrame],
        None,
        None,
    ]:
        """Apply the pipeline to the dataset

        Note. the random state of the dataset is used to randomize the pipeline
        when the seed of feature calculators, splits or steps is not set.

        Args:
            dataset (QSPRTable): dataset to apply the pipeline to
            split (DataSplit | str | None): split to apply to the dataset, a
                string is looked up with `dataset.getSplit`
            fit (bool): whether to fit the pipeline
            seed (int | None): seed to randomize the pipeline,
                if None, the random state of the dataset is used

        Yields:
            X_train (pd.DataFrame): transformed training data
            y_train (pd.DataFrame): transformed training targets
            X_test (pd.DataFrame | None): transformed test data if split is not None
            y_test (pd.DataFrame | None): transformed test targets if split is not
                None

        Raises:
            AssertionError: if the pipeline is fitted, `fit` is False and the
                dataset lacks features that were present when fitting.
        """
        if fit:
            self.randomState = dataset.randomState if seed is None else seed
        # prepare X and y from the dataset
        if self.feature_calculators is not None:
            for feature_calculator in self.feature_calculators:
                # calculators without their own seed inherit the pipeline's seed
                if (
                    hasattr(feature_calculator, 'randomState')
                    and feature_calculator.randomState is None
                ):
                    feature_calculator.randomState = self.randomState
            dataset.addDescriptors(self.feature_calculators)
        X = dataset.getDescriptors()
        if self.fitted and not fit:
            expected_features = (
                self.originalfeatureNames
                if self.originalfeatureNames is not None
                else []
            )
            missing_features = set(expected_features) - set(X.columns.tolist())
            # Raised explicitly (instead of `assert`) so the check is not stripped
            # when Python runs with -O.
            if missing_features:
                raise AssertionError(
                    "Some features are missing in the dataset, please check if any "
                    "descriptors that were added to the dataset directly before fitting "
                    "the pipeline are missing in the dataset. "
                    f"Missing: {sorted(missing_features)}"
                )
        else:
            self.originalfeatureNames = X.columns.tolist()
        y = dataset.getTargets()
        # set the dataset for each step
        for step in self.steps.values():
            if hasattr(step, 'setDataSet'):
                step.setDataSet(dataset)
        # split the dataset and apply the pipeline
        if split is None:
            X, y, _, _ = super().apply(X, y, fit=fit)
            yield X, y
        else:
            if isinstance(split, str):
                split = dataset.getSplit(split)
            if hasattr(split, 'setDataSet'):
                split.setDataSet(dataset)
            if hasattr(split, 'randomState') and split.randomState is None:
                split.randomState = self.randomState
            if hasattr(split, 'random_state') and split.random_state is None:
                # FIXME: this is to set the random state for scikit-learn splits,
                # but it may give unexpected results in other contexts
                split.random_state = self.randomState
            for train_index, test_index in dataset.split(split):
                X_train, y_train = X.loc[train_index], y.loc[train_index]
                X_test, y_test = X.loc[test_index], y.loc[test_index]
                yield super().apply(X_train, y_train, X_test, y_test, fit)

    def __str__(self):
        """Return the pipeline summary followed by the feature calculators."""
        feature_calculators = (
            ["None"]
            if self.feature_calculators is None else self.feature_calculators
        )
        return (
            super().__str__() +
            f"\nfeature_calculators: {', '.join([str(fc) for fc in feature_calculators])}"
        )