Source code for qsprpred.extra.utils.parallel

import time
from typing import Any, Callable

from distributed import Client, Future

from qsprpred.utils.parallel import JITParallelGenerator


[docs]class DaskJITGenerator(JITParallelGenerator): """Uses the `dask` library to parallelize the processing of an input generator. The main benefit of using `dask` is that it supports distributed computing across multiple machines. """
[docs] def getPool(self) -> Any: return Client(n_workers=self.nWorkers, threads_per_worker=1)
[docs] def checkResultAvailable(self, process: Future): time.sleep(0.1) return process.done()
[docs] def getResult(self, process: Future) -> Any: return process.result()
[docs] def checkProcess(self, process: Future): if process.status == "error": raise process.exception
[docs] def handleException(self, process: Future, exception: Exception) -> Any: return exception
[docs] def createJob(self, pool: Client, process_func: Callable, *args, **kwargs) -> Any: return pool.submit( process_func, *args, **kwargs )