import json
import os
from copy import deepcopy
from typing import ClassVar
import numpy as np
import pandas as pd
from ..data.descriptors.sets import DescriptorSet
from ..data.processing.pipeline import DatasetPipeline
from ..data.sampling.splits import DataSplit
from ..data.sources.data_source import DataSource
from ..data.tables.qspr import QSPRTable
from ..logs import logger
from ..models.assessment.methods import ModelAssessor
from ..models.hyperparam_optimization import HyperparameterOptimization
from ..models.model import QSPRModel
from ..models.monitors import NullMonitor
from ..tasks import TargetSpec
from ..utils.serialization import JSONSerializable
[docs]
class Replica(JSONSerializable):
    """Settings and state for a single replica of a benchmarking run.

    Attributes:
        idx (int):
            Index of the replica. This is not an identifier, but rather a number
            that indicates the order of the replica in the benchmarking run.
        name (str):
            Name of the replica.
        dataSource (DataSource):
            Data source to use.
        descriptors (list[DescriptorSet]):
            Descriptor sets to use.
        targetProps (list[TargetSpec]):
            Target properties to use.
        pipeline (DatasetPipeline):
            Feature processing pipeline to use for the replica.
        optimizer (HyperparameterOptimization):
            Hyperparameter optimizer to use. When it is `None`, `initModel`
            skips hyperparameter optimization.
        assessors (list[ModelAssessor]):
            Model assessors to use.
        subsets (dict[str, tuple[DataSplit, str, int]]):
            Dictionary mapping assessor names to tuples of data split, set
            (Train/Test), and fold index. Assessors listed here are applied
            to that subset of the data only.
        randomSeed (int):
            Random seed to use for all random operations within the replica.
        ds:
            Initialized data set. Only available after `initData` has been
            called; `None` before that and after deserialization.
        results (pd.DataFrame):
            Results from the replica. Only available after
            `runAssessment` has been called.
        model (QSPRModel):
            Model to use for the replica. This is a deep copy of the model
            provided in the constructor. Use `initModel` to prepare it.
    """

    # "ds", "results" and "model" are handled explicitly in
    # `__getstate__`/`__setstate__` below.
    # NOTE(review): presumably `JSONSerializable` skips every attribute named
    # here when serializing -- confirm against the base class.
    _notJSON: ClassVar = [*JSONSerializable._notJSON, "ds", "results", "model"]

    def __init__(
        self,
        idx: int,
        name: str,
        data_source: DataSource,
        descriptors: list[DescriptorSet],
        target_props: list[TargetSpec],
        pipeline: DatasetPipeline,
        model: QSPRModel,
        optimizer: HyperparameterOptimization,
        assessors: list[ModelAssessor],
        subsets: dict[str, tuple[DataSplit, str, int]],
        random_seed: int,
    ):
        """Initializes the replica.

        Args:
            idx (int):
                Index of the replica. This is not an identifier, but rather a number
                that indicates the order of the replica in the benchmarking run.
            name (str):
                Name of the replica.
            data_source (DataSource):
                Data source to use.
            descriptors (list[DescriptorSet]):
                Descriptor sets to use.
            target_props (list[TargetSpec]):
                Target properties to use.
            pipeline (DatasetPipeline):
                Feature processing pipeline to use for the replica.
            model (QSPRModel):
                Model to use for the replica. It is deep-copied, so the
                instance passed in is not modified by the replica.
            optimizer (HyperparameterOptimization):
                Hyperparameter optimizer to use. May be `None` to skip
                optimization in `initModel`.
            assessors (list[ModelAssessor]):
                Model assessors to use.
            subsets (dict[str, tuple[DataSplit, str, int]]):
                Dictionary mapping assessor names to tuples of data split, set
                (Train/Test), and fold index. Used to apply assessors to subsets of
                the data.
            random_seed (int):
                Random seed to use for all random operations within the replica.
        """
        self.idx = idx
        self.name = name
        self.dataSource = data_source
        self.descriptors = descriptors
        self.targetProps = target_props
        self.pipeline = pipeline
        self.optimizer = optimizer
        self.assessors = assessors
        self.subsets = subsets
        self.randomSeed = random_seed
        self.ds = None
        self.results = None
        self.model = deepcopy(model)

    def __getstate__(self):
        """Returns the state used for serialization.

        The model entry is replaced by the return value of `self.model.save()`
        (which `__setstate__` hands to `QSPRModel.fromFile`), and the data set
        and results are dropped.
        """
        o_dict = super().__getstate__()
        o_dict["model"] = self.model.save()
        o_dict["ds"] = None
        o_dict["results"] = None
        return o_dict

    def __setstate__(self, state):
        """Restores the replica from `state`.

        The model is loaded again with `QSPRModel.fromFile`; the data set and
        results are reset to `None` and have to be recreated.
        """
        super().__setstate__(state)
        self.model = QSPRModel.fromFile(state["model"])
        self.ds = None
        self.results = None

    def __str__(self) -> str:
        """Returns the replica identifier (see `id`)."""
        return self.id

    @property
    def requiresGpu(self) -> bool:
        """Whether the model requires a GPU.

        This is decided by checking whether the model exposes a `setGPUs`
        attribute.

        Returns:
            bool:
                Whether the model requires a GPU.
        """
        return hasattr(self.model, "setGPUs")

    def setGPUs(self, gpus: list[int]):
        """Sets the GPUs to use for the model.

        Args:
            gpus (list[int]):
                List of GPU indices to use.

        Raises:
            ValueError:
                If the model has no `setGPUs` method.
        """
        if not hasattr(self.model, "setGPUs"):
            raise ValueError("Model does not support GPU usage.")
        self.model.setGPUs(gpus)

    def getGPUs(self) -> list[int]:
        """Gets the GPUs to use for the model.

        Returns:
            list[int]:
                List of GPU indices to use. Empty if the model has no
                `getGPUs` method.
        """
        if hasattr(self.model, "getGPUs"):
            return self.model.getGPUs()
        return []

    @property
    def id(self) -> str:
        """A unique identifier for the replica.

        Returns:
            str:
                The replica name and random seed joined by an underscore.
        """
        return f"{self.name}_{self.randomSeed}"

    def initData(self, reload: bool = False):
        """Initializes the data set for this replica.

        Args:
            reload (bool, optional):
                Whether to overwrite all existing data and
                reinitialize from scratch. Defaults to `False`.
        """
        self.ds = self.dataSource.getDataSet(deepcopy(self.targetProps))
        # Only discard existing data when a reload was explicitly requested;
        # the previous `if not reload` check was inverted with respect to the
        # documented meaning of `reload`.
        # NOTE(review): assumes `clear()` drops previously stored data --
        # confirm against the table implementation.
        if reload:
            self.ds.clear()
        self.ds.randomState = self.randomSeed

    def addDescriptors(self, reload: bool = False):
        """Adds descriptors to the current data set. Make sure to call
        `initData` first to get it from the source.

        The data set is renamed to include the sorted descriptor set names
        (plus a ``_gpu`` suffix for GPU models). If a data set with that name
        already exists on disk and `reload` is `False`, it is loaded instead
        of recalculating the descriptors.

        Args:
            reload (bool, optional):
                Whether to overwrite all existing data and
                reinitialize from scratch. Defaults to `False`.

        Raises:
            ValueError:
                If the data set has not been initialized.
        """
        if self.ds is None:
            raise ValueError("Data set not initialized. Call initData first.")
        desc_id = "_".join(sorted(str(d) for d in self.descriptors))
        self.ds.name = f"{self.ds.name}_{desc_id}"
        if self.requiresGpu:
            self.ds.name = f"{self.ds.name}_gpu"
        # attempt to load the data set with descriptors
        if os.path.exists(self.ds.metaFile) and not reload:
            logger.info(f"Reloading existing {self.ds.name} from cache...")
            self.ds = QSPRTable.fromFile(self.ds.metaFile)
            self.ds.randomState = self.randomSeed
            self.ds.setTargetProperties(deepcopy(self.targetProps))
        else:
            logger.info(f"Data set {self.ds.name} not yet found. It will be created.")
            # calculate descriptors if necessary
            logger.info(f"Calculating descriptors for {self.ds.name}.")
            self.ds.addDescriptors(deepcopy(self.descriptors), recalculate=True)
            self.ds.setTargetProperties(deepcopy(self.targetProps))
            self.ds.randomState = self.randomSeed
            self.ds.save()

    def initModel(self):
        """Initializes the model for this replica. This includes
        initializing the model from the data set and optimizing
        the hyperparameters if an optimizer is specified. The model
        is saved afterwards.

        Raises:
            ValueError:
                If the data set has not been initialized.
        """
        if self.ds is None:
            raise ValueError("Data set not initialized. Call initData first.")
        self.model.name = f"{self.id}_{self.ds.name}"
        self.model.initFromData(self.ds, self.pipeline)
        self.model.initRandomState(self.randomSeed)
        if self.optimizer is not None:
            self.optimizer.optimize(self.model, self.ds, self.pipeline)
        self.model.save()

    @staticmethod
    def _isScalarScore(value) -> bool:
        """Tells whether `value` is a single score rather than a sequence."""
        # `np.number` also covers NumPy scalars that do not subclass the
        # built-in `float` (e.g. `np.float32`).
        return isinstance(value, (float, int, np.number))

    @staticmethod
    def _scoreFrame(assessor, score, target_prop) -> pd.DataFrame:
        """Builds the single-row results frame for one score of one target."""
        score_func = assessor.scoreFunc
        func_name = (
            score_func.name if hasattr(score_func, "name") else score_func.__name__
        )
        return pd.DataFrame(
            {
                "Assessor": [assessor.name],
                "ScoreFunc": [func_name],
                "Score": [score],
                "TargetProperty": [target_prop.name],
                "TargetTask": [target_prop.task.name],
            }
        )

    def runAssessment(self):
        """Runs the model assessment for this replica. This includes
        running all model assessors and saving the results.

        The results are saved in the `results` attribute. They can be
        accessed by calling `createReport`, which combines the relevant information
        from the replica and the results into one `pd.DataFrame`.

        Assessors whose name is a key of `subsets` are applied only to the
        configured fold subset; all others are applied to the whole data set.

        Raises:
            ValueError:
                If the data set or the model has not been initialized.
        """
        if self.ds is None:
            raise ValueError("Data set not initialized. Call initData first.")
        if self.model is None:
            raise ValueError("Model not initialized. Call initModel first.")
        self.results = None
        # Single-row frames are collected and concatenated once at the end,
        # instead of growing a frame with repeated `pd.concat` calls.
        frames = []
        for assessor in self.assessors:
            if assessor.name in self.subsets:
                # apply assessor to subset of data only if specified
                split, set_name, fold_idx = self.subsets[assessor.name]
                fold = list(self.ds.split(split))[fold_idx]
                indices = fold[0] if set_name == "Train" else fold[1]
                logger.debug(
                    f"Applying assessor {assessor.name} to subset of data for replica {self.id}"
                )
                scores = assessor(self.model, self.ds[indices], self.pipeline)
                logger.debug(
                    f"Successfully applied assessor {assessor.name} to subset of data for replica {self.id}"
                )
            else:
                logger.debug(
                    f"Applying assessor {assessor.name} on all data for replica {self.id}"
                )
                scores = assessor(self.model, self.ds, self.pipeline)
            if self._isScalarScore(scores):
                # a lone score is treated as a one-element sequence
                scores = np.array([scores])
            logger.debug(
                f"Assessor {assessor.name} scored model {self.model.name} in replica {self.id}"
            )
            for i, fold_score in enumerate(scores):
                if self._isScalarScore(fold_score):
                    # one score per entry: for multi-task models the entry
                    # position selects the target property
                    tp = self.targetProps[i if self.model.isMultiTask else 0]
                    frames.append(self._scoreFrame(assessor, fold_score, tp))
                else:
                    # one score per target property within this entry
                    frames.extend(
                        self._scoreFrame(assessor, tp_score, tp)
                        for tp_score, tp in zip(fold_score, self.targetProps)
                    )
        if self.assessors:
            # Without any assessor `results` stays `None`, as before.
            self.results = pd.concat(frames) if frames else pd.DataFrame()

    def createReport(self) -> pd.DataFrame:
        """Creates a report from the results of this replica.

        Besides building the report, this replaces the monitor of every
        assessor with a `NullMonitor` and writes the replica to
        ``<model.outPrefix>_replica.json`` via `toFile`.

        Returns:
            pd.DataFrame:
                A copy of `results` extended with the model file, algorithm,
                algorithm parameters, replica ID, data set name and replica
                file columns.

        Raises:
            ValueError:
                If the results have not been calculated.
        """
        if self.results is None:
            raise ValueError("Results not available. Call runAssessment first.")
        results = self.results.copy()
        results["ModelFile"] = self.model.metaFile
        results["Algorithm"] = self.model.alg.__name__
        results["AlgorithmParams"] = json.dumps(self.model.parameters)
        results["ReplicaID"] = self.id
        results["DataSet"] = self.ds.name
        out_file = f"{self.model.outPrefix}_replica.json"
        for assessor in self.assessors:
            # FIXME: some problems in monitor serialization now prevent this
            assessor.monitor = NullMonitor()
        results["ReplicaFile"] = self.toFile(out_file)
        return results