"""
rewards
Created by: Martin Sicho
On: 26.06.22, 18:07
"""
import numpy as np
from abc import ABC, abstractmethod
from drugex.logs import logger
from drugex.training.interfaces import RewardScheme
from drugex.utils.fingerprints import get_fingerprint
from drugex.utils import get_Pareto_fronts
from rdkit import Chem, DataStructs
class ParetoRewardScheme(RewardScheme, ABC):
    """
    Abstract reward scheme that turns a Pareto-front based ranking into rewards.

    Subclasses provide `getMoleculeRank`, which decides how molecules are
    ordered; `__call__` maps that ordering onto reward values.
    """

    def getParetoFronts(self, scores):
        """
        Compute the Pareto fronts for a score matrix.

        Thin wrapper around `drugex.utils.get_Pareto_fronts`.

        Parameters
        ----------
        scores : np.ndarray
            Matrix of scores for the multiple objectives

        Returns
        -------
        list
            `list` of Pareto fronts. Each front holds the indices of the molecules
            belonging to it. Most dominant front is the first one.
        """
        return get_Pareto_fronts(scores)

    @abstractmethod
    def getMoleculeRank(self, fronts, smiles=None, scores=None):
        """
        Rank molecules within each Pareto front and return their indices in ranked order.

        The position of an index in the returned sequence determines its reward in
        `__call__`: later positions receive higher rewards.

        Parameters
        ----------
        fronts : list
            `list` of Pareto fronts. Each front holds the indices of the molecules in that front.
        smiles : list
            List of SMILES sequence to be ranked
        scores : np.ndarray
            matrix of scores for the multiple objectives

        Returns
        -------
        rank : np.array
            Indices of the ranked SMILES sequences
        """

    def __call__(self, smiles, scores, thresholds) -> np.ndarray:
        """
        Compute rewards for the given SMILES sequences.

        Molecules are split into Pareto fronts and ordered by the ranking strategy
        of the concrete subclass. The molecule at position ``k`` of that ordering
        receives the reward ``k / len(scores)``.

        Parameters
        ----------
        smiles : list
            List of SMILES sequence to be ranked
        scores : np.ndarray
            matrix of scores for the multiple objectives
        thresholds : list
            List of thresholds for the multiple objectives (not used, only for compatibility)

        Returns
        -------
        rewards : np.array
            Column vector of shape ``(len(smiles), 1)`` with the reward of each SMILES sequence
        """
        pareto_fronts = self.getParetoFronts(scores)
        ordering = self.getMoleculeRank(pareto_fronts, smiles=smiles, scores=scores)
        n_scored = len(scores)
        reward_column = np.zeros((len(smiles), 1))
        # Evenly spaced rewards in [0, 1): the last index in the ordering gets the largest value.
        reward_column[ordering, 0] = np.arange(n_scored) / n_scored
        return reward_column
class ParetoCrowdingDistance(ParetoRewardScheme):
    """
    Reward scheme that uses the NSGA-II crowding distance
    ranking strategy to rank the solutions in the same Pareto frontier.

    Paper: Deb, Kalyanmoy, et al. "A fast and elitist multiobjective genetic algorithm:
    NSGA-II. IEEE transactions on evolutionary computation 6.2 (2002): 182-197."
    """

    def getMoleculeRank(self, fronts, smiles=None, scores=None):
        """
        Order the molecules of every Pareto front by their crowding distance.

        Parameters
        ----------
        fronts : list
            `list` of Pareto fronts. Each front holds the indices of the molecules in that front
            (indexed with an index array below, so each front is expected to be array-like).
        smiles : list
            List of SMILES sequence to be ranked (not used in the calculation -> just a requirement
            of the interface because some ranking strategies need it)
        scores : np.ndarray
            matrix of scores for the multiple objectives

        Returns
        -------
        rank : np.array
            Indices of the SMILES sequences ranked with the NSGA-II crowding distance method from worst to best
        """
        best_first_fronts = []
        for front in fronts:
            members = scores[front]
            crowding = np.zeros(members.shape[0])

            # Accumulate the crowding distance objective by objective
            for objective in range(members.shape[1]):
                column = members[:, objective]
                order = np.argsort(column)

                # Boundary solutions of every objective are always kept: infinite distance
                crowding[order[0]] = np.inf
                crowding[order[-1]] = np.inf

                spread = column.max() - column.min()
                if spread == 0:
                    # All members share this objective value -> nothing to add
                    continue

                # Interior solutions: normalised gap between the two neighbours in sorted order
                crowding[order[1:-1]] += (column[order[2:]] - column[order[:-2]]) / spread

            # Largest crowding distance first, i.e. the first element is the best of the front
            best_first_fronts.append(front[np.argsort(-crowding)])

        # Fronts arrive best first; flipping the concatenation yields worst -> best overall
        return np.concatenate(best_first_fronts)[::-1]
class ParetoTanimotoDistance(ParetoRewardScheme):
    """
    Reward scheme that uses the Tanimoto distance ranking strategy to rank the solutions in the same Pareto frontier.
    """

    def __init__(self, distance_metric: str = 'min'):
        """
        Args:
            distance_metric: 'mean', 'min' or 'mutual' - how to compare Tanimoto
                             similarities of molecules in the same front
        """
        super().__init__()
        self.distance_metric = distance_metric

    @staticmethod
    def calc_fps(mols, fp_type='ECFP6'):
        """
        Calculate fingerprints for a list of molecules.

        Parameters
        ----------
        mols : list
            List of RDKit molecules
        fp_type : str
            Type of fingerprint to use

        Returns
        -------
        fps : list
            List of fingerprints, one per input molecule. Entries for which the
            fingerprint calculation failed are `None`.
        """
        fps = []
        for mol in mols:
            try:
                fps.append(get_fingerprint(mol, fp_type))
            except Exception:
                # Best effort: an unusable molecule is marked with None so that positions
                # stay aligned with the input. `Exception` (not `BaseException`) is caught
                # so that KeyboardInterrupt / SystemExit still propagate.
                fps.append(None)
        return fps

    def getFPs(self, smiles):
        """
        Calculate fingerprints for a list of molecules.

        Args:
            smiles: smiles to calculate fingerprints for

        Returns:
            list of fingerprints (`None` where no fingerprint could be calculated)
        """
        mols = [Chem.MolFromSmiles(smile) for smile in smiles]
        return self.calc_fps(mols)

    def _min_mean_similarity_ranking(self, fronts, fps):
        """
        Rank molecules in each front by the min/mean Tanimoto distance to the rest of the front.

        The reduction applied to the distances is taken from `self.func`, which
        `getMoleculeRank` sets before calling this method.

        Args:
            fronts: Pareto fronts (index arrays), ordered from worst to best front
            fps: fingerprints of all molecules, indexable by the indices in `fronts`

        Returns:
            list of molecule indices, ranked from worst to best
        """
        rank = []
        for front in fronts:
            front = np.asarray(front)
            front_fps = [fps[f] for f in front]
            if any(fp is None for fp in front_fps):
                logger.warning("Invalid molecule in front. Front not ranked.")
            elif len(front) > 2:
                # find the min/average tanimoto distance for each molecule to all other molecules in the front;
                # list slicing leaves the fingerprint objects untouched (no NumPy array round trip)
                dist = np.array([
                    self.func(1 - np.array(DataStructs.BulkTanimotoSimilarity(
                        fp, front_fps[:idx] + front_fps[idx + 1:])))
                    for idx, fp in enumerate(front_fps)
                ])
                # sort front (molecule with the lowest min/average distance to the others is first)
                front = front[dist.argsort()]
            rank.extend(front.tolist())
        # from worst to best (most similar in worst front to most dissimilar in best front)
        return rank

    def _mutual_similarity_ranking(self, fronts, fps):
        """
        Alternative Tanimoto similarity based ranking strategy that tries to more closely
        emulate the crowding distance with fingerprint similarities. Adapted from the
        original code by @XuhanLiu (https://github.com/XuhanLiu/DrugEx/blob/cd384f4a8ed4982776e92293f77afd4ea78644f9/utils/nsgaii.py#L92).

        Args:
            fronts: Pareto fronts (index arrays), ordered from worst to best front
            fps: fingerprints of all molecules, indexable by the indices in `fronts`

        Returns:
            list of molecule indices, ranked from worst to best
        """
        rank = []
        for front in fronts:
            front = np.asarray(front)
            front_fps = [fps[f] for f in front]
            if any(fp is None for fp in front_fps):
                # same message as the min/mean strategy so both report unranked fronts
                logger.warning("Invalid molecule in front. Front not ranked.")
            elif len(front) > 2:
                # cumulative crowded similarity distance score for each molecule in the front
                # -> the higher the score, the more diverse the 'crowd' around the molecule
                # -> the better the reward in the end
                dist = np.zeros(len(front))
                for fp in front_fps:
                    # tanimoto distance from this molecule to every molecule in the front (itself included)
                    tanimoto = 1 - np.array(DataStructs.BulkTanimotoSimilarity(fp, front_fps))
                    # lowest to highest distance; order[0] contributes nothing
                    # (normally the molecule itself at distance 0), which is the baseline lowest score
                    order = tanimoto.argsort()
                    # ensure the farthest molecule always gets the highest score
                    dist[order[-1]] += 10 ** 4
                    # for all molecules in between: gap between the neighbours in distance order,
                    # i.e. how the other molecules 'crowd' together around the current molecule
                    dist[order[1:-1]] += tanimoto[order[2:]] - tanimoto[order[:-2]]
                front = front[dist.argsort()]
            rank.extend(front.tolist())
        return rank

    def getMoleculeRank(self, fronts, smiles=None, scores=None):
        """
        Get the rank of the molecules in the Pareto front based on the Tanimoto distance
        of molecules in a front.

        Parameters
        ----------
        fronts : list
            List of Pareto fronts
        smiles : list
            List of SMILES sequence to be ranked
        scores : np.ndarray
            Array of scores for each molecule (not used)

        Returns
        -------
        rank : list
            List of indices of molecules, ranked from worst to best

        Raises
        ------
        ValueError
            If `self.distance_metric` is not one of 'mean', 'min' or 'mutual'.
        """
        metric = self.distance_metric
        if metric == 'mean':
            reducer = np.mean
        elif metric == 'min':
            reducer = np.min
        elif metric == 'mutual':
            reducer = None
        else:
            # Fail loudly instead of returning None, which cannot be used as a ranking
            raise ValueError(
                f"Unknown distance metric: {metric!r}. Expected 'mean', 'min' or 'mutual'."
            )

        fronts = fronts[::-1]  # From worst to best
        fps = self.getFPs(smiles)
        if reducer is None:
            return self._mutual_similarity_ranking(fronts, fps)
        # The min/mean ranking helper reads the reduction from this attribute
        self.func = reducer
        return self._min_mean_similarity_ranking(fronts, fps)
class WeightedSum(RewardScheme):
    """
    Reward scheme that combines the objective scores into a single reward via a weighted sum.
    """

    def __call__(self, smiles, scores, thresholds):
        """
        Compute rewards as a weighted sum of the objective scores.

        The weight of an objective is the ratio between the fraction of molecules
        scoring below its threshold and the fraction scoring at or above it (both
        offset by 0.01), normalised so that all weights sum to one. Objectives that
        few molecules satisfy therefore contribute more to the reward.

        Parameters
        ----------
        smiles : list
            List of SMILES sequence to be ranked (not used in the calculation)
        scores : np.ndarray
            matrix of scores for the multiple objectives
        thresholds : np.ndarray
            Thresholds for the multiple objectives

        Returns
        -------
        rewards : np.ndarray
            Array of rewards for the SMILES sequences
        """
        frac_below = (scores < thresholds).mean(axis=0, keepdims=True)
        frac_reached = (scores >= thresholds).mean(axis=0, keepdims=True)
        # The 0.01 offset keeps the ratio finite when no molecule reaches a threshold
        raw_weight = (frac_below + 0.01) / (frac_reached + 0.01)
        normalised = raw_weight / raw_weight.sum()
        return scores.dot(normalised.T)