Source code for qsprpred.benchmarks.runner

import itertools
import logging
import os
import queue
import random
import sys
import traceback
from threading import Lock as TLock, Thread
from typing import Generator, Any

import pandas as pd

from qsprpred.extra.gpu.utils.parallel import TorchJITGenerator
from .replica import Replica
from .settings.benchmark import BenchmarkSettings
from ..logs import logger
from ..utils.parallel import ParallelGenerator, MultiprocessingJITGenerator, \
    ThreadsJITGenerator, PebbleJITGenerator


class ExcThread(Thread):
    """Thread that can catch exceptions from the target function."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.bucket = queue.Queue()

    def run(self):
        try:
            super().run()
        except Exception:
            self.bucket.put(sys.exc_info())

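
# Note: exceptions raised in the target of a plain `threading.Thread` are not
# propagated to the thread that calls `join()`. `ExcThread` therefore captures
# `sys.exc_info()` in `self.bucket`, so the caller can re-raise the exception
# after joining (this is how `BenchmarkRunner.run` handles failures of the GPU
# thread below).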
class BenchmarkRunner:
    """Class that runs benchmarking experiments as defined by `BenchmarkSettings`.

    It translates the settings into a list of `Replica` objects with its
    `iterReplicas` method and runs them in parallel. Each replica is processed
    by the `runReplica` method.

    The report from each replica is appended to a `resultsFile`, which is read
    and returned by the `run` method after the runner has finished all replicas.
    All outputs generated by the replicas and the `BenchmarkSettings` used are
    saved in the `dataDir`.

    The random seed for each replica is determined in a pseudo-random way from
    `BenchmarkSettings.random_seed`. The `getSeedList` method is used to generate
    a list of seeds from this 'master' seed. There are some caveats to this
    method (see the docstring of `getSeedList`).

    Attributes:
        settings (BenchmarkSettings): Benchmark settings.
        nProc (int): Number of processes to use.
        resultsFile (str): Path to the results file.
        dataDir (str): Path to the directory to store data.
    """

    # default locks for threads
    lock_data_t = TLock()
    lock_report_t = TLock()

    class ReplicaException(Exception):
        """Custom exception for errors in a replica.

        Attributes:
            replicaID (str): ID of the replica that caused the error.
            exception (Exception): Exception that was raised.
        """

        def __init__(self, replica_id: str, exception: Exception):
            """Initialize the exception.

            Args:
                replica_id (str): ID of the replica that caused the error.
                exception (Exception): Exception that was raised.
            """
            self.replicaID = replica_id
            self.exception = exception

    logLevel = logging.DEBUG

    def __init__(
        self,
        settings: BenchmarkSettings,
        data_dir: str = "./data",
        results_file: str | None = None,
        parallel_generator_cpu: ParallelGenerator | None = None,
        parallel_generator_gpu: ParallelGenerator | None = None,
    ):
        """Initialize the runner.

        Args:
            settings (BenchmarkSettings): Benchmark settings.
            data_dir (str, optional): Path to the directory to store data.
                Defaults to "./data". If the directory does not exist,
                it will be created.
            results_file (str, optional): Path to the results file.
                Defaults to "{data_dir}/results.tsv".
            parallel_generator_cpu (ParallelGenerator, optional): Parallel
                generator for CPU replicas. Defaults to a
                `MultiprocessingJITGenerator` on all available CPUs.
            parallel_generator_gpu (ParallelGenerator, optional): Parallel
                generator for GPU replicas. Defaults to `None`.
        """
        logger.debug("Initializing BenchmarkRunner...")
        self.settings = settings
        self.parallelGeneratorCPU = (
            parallel_generator_cpu
            or MultiprocessingJITGenerator(os.cpu_count())
        )
        self.parallelGeneratorGPU = parallel_generator_gpu
        self.dataDir = data_dir
        self.resultsFile = results_file if results_file else f"{data_dir}/results.tsv"
        os.makedirs(self.dataDir, exist_ok=True)
        logger.debug(f"Saving settings to: {self.dataDir}/settings.json")
        self.settings.toFile(f"{self.dataDir}/settings.json")

    @property
    def nRuns(self) -> int:
        """Returns the total number of benchmarking runs.

        This is the product of the number of replicas, data sources, descriptors,
        target properties, data preparation settings and models as defined in
        the `BenchmarkSettings`.

        Returns:
            int: Total number of benchmarking runs.
        """
        benchmark_settings = self.settings
        benchmark_settings.checkConsistency()
        ret = (
            benchmark_settings.n_replicas
            * len(benchmark_settings.data_sources)
            * len(benchmark_settings.descriptors)
            * len(benchmark_settings.target_props)
            * len(benchmark_settings.prep_settings)
            * len(benchmark_settings.models)
        )
        if len(benchmark_settings.optimizers) > 0:
            ret *= len(benchmark_settings.optimizers)
        return ret

    def createLocks(self, generator: ParallelGenerator):
        """Creates locks for input and output operations done in `runReplica`.

        This method should be overridden if the generator uses a different
        multiprocessing library or threading implementation. At the moment, it
        can only provide locks for the generators handled below
        (`TorchJITGenerator`, `MultiprocessingJITGenerator`, `PebbleJITGenerator`
        and `ThreadsJITGenerator`).

        Args:
            generator (ParallelGenerator): Parallel generator to use.

        Returns:
            Any: Lock for data set operations.
            Any: Lock for final report file operations.

        Raises:
            ValueError: If the generator is not one of the supported types
                and the lock types cannot be determined automatically.
        """
        if isinstance(generator, TorchJITGenerator):
            import torch.multiprocessing

            manager = torch.multiprocessing.Manager()
        elif isinstance(generator, (MultiprocessingJITGenerator, PebbleJITGenerator)):
            import multiprocessing

            manager = multiprocessing.Manager()
        elif isinstance(generator, ThreadsJITGenerator):
            return self.lock_data_t, self.lock_report_t
        else:
            raise ValueError(
                f"Unknown generator type: {generator}. "
                f"Cannot create locks. "
                f"Override this method to create locks for this generator."
            )
        return manager.Lock(), manager.Lock()

    def processReplicas(
        self,
        generator: ParallelGenerator,
        replicas: Generator[Replica, None, None],
        raise_errors=False,
    ):
        """Processes replicas in parallel using the given `ParallelGenerator`.

        Each generated replica is run by the `runReplica` method, which is
        executed in parallel by the parallel generator according to its
        implementation.

        Args:
            generator (ParallelGenerator): Parallel generator to use.
            replicas (Generator[Replica, None, None]): Generator that yields
                `Replica` objects.
            raise_errors (bool, optional): Whether to raise the first encountered
                `ReplicaException` and stop the benchmarking run. Defaults to
                `False`, in which case replicas that raise an exception are
                skipped and errors are logged.
        """
        lock_data, lock_report = self.createLocks(generator)
        for result in generator(
            replicas, self.runReplica, self.resultsFile, lock_data, lock_report
        ):
            if isinstance(result, self.ReplicaException):
                if raise_errors:
                    raise result.exception
                else:
                    logger.error(
                        f"Error in replica {result.replicaID}: {result.exception}"
                    )
            elif isinstance(result, Exception):
                raise result
            else:
                logger.debug(f"Return success from replica: {result}")

    def run(self, raise_errors=False) -> pd.DataFrame:
        """Runs the benchmarking experiments.

        Args:
            raise_errors (bool, optional): Whether to raise the first encountered
                `ReplicaException` and stop the benchmarking run. Defaults to
                `False`, in which case replicas that raise an exception are
                skipped and errors are logged.

        Returns:
            pd.DataFrame: Results from the benchmarking experiments.
        """
        logger.debug(f"Performing {self.nRuns} replica runs...")
        if self.parallelGeneratorGPU is not None:
            gpu_replicas = (
                replica for replica in self.iterReplicas() if replica.requiresGpu
            )
            cpu_replicas = (
                replica for replica in self.iterReplicas() if not replica.requiresGpu
            )
        else:
            cpu_replicas = self.iterReplicas()
            gpu_replicas = None
        # run gpu replicas if there are any
        gpu_thread = None
        if gpu_replicas is not None:
            gpu_thread = ExcThread(
                target=self.processReplicas,
                args=(self.parallelGeneratorGPU, gpu_replicas, raise_errors),
            )
            gpu_thread.start()
            logger.debug("Started GPU replicas thread...")
        # run cpu replicas
        logger.debug("Starting CPU replicas...")
        self.processReplicas(self.parallelGeneratorCPU, cpu_replicas, raise_errors)
        logger.debug("Finished CPU replicas.")
        # wait for gpu replicas to finish
        if gpu_thread is not None:
            logger.debug("Waiting for GPU replicas to finish...")
            gpu_thread.join()
            if gpu_thread.bucket.qsize() > 0:
                raise gpu_thread.bucket.get()[1]
            else:
                logger.debug("Finished GPU replicas.")
        logger.debug("Finished all replica runs.")
        return pd.read_table(self.resultsFile)

    def getSeedList(self, seed: int | None = None) -> list[int]:
        """
        Get a list of seeds for the replicas from one 'master' seed.

        The list of seeds is generated by sampling from the range of possible
        seeds (0 to 2**31-1), with the given seed used to initialize the
        `random` module. This means that the list of seeds will be the same for
        each run of the benchmarking experiment with the same 'master' seed.
        This is useful for reproducibility, and it also avoids recalculating
        replicas that were already calculated.

        Caveat: If the seed in `BenchmarkSettings.random_seed` stays the same,
        but the number of replicas changes (i.e. the settings themselves
        change), then this code will still generate the same seeds for
        experiments that might not overlap with previous experiments. Therefore,
        take this into account when you have already calculated some replicas,
        but decided to change your experiment settings.

        Args:
            seed (int, optional): 'Master' seed. Defaults to
                `BenchmarkSettings.random_seed`.

        Returns:
            list[int]: list of seeds for the replicas
        """
        seed = seed or self.settings.random_seed
        random.seed(seed)
        return random.sample(range(2 ** 31), self.nRuns)

    def iterReplicas(self) -> Generator[Replica, None, None]:
        """Generator that yields `Replica` objects for each benchmarking run.

        This is done by iterating over the product of the data sources,
        descriptors, target properties, data preparation settings, models and
        optimizers as defined in the `BenchmarkSettings`. The random seed for
        each replica is determined in a pseudo-random way from
        `BenchmarkSettings.random_seed` via the `getSeedList` method.

        Yields:
            Generator[Replica, None, None]: `Replica` objects for each
                benchmarking run.
        """
        benchmark_settings = self.settings
        benchmark_settings.checkConsistency()
        indices = [x + 1 for x in range(benchmark_settings.n_replicas)]
        optimizers = (
            benchmark_settings.optimizers
            if len(benchmark_settings.optimizers) > 0
            else [None]
        )
        product = itertools.product(
            indices,
            [benchmark_settings.name],
            benchmark_settings.data_sources,
            benchmark_settings.descriptors,
            benchmark_settings.target_props,
            benchmark_settings.prep_settings,
            benchmark_settings.models,
            optimizers,
        )
        seeds = self.getSeedList(benchmark_settings.random_seed)
        for idx, settings in enumerate(product):
            yield self.makeReplica(
                *settings,
                random_seed=seeds[idx],
                assessors=benchmark_settings.assessors,
            )

    def makeReplica(self, *args, **kwargs) -> Replica:
        """Returns a `Replica` object for the given settings.

        This is useful for debugging.

        Returns:
            Replica: Replica object.
        """
        return Replica(*args, **kwargs)

    @classmethod
    def getLoggerForReplica(cls, replica: Replica, level: int = logging.DEBUG):
        """Returns a logger for the given replica.

        Args:
            replica (Replica): Replica to get the logger for.
            level (int, optional): Log level. Defaults to logging.DEBUG.
        """
        replica_logger = logging.getLogger(replica.id)
        if len(replica_logger.handlers) > 0:
            return replica_logger
        replica_logger.setLevel(level)
        sh = logging.StreamHandler()
        formatter = logging.Formatter("%(name)s - %(levelname)s - %(message)s")
        sh.setFormatter(formatter)
        sh.setLevel(level)
        replica_logger.addHandler(sh)
        return replica_logger

    @classmethod
    def checkReplicaInResultsFile(cls, replica: Replica, results_file: str) -> bool:
        """Checks if the replica is already present in the results file.

        This method is thread-safe.

        Args:
            replica (Replica): Replica to check.
            results_file (str): Path to the results file.

        Returns:
            bool: Whether the replica is already present in the results file.
        """
        if not os.path.exists(results_file):
            return False
        df_results = pd.read_table(results_file)
        return df_results.ReplicaID.isin([replica.id]).any()

    @classmethod
    def replicaToReport(cls, replica: Replica) -> pd.DataFrame:
        """Converts a replica to a report.

        Args:
            replica (Replica): Replica to convert.

        Returns:
            pd.DataFrame: Report from the replica.
        """
        return replica.createReport()

    @classmethod
    def appendReportToResults(cls, df_report: pd.DataFrame, results_file: str):
        """Appends a report to the results file.

        This method is thread-safe.

        Args:
            df_report (pd.DataFrame): Report to append.
            results_file (str): Path to the results file.
        """
        df_report.to_csv(
            results_file,
            sep="\t",
            index=False,
            mode="a",
            header=not os.path.exists(results_file),
        )

    @classmethod
    def initData(cls, replica: Replica):
        """Initializes the data set for this replica.

        This method is thread-safe.

        Args:
            replica (Replica): Replica to initialize.
        """
        logger = cls.getLoggerForReplica(replica, cls.logLevel)
        logger.debug("Initializing data set...")
        replica.initData()
        logger.debug("Done.")
        logger.debug("Adding descriptors...")
        replica.addDescriptors()
        logger.debug("Done.")

    @classmethod
    def runReplica(
        cls,
        replica: Replica,
        results_file: str,
        lock_data: Any,
        lock_report: Any,
        gpu: int | None = None,
    ) -> str | ReplicaException:
        """Runs a single replica.

        This is executed in parallel by the `run` method. It is a classmethod
        so that it can be pickled and executed in parallel more easily.

        Args:
            replica (Replica): Replica to run.
            results_file (str): Path to the results file.
            lock_data (Any): Lock for data operations.
            lock_report (Any): Lock for report operations.
            gpu (int, optional): ID of the GPU to assign to this replica.
                Defaults to `None`.

        Returns:
            str | ReplicaException: ID of the replica that was run or a
                `ReplicaException` if an error was encountered.
        """
        if gpu is not None:
            replica.setGPUs([gpu])
        logger = cls.getLoggerForReplica(replica, cls.logLevel)
        logger.debug(f"Starting replica: {replica.id}")
        try:
            with lock_data:
                logger.debug(f"Checking {replica.id} in results file: {results_file}")
                if cls.checkReplicaInResultsFile(replica, results_file):
                    logger.warning(f"Skipping {replica.id}. Already in results file.")
                    return replica.id
                logger.debug("Initializing data...")
                cls.initData(replica)
                logger.debug("Done.")
            logger.debug("Preparing data set...")
            replica.prepData()
            logger.debug("Done.")
            logger.debug("Initializing model...")
            replica.initModel()
            logger.debug("Done.")
            logger.debug("Running assessments...")
            replica.runAssessment()
            logger.debug("Done.")
            logger.debug("Creating report...")
            df_report = cls.replicaToReport(replica)
            logger.debug("Done.")
            with lock_report:
                logger.debug(f"Adding report to: {results_file}")
                cls.appendReportToResults(df_report, results_file)
                logger.debug("Done.")
            logger.debug(f"Finished replica: {replica.id}")
            return replica.id
        except Exception as e:
            logger.error(f"Error in replica '{replica}': {e}")
            traceback.print_exception(type(e), e, e.__traceback__)
            return cls.ReplicaException(replica.id, e)
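

# Minimal usage sketch, assuming a `BenchmarkSettings` JSON file is available
# and that `BenchmarkSettings.fromFile` can load it; the loading call and the
# file path below are illustrative assumptions, not guaranteed by this module.
if __name__ == "__main__":
    # Load the benchmark definition (assumed API; adjust to however your
    # settings object is actually constructed).
    my_settings = BenchmarkSettings.fromFile("./data/settings.json")
    # Run all replicas with the default CPU parallel generator; each report is
    # appended to ./data/results.tsv and the combined table is returned.
    runner = BenchmarkRunner(my_settings, data_dir="./data")
    df_results = runner.run(raise_errors=True)
    print(df_results.head())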