import os
import shutil
from typing import ClassVar, Literal, Iterable, Generator, Any, Sized
import pandas as pd
from rdkit import Chem
from qsprpred.data.chem.identifiers import ChemIdentifier
from qsprpred.data.chem.standardizers import ChemStandardizer
from qsprpred.data.storage.interfaces.chem_store import ChemStore
from qsprpred.data.storage.interfaces.stored_mol import StoredMol
from qsprpred.data.storage.tabular.simple import PandasChemStore, \
ParallelizedChemStore
from qsprpred.data.storage.tabular.stored_mol import TabularMol
from qsprpred.logs import logger
from qsprpred.utils.parallel import ParallelGenerator
class RepresentationMol(TabularMol):
    """Stored molecule whose structure is given by a MOL block.

    The MOL block is read from the ``"sdf"`` entry of ``self.props``.
    """

    def as_rd_mol(self, add_props=False) -> Chem.Mol | None:
        """Build an RDKit molecule from the stored MOL block.

        The block is parsed leniently (``strictParsing=False``), without
        sanitization and with explicit hydrogens kept (``removeHs=False``).

        Args:
            add_props (bool):
                if `True`, every entry of ``self.props`` is set on the RDKit
                molecule as a string property

        Returns:
            (Chem.Mol | None):
                the parsed molecule, or `None` if RDKit could not parse
                the MOL block
        """
        mol = Chem.MolFromMolBlock(
            self.props["sdf"], strictParsing=False, sanitize=False, removeHs=False
        )
        # RDKit signals a parse failure by returning None; without this guard
        # the property loop would fail with an AttributeError on `None`.
        if add_props and mol is not None:
            for prop in self.props:
                mol.SetProp(prop, str(self.props[prop]))
        return mol

    def sdf(self) -> str:
        """Return the raw content of the ``"sdf"`` property."""
        return self.props["sdf"]

    def to_file(self, directory, extension=".csv") -> str:
        """Write a minimal file containing the SMILES and the ID of the molecule.

        Used for ligrep (.csv is the preferred format). An already existing
        file of the same name is left untouched.

        Args:
            directory:
                directory to place the file in
            extension (str):
                file name suffix appended to the molecule ID

        Returns:
            (str):
                path of the file (``<directory>/<id><extension>``)
        """
        filename = os.path.join(directory, self.id + extension)
        if not os.path.isfile(filename):
            with open(filename, "w") as f:
                f.write("SMILES,id\n")
                f.write(f"{self.smiles},{self.id}\n")
        return filename
class PandasRepresentationStore(ParallelizedChemStore):
    """Store of molecule representations attached to a main `ChemStore`.

    Representations live in an internal `PandasChemStore`
    (``self.representations``) created with the columns ``<idProp>``,
    ``parent_id``, ``<smilesProp>`` and ``sdf``. The ``parent_id`` of an entry
    is the identifier its children are searched under: either a molecule of
    the main storage (``self.storage``) or another representation.
    """

    _notJSON: ClassVar = [*ChemStore._notJSON, "representations"]

    def __init__(
        self,
        name: str,
        path: str,
        chem_store: ChemStore | None = None,
        df: pd.DataFrame | None = None,
        store_format: str = "pkl",
        add_rdkit: bool = False,
        overwrite: bool = False,
        chunk_processor: ParallelGenerator | None = None,
        chunk_size: int | None = None,
        n_jobs: int = 1,
    ) -> None:
        """Create a new representation store or load an existing one.

        A new store is created when no meta file exists at `metaFile`;
        otherwise the saved state is loaded from it.

        Args:
            name (str):
                name of the store
            path (str):
                root directory the store is placed in
            chem_store (ChemStore):
                main storage holding the parent molecules; required when
                a new store is created
            df (pd.DataFrame):
                not supported yet, must be `None`
            store_format (str):
                forwarded to the internal `PandasChemStore`
            add_rdkit (bool):
                forwarded to the internal `PandasChemStore`
            overwrite (bool):
                clear this store before initialization (the main storage is
                not cleared); also forwarded to the internal `PandasChemStore`
            chunk_processor (ParallelGenerator):
                forwarded to the internal `PandasChemStore`
            chunk_size (int):
                forwarded to the internal `PandasChemStore`
            n_jobs (int):
                forwarded to the internal `PandasChemStore`

        Raises:
            NotImplementedError: if `df` is supplied
            AssertionError: if a new store is created without `chem_store`
        """
        super().__init__()
        self.storage = chem_store
        # `rootDir` must exist before `name` is assigned: the name setter
        # derives `path` from it.
        self.rootDir = path
        self.path = os.path.abspath(os.path.join(self.rootDir, name))
        self.name = name
        if df is not None:
            raise NotImplementedError(
                "Supplying an initial set of representations is not yet supported."
            )
        if overwrite:
            logger.warning(
                "Overwriting the representations will not clear the main storage. "
                "Run clear() on the main storage to clear it separately."
            )
            # `self.representations` does not exist yet at this point;
            # `clear()` copes with that and only removes the files on disk.
            self.clear()
        if not os.path.exists(self.metaFile):
            logger.info(f"Creating new representation store at {self.baseDir}")
            assert self.storage is not None, "Storage with molecules must be provided"
            self.representations = PandasChemStore(
                f"{self.name}_representations",
                path=self.baseDir,
                df=pd.DataFrame(
                    columns=[
                        self.idProp,
                        "parent_id",
                        self.smilesProp,
                        "sdf",
                    ]
                ),
                smiles_col=self.smilesProp,
                add_rdkit=add_rdkit,
                overwrite=overwrite,
                store_format=store_format,
                chunk_processor=chunk_processor,
                chunk_size=chunk_size,
                n_jobs=n_jobs,
            )
        else:
            logger.info(f"Loading representation store at {self.baseDir}")
            self.reload()

    @property
    def name(self) -> str:
        """Name of the data set."""
        return self._name

    @name.setter
    def name(self, value: str):
        """Set the name of the data set and re-derive `path` from it."""
        self._name = value
        self.path = os.path.abspath(os.path.join(self.rootDir, value))

    @property
    def nJobs(self) -> int:
        """Number of jobs of the internal representation store."""
        return self.representations.nJobs

    @nJobs.setter
    def nJobs(self, n_jobs: int):
        self.representations.nJobs = n_jobs

    @property
    def chunkProcessor(self) -> ParallelGenerator:
        """Chunk processor of the internal representation store."""
        return self.representations.chunkProcessor

    @property
    def baseDir(self) -> str:
        """Directory holding the files of this store (``<path>/<name>``)."""
        return os.path.join(self.path, self.name)

    @property
    def metaFile(self) -> str:
        """Path of the ``meta.json`` file inside `baseDir`."""
        return os.path.join(self.baseDir, "meta.json")

    @property
    def smilesProp(self) -> str:
        """Name of the SMILES property, taken from the main storage."""
        return self.storage.smilesProp

    def getRepresentations(
        self,
        mol_id: str,
        recursive=True,
        is_root=False,
    ) -> list[StoredMol] | None:
        """Find all representations of a molecule recursively.

        Args:
            mol_id (str):
                identifier of the molecule to find representations for
            recursive (bool):
                whether to find representations recursively or just one level
            is_root (bool):
                whether the molecule is the root molecule
                (the parent of all representations) -> will be searched for
                in the main storage

        Returns:
            (list[StoredMol] | None):
                the direct representations of the molecule (each with its
                `parent` set and, if `recursive`, its own `representations`
                filled in), or `None` if there are none
        """
        if is_root:
            mol = self.storage.getMol(mol_id)
        else:
            mol = self.representations.getMol(mol_id)
            mol.__class__ = RepresentationMol
        children = list(
            self.representations.searchOnProperty(
                "parent_id",
                [mol.id],
                name=self.name,
                exact=True,
            )
        )
        if not children:
            return None
        for child in children:
            child.__class__ = RepresentationMol
            child.parent = mol
            if recursive:
                # children are always looked up in the representation store
                child.representations = self.getRepresentations(child.id)
        return children

    @staticmethod
    def _attach_reps_to_mol(mol: StoredMol, reps: list[StoredMol] | None):
        """Append `reps` to ``mol.representations`` and set their parent.

        ``mol.representations`` becomes `None` when the combined list is empty.
        """
        reps_combined = (mol.representations or []) + (reps or [])
        if not reps_combined:
            mol.representations = None
            return
        mol.representations = reps_combined
        for rep in mol.representations:
            rep.parent = mol

    def _is_root(self, mol_id: str) -> bool:
        """Tell whether `mol_id` can be retrieved from the main storage.

        Uses the same convention as `getMol`: a `ValueError` raised by the
        main storage means the identifier is not a root molecule.
        """
        try:
            self.storage.getMol(mol_id)
        except ValueError:
            return False
        return True

    def _remove_children(self, mol_id: str, is_root: bool):
        """Remove the representations below `mol_id`, deepest level first."""
        reps = self.getRepresentations(mol_id, recursive=False, is_root=is_root)
        for child in reps or []:
            # a child is itself a representation, never a root molecule
            self._remove_children(child.id, is_root=False)
            self.representations.removeMol(child.id)

    def getMol(self, mol_id: str) -> StoredMol:
        """Retrieve a molecule with all its representations attached.

        The identifier is first looked up in the main storage; if that raises
        a `ValueError`, it is looked up among the representations and the
        parent of the found representation is resolved as well.

        Args:
            mol_id (str):
                identifier of the molecule to retrieve

        Returns:
            (StoredMol):
                molecule with all its representations
                attached to its `representations` attribute
        """
        try:
            is_root = True
            mol = self.storage.getMol(mol_id)
        except ValueError:
            is_root = False
            mol = self.representations.getMol(mol_id)
            # same re-classing as in `getRepresentations`, so a representation
            # offers the `RepresentationMol` API however it was fetched
            mol.__class__ = RepresentationMol
            mol.parent = self.getMol(mol.props["parent_id"])
        reps = self.getRepresentations(mol_id, is_root=is_root)
        self._attach_reps_to_mol(mol, reps)
        return mol

    def addMols(
        self,
        smiles: Iterable[str],
        props: dict[str, list] | None = None,
        *args,
        **kwargs,
    ) -> list[StoredMol]:
        """Add new representations to the store.

        It is required that
        the properties contain a 'parent_id' property that points to the
        parent molecule in the underlying `storage` object or another
        representation stored in this object itself.

        The 'sdf' property
        must also be provided, which defines the representation of the molecule
        in SDF format. Other properties can be provided as well to indicate
        the nature of the representation.

        Args:
            smiles:
                The SMILES of the representations to add.
            props:
                The properties of the representations to add.
            *args:
                Additional arguments.
            **kwargs:
                Additional keyword arguments.

        Returns:
            (list[StoredMol]):
                The added representations.
        """
        assert props is not None, "Properties must be provided."
        assert "parent_id" in props, \
            "Parent ID is missing. It must be provided as 'parent_id' property."
        assert "sdf" in props, \
            "SDF table is missing. It must be provided as 'sdf' property."
        # TODO: add checks for the parent_id and sdf properties
        return self.representations.addMols(smiles, props, *args, **kwargs)

    def removeRepresentations(self, mol_id: str):
        """Remove all representations of a molecule from the store.

        Nested representations are removed as well. `mol_id` may identify
        a molecule of the main storage or a representation.

        Args:
            mol_id (str):
                identifier of the molecule whose representations are removed
        """
        # Only the entry point may be a root molecule; resolving that once
        # keeps the recursion from looking up representation IDs in the main
        # storage, where they do not exist.
        self._remove_children(mol_id, is_root=self._is_root(mol_id))

    def removeMol(self, mol_id: str):
        """Remove all representations of a molecule from the store."""
        return self.removeRepresentations(mol_id)

    def getMolIDs(self) -> tuple[str, ...]:
        """Get the molecule identifiers reported by the main storage."""
        return self.storage.getMolIDs()

    def getMolCount(self):
        """Get the molecule count reported by the main storage."""
        return self.storage.getMolCount()

    def iterMols(self) -> Generator[StoredMol, None, None]:
        """Iterate over all molecules in the attached storage
        with their representations added.

        Yields:
            (StoredMol):
                molecule with all its representations attached
                to its `representations` attribute
        """
        for mol in self.storage.iterMols():
            reps = self.getRepresentations(mol.id, is_root=True)
            self._attach_reps_to_mol(mol, reps)
            yield mol

    def iterChunks(
        self,
        size: int | None = None,
        on_props: list | None = None,
        chunk_type: Literal["mol", "smiles", "rdkit", "df"] = "mol",
    ) -> Generator[list[StoredMol | str | Chem.Mol | pd.DataFrame], None, None]:
        """Iterate over chunks of molecules with their representations added.

        Chunks come from the main storage. Representations are attached only
        for ``chunk_type="mol"``; other chunk types are passed through as is.

        Args:
            size (int):
                size of the chunks to yield
            on_props (list):
                properties to chunk on; names unknown to the main storage
                are dropped before the request is forwarded
            chunk_type (str):
                type of the chunk to yield

        Yields:
            (list[StoredMol | str | Chem.Mol | pd.DataFrame]):
                chunk of molecules with all representations
                attached to their `representations` attribute
        """
        storage_props = self.storage.getProperties()
        if on_props:
            storage_on_props = [prop for prop in on_props if prop in storage_props]
        else:
            storage_on_props = None
        for chunk in self.storage.iterChunks(size, storage_on_props, chunk_type):
            if chunk_type == "mol":
                for mol in chunk:
                    reps = self.getRepresentations(mol.id, is_root=True) or []
                    self._attach_reps_to_mol(mol, reps)
            yield chunk

    @property
    def idProp(self) -> str:
        """Name of the identifier property, taken from the main storage."""
        return self.storage.idProp

    def getProperty(self, name: str, ids: tuple[str] | None = None) -> Iterable[Any]:
        """Get a property, preferring the representations over the main storage.

        Raises:
            ValueError: if neither store has the property
        """
        if self.representations.hasProperty(name):
            return self.representations.getProperty(name, ids)
        if self.storage.hasProperty(name):
            return self.storage.getProperty(name, ids)
        raise ValueError(f"Property '{name}' not found in storage.")

    def getProperties(self) -> list[str]:
        """List the properties of the main storage followed by those of the
        representations (a name present in both is listed twice)."""
        return self.storage.getProperties() + self.representations.getProperties()

    def addProperty(self, name: str, data: Sized, ids: list[str] | None = None):
        """Add a property to the representations."""
        return self.representations.addProperty(name, data, ids)

    def hasProperty(self, name: str) -> bool:
        """Check whether the representations or the main storage have
        the property."""
        return self.representations.hasProperty(name) or self.storage.hasProperty(name)

    def removeProperty(self, name: str):
        """Remove a property from the representations or, if they do not have
        it, from the main storage. Nothing happens if neither has it."""
        if self.representations.hasProperty(name):
            return self.representations.removeProperty(name)
        if self.storage.hasProperty(name):
            return self.storage.removeProperty(name)

    def getSubset(
        self,
        subset: Iterable[str],
        ids: Iterable[str] | None = None,
    ) -> "PandasRepresentationStore":
        """Get a subset of the representations (delegates to the internal
        store and returns its result)."""
        # FIXME: return a new instance of this store
        return self.representations.getSubset(subset, ids)

    def getDF(self) -> pd.DataFrame:
        """Get the data frame of the representations."""
        return self.representations.getDF()

    def dropEntries(self, ids: Iterable[str]):
        """Drop entries from the representations."""
        return self.representations.dropEntries(ids)

    def addEntries(
        self,
        ids: list[str],
        props: dict[str, list],
        raise_on_existing: bool = True,
    ):
        """Add entries to the representations.

        The properties must contain the 'parent_id' and 'sdf' keys.
        """
        assert "parent_id" in props, \
            "Parent ID is missing. It must be provided as 'parent_id' property."
        assert "sdf" in props, \
            "SDF table is missing. It must be provided as 'sdf' property."
        # FIXME: add checks for the parent_id and sdf properties
        return self.representations.addEntries(ids, props, raise_on_existing)

    def __getstate__(self):
        o_dict = super().__getstate__()
        # The internal store is saved to its own file; only its location
        # relative to `baseDir` is kept in the state of this object.
        o_dict["representations"] = os.path.relpath(
            self.representations.save(), self.baseDir
        )
        return o_dict

    def __setstate__(self, state):
        super().__setstate__(state)
        self.representations = PandasChemStore.fromFile(
            os.path.join(self.baseDir, state["representations"])
        )

    def save(self) -> str:
        """Save the store to its meta file and return the result of `toFile`."""
        return self.toFile(self.metaFile)

    def reload(self):
        """Replace the state of this object with the one saved in the meta file."""
        self.__dict__.update(self.fromFile(self.metaFile).__dict__)

    def clear(self, files_only: bool = True):
        """Clear the storage.

        Clears the internal representation store (if it has been set up
        already) and deletes the directory at `path`. The main storage is
        not touched.

        Args:
            files_only (bool):
                forwarded to `clear` of the internal representation store
        """
        # `__init__` calls this before `representations` is assigned
        # when `overwrite=True`, so the attribute may be missing.
        representations = getattr(self, "representations", None)
        if representations is not None:
            representations.clear(files_only)
        if os.path.exists(self.path):
            shutil.rmtree(self.path)

    @property
    def chunkSize(self) -> int:
        """Chunk size of the internal representation store."""
        return self.representations.chunkSize

    @chunkSize.setter
    def chunkSize(self, chunk_size: int):
        self.representations.chunkSize = chunk_size

    def searchWithSMARTS(self, patterns: list[str]) -> "PandasRepresentationStore":
        """Search the representations with SMARTS patterns (delegates to the
        internal store and returns its result)."""
        # FIXME: return a new instance of this store
        return self.representations.searchWithSMARTS(patterns)

    def searchOnProperty(
        self,
        prop_name: str,
        values: list[float | int | str],
        exact=False,
    ) -> "PandasRepresentationStore":
        """Search the representations on a property (delegates to the
        internal store and returns its result)."""
        # FIXME: return a new instance of this store
        # `exact` goes by keyword, as in `getRepresentations`, so it cannot
        # bind to another positional parameter of the internal store.
        return self.representations.searchOnProperty(prop_name, values, exact=exact)

    @property
    def identifier(self) -> ChemIdentifier:
        """Identifier of the internal representation store."""
        return self.representations.identifier

    def applyIdentifier(self, identifier: ChemIdentifier):
        """Apply an identifier to the representations."""
        return self.representations.applyIdentifier(identifier)

    @property
    def standardizer(self) -> ChemStandardizer:
        """Standardizer of the internal representation store."""
        return self.representations.standardizer

    def applyStandardizer(self, standardizer: ChemStandardizer):
        """Apply a standardizer to the representations."""
        return self.representations.applyStandardizer(standardizer)

    def getSummary(self) -> pd.DataFrame:
        """Show the number of representations for each parent molecule.

        Returns:
            (pd.DataFrame):
                frame indexed by ``parent_id`` with a ``num_reps`` column
        """
        reps = self.representations.getDF()
        return reps.groupby("parent_id").size().to_frame("num_reps")