# Source code for qsprpred.models.model

"""This module holds the base class for QSPRmodels, model types should be a subclass."""

import copy
import inspect
import json
import os
import shutil
import sys
import typing
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Any, Callable, List, Type, Union

import numpy as np
import pandas as pd
from rdkit import Chem
from rdkit.Chem import Mol

from ..data.tables.mol import MoleculeTable
from ..data.tables.qspr import QSPRDataset
from ..logs import logger
from ..models.early_stopping import EarlyStopping, EarlyStoppingMode
from ..tasks import ModelTasks
from ..utils.inspect import dynamic_import
from ..utils.serialization import JSONSerializable


class QSPRModel(JSONSerializable, ABC):
    """The definition of the common model interface for the package.

    The QSPRModel handles model initialization, fitting, predicting and saving.

    Attributes:
        name (str): name of the model
        data (QSPRDataset): data set used to train the model
        alg (Type): estimator class
        parameters (dict): dictionary of algorithm specific parameters
        estimator (Any):
            the underlying estimator instance of the type specified in
            `QSPRModel.alg`, if `QSPRModel.fit` or optimization was performed
        featureCalculators (MoleculeDescriptorsCalculator):
            feature calculator instance taken from the data set or
            deserialized from file if the model is loaded without data
        featureStandardizer (SKLearnStandardizer):
            feature standardizer instance taken from the data set or
            deserialized from file if the model is loaded without data
        baseDir (str):
            base directory of the model, the model files are stored in
            a subdirectory `{baseDir}/{outDir}/`
        earlyStopping (EarlyStopping):
            early stopping tracker for training of QSPRpred models that
            support early stopping (e.g. neural networks)
        randomState (int):
            Random state to use for all random operations for reproducibility.
    """

    # The estimator cannot be serialized to JSON directly; it is saved
    # separately via `saveEstimator` and restored in `__setstate__`.
    _notJSON: typing.ClassVar = ["estimator", *JSONSerializable._notJSON]
[docs] @staticmethod def handleInvalidsInPredictions( mols: list[str], predictions: np.ndarray | list[np.ndarray], failed_mask: np.ndarray, ) -> np.ndarray: """Replace invalid predictions with None. Args: mols (MoleculeTable): molecules for which the predictions were made predictions (np.ndarray): predictions made by the model failed_mask (np.ndarray): boolean mask of failed predictions Returns: np.ndarray: predictions with invalids replaced by None """ if any(failed_mask): if isinstance(predictions, list): predictions_with_invalids = [ np.full((len(mols), pred.shape[1]), None) for pred in predictions ] for i, pred in enumerate(predictions): predictions_with_invalids[i][~failed_mask, :] = pred else: predictions_with_invalids = np.full( (len(mols), predictions.shape[1]), None ) predictions_with_invalids[~failed_mask, :] = predictions predictions = predictions_with_invalids return predictions
[docs] @classmethod def loadParamsGrid( cls, fname: str, optim_type: str, model_types: str ) -> np.ndarray: """Load parameter grids for bayes or grid search parameter optimization from json file. Arguments: fname (str): file name of json file containing array with three columns containing modeltype, optimization type (grid or bayes) and model type optim_type (str): optimization type (`grid` or `bayes`) model_types (list of str): model type for hyperparameter optimization (e.g. RF) Returns: np.ndarray: array with three columns containing modeltype, optimization type (grid or bayes) and model type """ try: with open(fname) as json_file: optim_params = np.array(json.load(json_file), dtype=object) except FileNotFoundError: logger.error("Search space file (%s) not found" % fname) sys.exit() # select either grid or bayes optimization parameters from param array optim_params = optim_params[optim_params[:, 2] == optim_type, :] # check all ModelTasks to be used have parameter grid model_types = [model_types] if isinstance(model_types, str) else model_types if not set(model_types).issubset(list(optim_params[:, 0])): logger.error( "model types %s missing from models in search space dict (%s)" % (model_types, optim_params[:, 0]) ) sys.exit() logger.info("search space loaded from file") return optim_params
def __init__( self, base_dir: str, alg: Type | None = None, name: str | None = None, parameters: dict | None = None, autoload=True, random_state: int | None = None, ): """Initialize a QSPR model instance. If the model is loaded from file, the data set is not required. Note that the data set is required for fitting and optimization. Args: base_dir (str): base directory of the model, the model files are stored in a subdirectory `{baseDir}/{outDir}/` alg (Type): estimator class name (str): name of the model parameters (dict): dictionary of algorithm specific parameters autoload (bool): if `True`, the estimator is loaded from the serialized file if it exists, otherwise a new instance of alg is created random_state (int): Random state to use for shuffling and other random operations. """ self.name = name or (alg.__class__.__name__ if alg else None) if self.name is None: raise ValueError("Model name not specified.") self.baseDir = os.path.abspath(base_dir.rstrip("/")) self.targetProperties = None self.nTargets = None self.featureCalculators = None self.featureStandardizer = None # initialize estimator self.earlyStopping = EarlyStopping() if self.supportsEarlyStopping else None if autoload and os.path.exists(self.metaFile): new = self.fromFile(self.metaFile) self.__dict__.update(new.__dict__) self.name = name or (alg.__class__.__name__ if alg else None) self.baseDir = os.path.abspath(base_dir.rstrip("/")) if parameters: logger.warning( f"Explicitly specified parameters ({parameters})" f"will override model settings read from file: {self.parameters}." f"Estimator will be reloaded with the new parameters " f"and will have to be re-fitted if fitted previously." ) self.parameters = copy.deepcopy(parameters) self.estimator = self.loadEstimator(self.parameters) if random_state: logger.warning( f"Explicitly specified random state ({random_state})" f"will override model settings read from file: {self.randomState}." 
) self.initRandomState(random_state) else: # make a deep copy of the params to make sure no problems happen downstream self.parameters = copy.deepcopy(parameters) # initialize an estimator instance with the given parameters self.alg = alg # initialize random state self.randomState = None self.initRandomState(random_state) # load the estimator self.estimator = self.loadEstimator(self.parameters) assert ( self.estimator is not None ), "Estimator not initialized when it should be." assert ( self.alg is not None ), "Algorithm class not initialized when it should be." def __str__(self) -> str: """Return the name of the model and the underlying class as the identifier.""" if self.estimator is not None: name = self.estimator.__class__.__name__ elif self.alg is not None: name = self.alg.__name__ else: name = "None" return f"{self.name} ({name})" def __setstate__(self, state): """Set state.""" super().__setstate__(state) if type(self.alg) is str: self.alg = dynamic_import(self.alg) self.estimator = self.loadEstimator(self.parameters)
[docs] def initFromDataset(self, data: QSPRDataset | None): if data is not None: self.targetProperties = data.targetProperties self.nTargets = len(self.targetProperties) self.featureCalculators = data.descriptorSets self.featureStandardizer = data.featureStandardizer if self.randomState is None: self.initRandomState(data.randomState) else: self.targetProperties = None self.nTargets = None self.featureCalculators = None self.featureStandardizer = None
[docs] def initRandomState(self, random_state): """Set random state if applicable. Defaults to random state of dataset if no random state is provided, Args: random_state (int): Random state to use for shuffling and other random operations. """ if random_state is None: self.randomState = int(np.random.randint(0, 2**31-1, dtype=np.int64)) logger.info( "No random state supplied." f"Setting random state to: {self.randomState}." ) self.randomState = random_state constructor_params = [ name for name, _ in inspect.signature(self.alg.__init__).parameters.items() ] common_params = ["random_state", "random_seed", "seed"] random_param = None for seed_param in common_params: if seed_param not in constructor_params: try: if self.parameters: params = { k: v for k, v in self.parameters.items() if k != seed_param } params[seed_param] = self.randomState self.alg( **params, ) else: self.alg(**{seed_param: random_state}) random_param = seed_param break except TypeError: pass else: random_param = seed_param break if random_param is not None: if self.parameters: self.parameters.update({random_param: random_state}) else: self.parameters = {random_param: random_state} self.estimator = self.loadEstimator(self.parameters) elif random_state: logger.warning( f"Random state supplied, but alg {self.alg} does not support it." " Ignoring this setting." )
    @property
    def task(self) -> ModelTasks:
        """Return the task of the model, taken from the data set
        or deserialized from file if the model is loaded without data.

        Returns:
            ModelTasks: task of the model
        """
        return ModelTasks.getModelTask(self.targetProperties)

    @property
    def isMultiTask(self) -> bool:
        """Return if model is a multitask model, taken from the data set
        or deserialized from file if the model is loaded without data.

        Returns:
            bool: True if model is a multitask model
        """
        return self.task.isMultiTask()

    @property
    def classPath(self) -> str:
        """Return the fully classified path of the model.

        Returns:
            str: class path of the model
        """
        return self.__class__.__module__ + "." + self.__class__.__name__

    @property
    def outDir(self) -> str:
        """Return output directory of the model,
        the model files are stored in this directory (`{baseDir}/{name}`).

        Note: accessing this property creates the directory if it is missing.

        Returns:
            str: output directory of the model
        """
        os.makedirs(f"{self.baseDir}/{self.name}", exist_ok=True)
        return f"{self.baseDir}/{self.name}"

    @property
    def outPrefix(self) -> str:
        """Return output prefix of the model files.

        The model files are stored with this prefix
        (i.e. `{outPrefix}_meta.json`).

        Returns:
            str: output prefix of the model files
        """
        return f"{self.outDir}/{self.name}"

    @property
    def metaFile(self) -> str:
        """Return the path to the metafile of this model
        (`{outPrefix}_meta.json`)."""
        return f"{self.outPrefix}_meta.json"

    @property
    @abstractmethod
    def supportsEarlyStopping(self) -> bool:
        """Return if the model supports early stopping.

        Returns:
            bool: True if the model supports early stopping
        """

    @property
    def optimalEpochs(self) -> int | None:
        """Return the optimal number of epochs for early stopping.

        Returns:
            int | None: optimal number of epochs
        """
        return self._optimalEpochs

    @optimalEpochs.setter
    def optimalEpochs(self, value: int | None = None):
        """Set the optimal number of epochs for early stopping.

        Args:
            value (int | None, optional): optimal number of epochs
        """
        self._optimalEpochs = value
[docs] def setParams(self, params: dict | None, reset_estimator: bool = True): """Set model parameters. The estimator is also updated with the new parameters if 'reload_estimator' is `True`. Args: params (dict): dictionary of model parameters or `None` to reset the parameters reset_estimator (bool): if `True`, the estimator is reinitialized with the new parameters """ if self.parameters is not None: self.parameters.update(params) else: self.parameters = params if reset_estimator: self.estimator = self.loadEstimator(self.parameters)
[docs] def checkData(self, ds: QSPRDataset, exception: bool = True) -> bool: """Check if the model has a data set. Args: ds (QSPRDataset): data set to check exception (bool): if true, an exception is raised if no data is set Returns: bool: True if data is set, False otherwise (if exception is False) """ has_data = ds is not None if exception and not has_data: raise ValueError( "No data set specified. " "Make sure you initialized this model " "with a 'QSPRDataset' instance to train on." ) return has_data
[docs] def convertToNumpy( self, X: pd.DataFrame | np.ndarray | QSPRDataset, y: pd.DataFrame | np.ndarray | QSPRDataset | None = None, ) -> tuple[np.ndarray, np.ndarray] | np.ndarray: """Convert the given data matrix and target matrix to np.ndarray format. Args: X (pd.DataFrame, np.ndarray, QSPRDataset): data matrix y (pd.DataFrame, np.ndarray, QSPRDataset): target matrix Returns: data matrix and/or target matrix in np.ndarray format """ if isinstance(X, QSPRDataset): X = X.getFeatures(concat=True, refit_standardizer=False) if isinstance(X, pd.DataFrame): X = X.values if y is not None: if isinstance(y, QSPRDataset): y = y.getTargetPropertiesValues(concat=True) if isinstance(y, pd.DataFrame): y = y.values return X, y else: return X
[docs] def getParameters(self, new_parameters) -> dict | None: """Get the model parameters combined with the given parameters. If both the model and the given parameters contain the same key, the value from the given parameters is used. Args: new_parameters (dict): dictionary of new parameters to add Returns: dict: dictionary of model parameters """ parameters_out = copy.deepcopy(self.parameters) if parameters_out is not None: parameters_out.update(new_parameters) else: parameters_out = new_parameters return parameters_out
    def createPredictionDatasetFromMols(
        self,
        mols: list[str | Mol],
        smiles_standardizer: str | Callable[[str], str] = "chembl",
        n_jobs: int = 1,
        fill_value: float = np.nan,
    ) -> tuple[QSPRDataset, np.ndarray]:
        """Create a `QSPRDataset` instance from a list of SMILES strings.

        Args:
            mols (list[str | Mol]): list of SMILES strings
            smiles_standardizer (str, callable): smiles standardizer to use
            n_jobs (int): number of parallel jobs to use
            fill_value (float): value to fill for missing features

        Returns:
            tuple:
                a tuple containing the `QSPRDataset` instance and a boolean mask
                indicating which molecules failed to be processed
        """
        # make a molecule table first and add the target properties
        if isinstance(mols[0], Mol):
            # assumes a homogeneous list: if the first entry is an RDKit Mol,
            # all entries are converted to SMILES
            mols = [Chem.MolToSmiles(mol) for mol in mols]
        dataset = MoleculeTable.fromSMILES(
            f"{self.__class__.__name__}_{hash(self)}",
            mols,
            drop_invalids=False,
            n_jobs=n_jobs,
        )
        for target_property in self.targetProperties:
            # target values are placeholders (NaN) at prediction time,
            # so imputation is disabled
            target_property.imputer = None
            dataset.addProperty(target_property.name, np.nan)
        # create the dataset and get failed molecules
        dataset = QSPRDataset.fromMolTable(
            dataset,
            self.targetProperties,
            drop_empty=False,
            drop_invalids=False,
            n_jobs=n_jobs,
        )
        dataset.standardizeSmiles(smiles_standardizer, drop_invalid=False)
        # boolean mask over the input molecules; presumably True for entries
        # removed as invalid by `dropInvalids` — verify against its docs
        failed_mask = dataset.dropInvalids().values
        # prepare dataset and return it
        dataset.prepareDataset(
            smiles_standardizer=smiles_standardizer,
            feature_calculators=self.featureCalculators,
            feature_standardizer=self.featureStandardizer,
            feature_fill_value=fill_value,
            shuffle=False,
        )
        return dataset, failed_mask
[docs] def predictDataset( self, dataset: QSPRDataset, use_probas: bool = False ) -> np.ndarray | list[np.ndarray]: """ Make predictions for the given dataset. Args: dataset: a `QSPRDataset` instance use_probas: use probabilities if this is a classification model Returns: np.ndarray | list[np.ndarray]: an array of predictions or a list of arrays of predictions (for classification models with use_probas=True) """ if self.task.isRegression() or not use_probas: predictions = self.predict(dataset) # always return 2D array if self.task.isClassification(): predictions = predictions.astype(int) else: # return a list of 2D arrays predictions = self.predictProba(dataset) return predictions
[docs] def predictMols( self, mols: List[str | Mol], use_probas: bool = False, smiles_standardizer: Union[str, callable] = "chembl", n_jobs: int = 1, fill_value: float = np.nan, use_applicability_domain: bool = False, ) -> np.ndarray | list[np.ndarray]: """ Make predictions for the given molecules. Args: mols (List[str | Mol]): list of SMILES strings use_probas (bool): use probabilities for classification models smiles_standardizer: either `chembl`, `old`, or a partial function that reads and standardizes smiles. n_jobs: Number of jobs to use for parallel processing. fill_value: Value to use for missing values in the feature matrix. use_applicability_domain: Use applicability domain to return if a molecule is within the applicability domain of the model. Returns: np.ndarray | list[np.ndarray]: an array of predictions or a list of arrays of predictions (for classification models with use_probas=True) np.ndarray[bool]: boolean mask indicating which molecules fall within the applicability domain of the model """ if not self.featureCalculators: raise ValueError("No feature calculator set on this instance.") # create data set from mols dataset, failed_mask = self.createPredictionDatasetFromMols( mols, smiles_standardizer, n_jobs, fill_value ) # make predictions for the dataset predictions = self.predictDataset(dataset, use_probas) # handle invalids predictions = self.handleInvalidsInPredictions(mols, predictions, failed_mask) # return predictions and if mols are within applicability domain if requested if hasattr(self, "applicabilityDomain") and use_applicability_domain: in_domain = self.applicabilityDomain.contains( dataset.getFeatures(concat=True, ordered=True, refit_standardizer=False) ).values in_domain = self.handleInvalidsInPredictions(mols, in_domain, failed_mask) return predictions, in_domain return predictions
[docs] def cleanFiles(self): """Clean up the model files. Removes the model directory and all its contents. """ if os.path.exists(self.outDir): shutil.rmtree(self.outDir)
    def fitDataset(
        self,
        ds: QSPRDataset,
        monitor=None,
        mode=EarlyStoppingMode.OPTIMAL,
        save_model=True,
        save_data=False,
        **kwargs,
    ) -> str:
        """Train model on the whole attached data set.

        ** IMPORTANT ** For models that supportEarlyStopping, `CrossValAssessor`
        should be run first, so that the average number of epochs from the
        cross-validation with early stopping can be used for fitting the model.

        Args:
            ds (QSPRDataset): data set to fit this model on
            monitor (FitMonitor): monitor for the fitting process, if None,
                the base monitor is used
            mode (EarlyStoppingMode): early stopping mode for models that
                support early stopping, by default fit the 'optimal' number of
                epochs previously stopped at in model assessment on train or
                test set, to avoid the use of extra data for a validation set.
            save_model (bool): save the model to file
            save_data (bool): save the supplied dataset to file
            kwargs: additional arguments to pass to fit

        Returns:
            str: path to the saved model, if `save_model` is True
                (otherwise `None` is returned implicitly)
        """
        # do some checks
        self.checkData(ds)
        # init properties from data
        self.initFromDataset(ds)
        # get data
        X_all = ds.getFeatures(concat=True).values
        y_all = ds.getTargetPropertiesValues(concat=True).values
        # load estimator (re-initialized so fitting starts from scratch)
        self.estimator = self.loadEstimator(self.parameters)
        # fit model
        logger.info(
            "Model fit started: %s" % datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        )
        self.estimator = self.fit(X_all, y_all, mode=mode, monitor=monitor, **kwargs)
        logger.info(
            "Model fit ended: %s" % datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        )
        # fit the applicability domain on the training features, if available
        if hasattr(ds, "applicabilityDomain") and ds.applicabilityDomain is not None:
            ds.applicabilityDomain.fit(X_all)
            self.applicabilityDomain = ds.applicabilityDomain
        if save_data:
            ds.save()
        # save model and return path
        if save_model:
            return self.save()
    def toJSON(self):
        """Serialize the model state to a JSON string.

        The estimator itself is saved to a separate file via `saveEstimator`;
        only its path (made relative to `baseDir`) is embedded in the JSON so
        the saved model stays portable across directories.

        Returns:
            str: JSON representation of the model state
        """
        o_dict = json.loads(super().toJSON())
        estimator_path = self.saveEstimator()
        # store the estimator path relative to the base directory
        estimator_path = estimator_path.replace(self.baseDir, ".")
        o_dict["py/state"]["baseDir"] = "."
        o_dict["py/state"]["estimator"] = estimator_path
        # the algorithm class is stored as an importable dotted path
        o_dict["py/state"]["alg"] = f"{self.alg.__module__}.{self.alg.__name__}"
        return json.dumps(o_dict, indent=4)
[docs] def save(self, save_estimator=False): """Save model to file. Args: save_estimator (bool): Explicitly save the estimator to file, if `True`. Note that some models may save the estimator by default even if this argument is `False`. Returns: str: absolute path to the metafile of the saved model str: absolute path to the saved estimator, if `include_estimator` is `True` """ os.makedirs(self.outDir, exist_ok=True) meta_path = self.toFile(self.metaFile) if save_estimator: est_path = self.saveEstimator() return meta_path, est_path else: return meta_path
    @classmethod
    def fromFile(cls, filename: str) -> "QSPRModel":
        """Load a model from its metafile.

        Args:
            filename (str): path to the model metafile

        Returns:
            QSPRModel: the loaded model
        """
        ret = super().fromFile(filename)
        # the metafile lives in `{baseDir}/{name}/`, so the base directory is
        # the parent of the metafile's directory
        model_dir = os.path.dirname(filename)
        ret.baseDir = os.path.dirname(model_dir)
        ret.estimator = ret.loadEstimatorFromFile(ret.parameters)
        return ret
    @abstractmethod
    def fit(
        self,
        X: pd.DataFrame | np.ndarray,
        y: pd.DataFrame | np.ndarray,
        estimator: Any = None,
        mode: EarlyStoppingMode = EarlyStoppingMode.NOT_RECORDING,
        monitor: "FitMonitor" = None,
        **kwargs,
    ) -> Any | tuple[Any, int] | None:
        """Fit the model to the given data matrix or `QSPRDataset`.

        Note. convertToNumpy can be called here, to convert the input data to
        np.ndarray format.

        Note. if no estimator is given, the estimator instance of the model
        is used.

        Note. if a model supports early stopping, the fit function should have
        the `early_stopping` decorator and the mode argument should be used to
        set the early stopping mode. If the model does not support early
        stopping, the mode argument is ignored.

        Args:
            X (pd.DataFrame, np.ndarray): data matrix to fit
            y (pd.DataFrame, np.ndarray): target matrix to fit
            estimator (Any): estimator instance to use for fitting
            mode (EarlyStoppingMode): early stopping mode
            monitor (FitMonitor): monitor for the fitting process,
                if None, the base monitor is used
            kwargs: additional arguments to pass to the fit method of the
                estimator

        Returns:
            Any: fitted estimator instance
            int: in case of early stopping, the number of iterations
                after which the model stopped training
        """
    @abstractmethod
    def predict(
        self, X: pd.DataFrame | np.ndarray | QSPRDataset, estimator: Any = None
    ) -> np.ndarray:
        """Make predictions for the given data matrix or `QSPRDataset`.

        Note. convertToNumpy can be called here, to convert the input data to
        np.ndarray format.

        Note. if no estimator is given, the estimator instance of the model
        is used.

        Args:
            X (pd.DataFrame, np.ndarray, QSPRDataset): data matrix to predict
            estimator (Any): estimator instance to use for fitting

        Returns:
            np.ndarray:
                2D array containing the predictions, where each row corresponds
                to a sample in the data and each column to a target property
        """
    @abstractmethod
    def predictProba(
        self, X: pd.DataFrame | np.ndarray | QSPRDataset, estimator: Any = None
    ) -> list[np.ndarray]:
        """Make predictions for the given data matrix or `QSPRDataset`,
        but use probabilities for classification models. Does not work with
        regression models.

        Note. convertToNumpy can be called here, to convert the input data to
        np.ndarray format.

        Note. if no estimator is given, the estimator instance of the model
        is used.

        Args:
            X (pd.DataFrame, np.ndarray, QSPRDataset): data matrix to make
                predict
            estimator (Any): estimator instance to use for fitting

        Returns:
            list[np.ndarray]:
                a list of 2D arrays containing the probabilities for each class,
                where each array corresponds to a target property, each row
                to a sample in the data and each column to a class
        """
    @abstractmethod
    def loadEstimator(self, params: dict | None = None) -> object:
        """Initialize estimator instance with the given parameters.

        If `params` is `None`, the default parameters will be used.

        Arguments:
            params (dict): algorithm parameters

        Returns:
            object: initialized estimator instance
        """
    @abstractmethod
    def loadEstimatorFromFile(self, params: dict | None = None) -> object:
        """Load estimator instance from file and apply the given parameters.

        Args:
            params (dict): algorithm parameters

        Returns:
            object: initialized estimator instance
        """
    @abstractmethod
    def saveEstimator(self) -> str:
        """Save the underlying estimator to file.

        Returns:
            path (str): absolute path to the saved estimator
        """