Source code for qsprpred.data.chem.clustering

import os
from abc import ABC, abstractmethod

import numpy as np
import pandas as pd
from rdkit import Chem, DataStructs
from rdkit.Chem import Mol
from rdkit.SimDivFilters import rdSimDivPickers

from .scaffolds import BemisMurckoRDKit, Scaffold
from .. import MoleculeTable
from ..descriptors.fingerprints import Fingerprint, MorganFP
from ...logs import logger

from qsprpred.data.processing.mol_processor import MolProcessorWithID


class MoleculeClusters(MolProcessorWithID, ABC):
    """Abstract base class for clustering molecules.

    Subclasses implement `get_clusters`, which maps a list of SMILES strings
    to a dictionary of cluster index -> molecule indices. Calling an instance
    turns that dictionary into one cluster label per molecule.

    Attributes:
        nClusters (int): number of clusters
    """

    def __call__(self, mols: list[str | Mol], props, *args, **kwargs):
        """Calculate the clusters for a list of molecules.

        Args:
            mols (list[str | Mol]): SMILES strings or RDKit molecules to cluster.
            props: container indexed with ``self.idProp`` to obtain the index
                of the returned series.
            *args: unused.
            **kwargs: unused.

        Returns:
            pd.Series: cluster index for each molecule, indexed by
                ``props[self.idProp]``. Molecules that `get_clusters` did not
                place in any cluster keep the label -1.
        """
        # Convert molecule by molecule: looking only at ``mols[0]`` fails on an
        # empty list and mishandles lists that mix SMILES and RDKit molecules.
        smiles = [
            Chem.MolToSmiles(mol) if isinstance(mol, Mol) else mol for mol in mols
        ]
        output = np.full(len(smiles), -1, dtype=int)
        # Nothing to cluster for empty input; return an empty label series.
        if smiles:
            clusters = self.get_clusters(smiles)
            # map clusters to molecules
            for cluster_idx, molecule_idxs in clusters.items():
                output[molecule_idxs] = cluster_idx
        return pd.Series(output, index=props[self.idProp])

    @abstractmethod
    def get_clusters(self, smiles_list: list[str]) -> dict:
        """Cluster molecules.

        Args:
            smiles_list (list): list of molecules to be clustered

        Returns:
            clusters (dict): dictionary of clusters, where keys are cluster
                indices and values are indices of molecules
        """

    def _set_nClusters(self, N: int) -> None:
        """Resolve ``self.nClusters`` for a data set of ``N`` molecules.

        If ``self.nClusters`` is ``None`` it defaults to ``N // 10``. Values
        below 10 are raised to 10 and a warning is logged.

        Note:
            The resolved value is stored on the instance, so a default derived
            from ``N`` is reused on later calls instead of being recomputed.

        Args:
            N (int): number of molecules to be clustered
        """
        if self.nClusters is None:
            self.nClusters = N // 10
        if self.nClusters < 10:
            self.nClusters = 10
            # Adjacent literals instead of a backslash continuation inside the
            # string, which leaked stray characters into the logged message.
            logger.warning(
                "Number of initial clusters is too small to combine them well, "
                f"it has set to {self.nClusters}"
            )

    def supportsParallel(self) -> bool:
        """Return False: this processor reports no support for parallel use."""
        return False

    @abstractmethod
    def __str__(self):
        pass
class RandomClusters(MoleculeClusters):
    """Assign molecules to clusters at random.

    Attributes:
        seed (int): random seed used to shuffle the molecule indices
        nClusters (int): number of clusters; resolved by `_set_nClusters`
            when `get_clusters` runs
    """

    def __init__(
        self, seed: int = 42, n_clusters: int | None = None, id_prop: str | None = None
    ):
        """Store the clustering settings.

        Args:
            seed (int): random seed
            n_clusters (int | None): requested number of clusters
            id_prop (str | None): name of the ID property, forwarded to the
                base class constructor
        """
        super().__init__(id_prop=id_prop)
        self.nClusters = n_clusters
        self.seed = seed

    def get_clusters(self, smiles_list: list[str]) -> dict:
        """Distribute the molecules over the clusters in shuffled order.

        Args:
            smiles_list (list): list of molecules to be clustered

        Returns:
            clusters (dict): dictionary of clusters, where keys are cluster \
                indices and values are indices of molecules
        """
        n_mols = len(smiles_list)
        self._set_nClusters(n_mols)
        n_bins = self.nClusters
        # Seeded shuffle of the molecule indices.
        shuffled = np.random.RandomState(seed=self.seed).permutation(n_mols)
        # Position p in the shuffled order goes to cluster p % n_bins, which
        # is exactly the strided slice starting at each cluster index.
        return {bin_idx: list(shuffled[bin_idx::n_bins]) for bin_idx in range(n_bins)}

    def __str__(self):
        return "RandomClusters"
class ScaffoldClusters(MoleculeClusters):
    """Cluster molecules based on scaffolds.

    Molecules that share the same scaffold end up in the same cluster.

    Attributes:
        scaffold (Scaffold): scaffold generator
    """

    # NOTE(review): the default ``BemisMurckoRDKit()`` is evaluated once when
    # the class is defined, so all instances created without an explicit
    # scaffold share the same generator object.
    def __init__(
        self, scaffold: Scaffold = BemisMurckoRDKit(), id_prop: str | None = None
    ):
        """Store the scaffold generator.

        Args:
            scaffold (Scaffold): scaffold generator
            id_prop (str | None): name of the ID property, forwarded to the
                base class constructor
        """
        super().__init__(id_prop=id_prop)
        self.scaffold = scaffold

    def get_clusters(self, smiles_list: list[str]) -> dict:
        """Cluster molecules.

        Args:
            smiles_list (list): list of molecules to be clustered

        Returns:
            clusters (dict): dictionary of clusters, where keys are cluster
                indices and values are indices of molecules. Cluster indices
                follow the sorted order of the unique scaffolds.
        """
        # Generate scaffolds for each molecule
        mt = MoleculeTable(
            "scaffolds", pd.DataFrame({"SMILES": smiles_list}), n_jobs=os.cpu_count()
        )
        mt.addScaffolds([self.scaffold])
        scaffolds = (
            mt.getScaffolds([self.scaffold])
            .loc[mt.getDF().index, :]
            .iloc[:, 0]
            .tolist()
        )

        # Get unique scaffolds and initialize clusters
        unique_scaffolds = sorted(set(scaffolds))
        # One dictionary lookup per molecule instead of a linear
        # ``list.index`` scan, which made the loop below quadratic.
        cluster_of = {scaffold: idx for idx, scaffold in enumerate(unique_scaffolds)}
        clusters = {idx: [] for idx in range(len(unique_scaffolds))}

        # Cluster molecules based on scaffolds
        for mol_idx, scaffold in enumerate(scaffolds):
            clusters[cluster_of[scaffold]].append(mol_idx)

        return clusters

    def __str__(self):
        return f"ScaffoldClusters_{self.scaffold}"
class FPSimilarityClusters(MoleculeClusters):
    """Abstract base class for fingerprint-similarity based clustering.

    Subclasses pick the cluster centroids in `_get_centroids`; every molecule
    is then assigned to the centroid it is most similar to.

    Attributes:
        fp_calculator (Fingerprint): fingerprint calculator
    """

    def __init__(
        self,
        fp_calculator: Fingerprint = MorganFP(radius=3, nBits=2048),
        id_prop: str | None = None,
    ) -> None:
        """Store the fingerprint calculator.

        Args:
            fp_calculator (Fingerprint): fingerprint calculator
            id_prop (str | None): name of the ID property, forwarded to the
                base class constructor
        """
        super().__init__(id_prop=id_prop)
        self.fp_calculator = fp_calculator

    def get_clusters(self, smiles_list: list[str]) -> dict:
        """Cluster a list of SMILES strings based on molecular dissimilarity.

        Args:
            smiles_list (list): list of SMILES strings to be clustered

        Returns:
            clusters (dict): dictionary of clusters, where keys are cluster
                indices and values are indices of molecules
        """
        # Get fingerprints for each molecule
        mols = [Chem.MolFromSmiles(smiles) for smiles in smiles_list]
        fps = self.fp_calculator.getDescriptors(
            mols, props={self.fp_calculator.idProp: [str(x) for x in range(len(mols))]}
        )

        # Convert np fingerprints to rdkit fingerprints
        # NOTE(review): assumes each row stringifies to "0"/"1" characters;
        # confirm the dtype returned by the fingerprint calculator.
        fps = [
            DataStructs.cDataStructs.CreateFromBitString("".join(fp.astype(str)))
            for fp in fps
        ]

        # Get cluster centroids and initialize clusters
        centroid_indices = self._get_centroids(fps)
        clusters = {i: [] for i in range(len(centroid_indices))}

        # The centroid fingerprints do not change while molecules are being
        # assigned, so look them up once instead of once per molecule.
        centroid_fps = [fps[j] for j in centroid_indices]

        # Cluster molecules based on centroids
        for i, fp in enumerate(fps):
            # Tanimoto is the default metric of DataStructs.FingerprintSimilarity,
            # so the bulk call yields the same values in a single call.
            similarities = DataStructs.BulkTanimotoSimilarity(fp, centroid_fps)
            clusters[np.argmax(similarities)].append(i)

        return clusters

    @abstractmethod
    def _get_centroids(self, fps: list) -> list:
        """Return the indices (into ``fps``) of the cluster centroids.

        Args:
            fps (list): list of RDKit bit-vector fingerprints
        """
        pass
class FPSimilarityMaxMinClusters(FPSimilarityClusters):
    """Cluster molecules based on molecular fingerprint with MaxMin algorithm.

    Attributes:
        fp_calculator (Fingerprint): fingerprint calculator
        nClusters (int): number of clusters
        seed (int): random seed
        initialCentroids (list): list of indices of initial cluster centroids
        centroid_indices: indices of the centroids picked by the last call to
            `_get_centroids`
    """

    def __init__(
        self,
        n_clusters: int | None = None,
        seed: int | None = None,
        initial_centroids: list[int] | None = None,
        fp_calculator: Fingerprint = MorganFP(radius=3, nBits=2048),
        id_prop: str | None = None,
    ):
        """Store the clustering settings.

        Args:
            n_clusters (int | None): requested number of clusters
            seed (int | None): random seed; ``None`` passes -1 to the picker
            initial_centroids (list[int] | None): indices of molecules passed
                to the picker as its first picks
            fp_calculator (Fingerprint): fingerprint calculator
            id_prop (str | None): name of the ID property, forwarded to the
                base class constructor
        """
        super().__init__(fp_calculator=fp_calculator, id_prop=id_prop)
        self.nClusters = n_clusters
        self.seed = seed
        self.initialCentroids = initial_centroids

    def _get_centroids(self, fps: list) -> list:
        """Get cluster centroids with MaxMin algorithm.

        Args:
            fps (list): list of molecular fingerprints

        Returns:
            centroid_indices (list): list of indices of cluster centroids
        """
        pool_size = len(fps)
        self._set_nClusters(pool_size)

        # Nothing to pick from: skip the picker entirely.
        if pool_size == 0:
            self.centroid_indices = []
            return self.centroid_indices

        # `_set_nClusters` enforces at least 10 clusters, which can exceed the
        # number of molecules; never ask the picker for more than the pool holds.
        pick_size = min(self.nClusters, pool_size)

        picker = rdSimDivPickers.MaxMinPicker()
        self.centroid_indices = picker.LazyBitVectorPick(
            fps,
            pool_size,
            pick_size,
            firstPicks=self.initialCentroids if self.initialCentroids else [],
            seed=self.seed if self.seed is not None else -1,
        )

        return self.centroid_indices

    def __str__(self):
        return "FPSimilarityMaxMinClusters"
class FPSimilarityLeaderPickerClusters(FPSimilarityClusters):
    """Fingerprint-based clustering whose centroids come from RDKit's LeaderPicker.

    Attributes:
        fp_calculator (Fingerprint): fingerprint calculator (set by the base class)
        fpCalculator (Fingerprint): the same fingerprint calculator, stored a
            second time under this name
        similarityThreshold (float): threshold value handed to the picker
        centroid_indices: indices of the centroids picked by the last call to
            `_get_centroids`
    """

    def __init__(
        self,
        similarity_threshold: float = 0.7,
        fp_calculator: Fingerprint = MorganFP(radius=3, nBits=2048),
        id_prop: str | None = None,
    ):
        """Store the clustering settings.

        Args:
            similarity_threshold (float): threshold value handed to the picker
            fp_calculator (Fingerprint): fingerprint calculator
            id_prop (str | None): name of the ID property, forwarded to the
                base class constructor
        """
        super().__init__(fp_calculator=fp_calculator, id_prop=id_prop)
        self.fpCalculator = fp_calculator
        self.similarityThreshold = similarity_threshold

    def _get_centroids(self, fps: list) -> list:
        """Pick the cluster centroids with the LeaderPicker algorithm.

        Args:
            fps (list): list of molecular fingerprints

        Returns:
            centroid_indices (list): indices of the picked centroids
        """
        leader_picker = rdSimDivPickers.LeaderPicker()
        # NOTE(review): the value is passed as the picker's threshold argument;
        # RDKit may interpret it as a distance rather than a similarity, so
        # confirm against the RDKit documentation.
        picked = leader_picker.LazyBitVectorPick(
            fps, len(fps), self.similarityThreshold
        )
        self.centroid_indices = picked
        return picked

    def __str__(self):
        return "FPSimilarityLeaderPickerClusters"