"""This module holds the base class for DNN models
as well as fully connected NN subclass.
"""
import inspect
from collections import defaultdict
import numpy as np
import torch
from torch import nn, optim
from torch.nn import functional as f
from torch.utils.data import DataLoader, TensorDataset
from ....logs import logger
from ....models.monitors import BaseMonitor, FitMonitor
class Base(nn.Module):
"""Base structure for all classification/regression DNN models.
Mainly, it provides the general methods for training, evaluating the model
and predicting on the given data.
Attributes:
n_epochs (int):
(maximum) number of epochs to train the model
lr (float):
learning rate
batch_size (int):
batch size
patience (int):
number of epochs to wait before early stop if no progress on validation
set score, if patience = -1, always train to `n_epochs`
tol (float):
minimum absolute improvement of loss necessary to count as progress
on best validation score
device (torch.device):
device to run the model on
gpus (list):
list of gpus to run the model on
"""
def __init__(
self,
device: str,
gpus: list[int],
n_epochs: int = 1000,
lr: float = 1e-4,
batch_size: int = 256,
patience: int = 50,
tol: float = 0,
):
"""Initialize the DNN model.
Args:
device (str):
device to run the model on
gpus (list):
list of gpus to run the model on
n_epochs (int):
(maximum) number of epochs to train the model
lr (float):
learning rate
batch_size (int):
batch size
patience (int):
number of epochs to wait before early stop if no progress on validation
set score, if patience = -1, always train to `n_epochs`
tol (float):
minimum absolute improvement of loss necessary to count as progress
on best validation score
"""
super().__init__()
self.n_epochs = n_epochs
self.lr = lr
self.batch_size = batch_size
self.patience = patience
self.tol = tol
self.device = torch.device(device)
self.gpus = gpus
if len(self.gpus) > 1:
logger.warning(
f"At the moment multiple gpus is not possible: "
f"running DNN on gpu: {gpus[0]}."
)
def fit(
self,
X_train,
y_train,
X_valid=None,
y_valid=None,
monitor: FitMonitor | None = None,
) -> int:
"""Training the DNN model.
Training is, similar to the scikit-learn or Keras style.
It saves the optimal value of parameters.
Args:
X_train (np.ndarray or pd.Dataframe):
training data (m X n), m is the No. of samples, n is the No. of features
y_train (np.ndarray or pd.Dataframe):
training target (m X l), m is the No. of samples, l is
the No. of classes or tasks
X_valid (np.ndarray or pd.Dataframe):
validation data (m X n), m is the No. of samples, n is
the No. of features
y_valid (np.ndarray or pd.Dataframe):
validation target (m X l), m is the No. of samples, l is
the No. of classes or tasks
monitor (FitMonitor):
monitor to use for training, if None, use base monitor
Returns:
int:
the epoch number when the optimal model is saved
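Example:
A minimal usage sketch (the `STFullyConnected` settings, array shapes and
random data below are illustrative assumptions, not values prescribed by
this class)::

    import numpy as np
    model = STFullyConnected(n_dim=100, n_class=1, device="cpu", gpus=[0])
    X_train, y_train = np.random.rand(256, 100), np.random.rand(256, 1)
    X_valid, y_valid = np.random.rand(64, 100), np.random.rand(64, 1)
    model.fit(X_train, y_train, X_valid, y_valid)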
"""
self.to(self.device)
monitor = BaseMonitor() if monitor is None else monitor
train_loader = self.getDataLoader(X_train, y_train)
valid_loader = None
# if validation data is provided, use early stopping
if X_valid is not None and y_valid is not None:
valid_loader = self.getDataLoader(X_valid, y_valid)
patience = self.patience
else:
patience = -1
if "optim" in self.__dict__:
optimizer = self.optim
else:
optimizer = optim.Adam(self.parameters(), lr=self.lr)
# record the minimum loss value based on the calculation of the
# loss function by the current epoch
best_loss = np.inf
best_weights = self.state_dict()
last_save = 0 # record the epoch when optimal model is saved.
for epoch in range(self.n_epochs):
monitor.onEpochStart(epoch)
loss = None
# decrease learning rate over the epochs
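# e.g. with the default n_epochs=1000 the factor (1 - 1/n_epochs)**(epoch * 10)
# scales lr by ~0.37 at epoch 100 and by ~0.007 at epoch 500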
for param_group in optimizer.param_groups:
param_group["lr"] = self.lr * (1 - 1 / self.n_epochs) ** (epoch * 10)
for i, (Xb, yb) in enumerate(train_loader):
monitor.onBatchStart(i)
# Batch of input tensor and label tensor
Xb, yb = Xb.to(self.device), yb.to(self.device)
optimizer.zero_grad()
# predicted probability tensor
y_ = self(Xb, is_train=True)
# ignore all the NaN values (NaN != NaN, so the mask is False exactly at missing labels)
ix = yb == yb
if self.n_class > 1:
yb, y_ = yb[ix], y_[ix[:, -1], :]
else:
yb, y_ = yb[ix], y_[ix]
# loss function calculation based on predicted tensor and label tensor
if self.n_class > 1:
loss = self.criterion(y_, yb.long())
else:
loss = self.criterion(y_, yb)
loss.backward()
optimizer.step()
monitor.onBatchEnd(i, float(loss))
if patience == -1:
monitor.onEpochEnd(epoch, loss.item())
else:
# loss value on validation set based on which optimal model is saved.
loss_valid = self.evaluate(valid_loader)
if loss_valid + self.tol < best_loss:
best_weights = self.state_dict()
best_loss = loss_valid
last_save = epoch
elif epoch - last_save > patience: # early stop
break
monitor.onEpochEnd(epoch, loss.item(), loss_valid)
if patience == -1:
best_weights = self.state_dict()
self.load_state_dict(best_weights)
return last_save
def evaluate(self, loader) -> float:
"""Evaluate the performance of the DNN model.
Args:
loader (torch.utils.data.DataLoader):
data loader for the test set,
including an m X n feature FloatTensor and an m X l label FloatTensor
(m is the No. of samples, n is the No. of features, l is the
No. of classes or tasks)
Returns:
loss (float):
the average loss value based on the calculation of the loss
function with the given test set.
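Example:
A minimal sketch, assuming a fitted `model` as in :meth:`fit` and
illustrative validation arrays `X_valid` and `y_valid`::

    loader = model.getDataLoader(X_valid, y_valid)
    mean_loss = model.evaluate(loader)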
"""
self.to(self.device)
loss = 0
for Xb, yb in loader:
Xb, yb = Xb.to(self.device), yb.to(self.device)
y_ = self.forward(Xb)
ix = yb == yb
if self.n_class > 1:
yb, y_ = yb[ix], y_[ix[:, -1], :]
else:
yb, y_ = yb[ix], y_[ix]
if self.n_class > 1:
loss += self.criterion(y_, yb.long()).item()
else:
loss += self.criterion(y_, yb).item()
loss = loss / len(loader)
return loss
def predict(self, X_test) -> np.ndarray:
"""Predicting the probability of each sample in the given dataset.
Args:
X_test (ndarray):
m X n target array (m is the No. of sample,
n is the No. of features)
Returns:
score (ndarray):
probability of each sample in the given dataset,
it is an m X l FloatTensor (m is the No. of sample, l is the
No. of classes or tasks.)
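Example:
A minimal sketch, assuming a fitted `model` as in :meth:`fit`; the query
array `X_new` is an illustrative assumption::

    X_new = np.random.rand(10, 100)
    scores = model.predict(X_new)  # ndarray of shape (10, n_class)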
"""
self.to(self.device)
loader = self.getDataLoader(X_test)
score = []
for X_b in loader:
X_b = X_b.to(self.device)
y_ = self.forward(X_b)
score.append(y_.detach().cpu())
score = torch.cat(score, dim=0).numpy()
return score
@classmethod
def _get_param_names(cls) -> list:
"""Get the class parameter names.
Function copied from sklearn.base.BaseEstimator.
Returns:
parameter names (list): list of the class parameter names.
"""
init_signature = inspect.signature(cls.__init__)
parameters = [
p
for p in init_signature.parameters.values()
if p.name != "self" and p.kind != p.VAR_KEYWORD
]
return sorted([p.name for p in parameters])
def get_params(self, deep=True) -> dict:
"""Get parameters for this estimator.
Function copied from sklearn.base.BaseEstimator.
Args:
deep (bool): If True, will return the parameters for this estimator
and contained subobjects that are estimators.
Returns:
params (dict): Parameter names mapped to their values.
"""
out = {}
for key in self._get_param_names():
value = getattr(self, key)
if deep and hasattr(value, "get_params"):
deep_items = value.get_params().items()
out.update((key + "__" + k, val) for k, val in deep_items)
out[key] = value
return out
def set_params(self, **params) -> "Base":
"""Set the parameters of this estimator.
Function copied from sklearn.base.BaseEstimator.
The method works on simple estimators as well as on nested objects
(such as :class:`~sklearn.pipeline.Pipeline`). The latter have
parameters of the form ``<component>__<parameter>`` so that it's
possible to update each component of a nested object.
Args:
**params (dict): Estimator parameters.
Returns:
self (Base): the estimator instance
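Example:
A minimal sketch (the chosen parameter values are illustrative
assumptions)::

    model.set_params(lr=1e-3, batch_size=128)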
"""
if not params:
# Simple optimization to gain speed (inspect is slow)
return self
valid_params = self.get_params(deep=True)
# grouped by prefix
nested_params = defaultdict(dict)
for key, value in params.items():
key, delim, sub_key = key.partition("__")
if key not in valid_params:
local_valid_params = self._get_param_names()
raise ValueError(
f"Invalid parameter {key!r} for estimator {self}. "
f"Valid parameters are: {local_valid_params!r}."
)
if delim:
nested_params[key][sub_key] = value
else:
setattr(self, key, value)
valid_params[key] = value
for key, sub_params in nested_params.items():
valid_params[key].set_params(**sub_params)
return self
def getDataLoader(self, X, y=None):
"""Convert data to tensors and get generator over dataset with dataloader.
Args:
X (numpy 2d array): input dataset (m X n)
y (numpy array, optional): output data (m X 1 column vector)
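Example:
A minimal sketch (the batch shapes in the comment are illustrative)::

    loader = model.getDataLoader(X_train, y_train)
    for Xb, yb in loader:
        pass  # Xb: (batch_size, n_features), yb: (batch_size, 1)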
"""
# if pandas dataframe is provided, convert it to numpy array
if hasattr(X, "values"):
X = X.values
if y is not None and hasattr(y, "values"):
y = y.values
if y is None:
tensordataset = torch.Tensor(X)
else:
tensordataset = TensorDataset(torch.Tensor(X), torch.Tensor(y))
return DataLoader(tensordataset, batch_size=self.batch_size)
class STFullyConnected(Base):
"""Single task DNN classification/regression model.
It contains three (optionally four) fully connected layers, between which are
dropout layers for robustness.
Attributes:
n_dim (int): the No. of columns (features) for input tensor
n_class (int): the No. of columns (classes) for output tensor.
device (torch.device): device to run the model on
gpus (list): list of gpu ids to run the model on
n_epochs (int): max number of epochs
lr (float): neural net learning rate
batch_size (int): batch size for training
patience (int): early stopping patience
tol (float): early stopping tolerance
is_reg (bool): whether the model is for regression or classification
neurons_h1 (int): No. of neurons in the first hidden layer
neurons_hx (int): No. of neurons in the other hidden layers
extra_layer (bool): whether to add an extra hidden layer
dropout_frac (float): dropout fraction
criterion (torch.nn.Module): the loss function
dropout (torch.nn.Module): the dropout layer
fc0 (torch.nn.Module): the first fully connected layer
fc1 (torch.nn.Module): the second fully connected layer
fc2 (torch.nn.Module): the extra hidden layer (only used if extra_layer)
fc3 (torch.nn.Module): the output fully connected layer
activation (torch.nn.Module): the activation function
"""
def __init__(
self,
n_dim,
n_class,
device,
gpus,
n_epochs=100,
lr=None,
batch_size=256,
patience=50,
tol=0,
is_reg=True,
neurons_h1=256,
neurons_hx=128,
extra_layer=False,
dropout_frac=0.25,
):
"""Initialize the STFullyConnected model.
Args:
n_dim (int):
the No. of columns (features) for input tensor
n_class (int):
the No. of columns (classes) for output tensor.
device (torch.device):
device to run the model on
gpus (list):
list of gpu ids to run the model on
n_epochs (int):
max number of epochs
lr (float):
neural net learning rate
batch_size (int):
batch size
patience (int):
number of epochs to wait before early stop if no progress on
validation set score, if patience = -1, always train to n_epochs
tol (float):
minimum absolute improvement of loss necessary to
count as progress on best validation score
is_reg (bool, optional):
Regression model (True) or Classification model (False)
neurons_h1 (int):
number of neurons in first hidden layer
neurons_hx (int):
number of neurons in other hidden layers
extra_layer (bool):
add third hidden layer
dropout_frac (float):
dropout fraction
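Example:
A minimal construction sketch (the feature count and model settings are
illustrative assumptions)::

    model = STFullyConnected(
        n_dim=2048, n_class=2, device="cpu", gpus=[0], is_reg=False
    )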
"""
if not lr:
lr = 1e-4 if is_reg else 1e-5
super().__init__(
device=device,
gpus=gpus,
n_epochs=n_epochs,
lr=lr,
batch_size=batch_size,
patience=patience,
tol=tol,
)
self.n_dim = n_dim
self.is_reg = is_reg
self.n_class = n_class if not self.is_reg else 1
self.neurons_h1 = neurons_h1
self.neurons_hx = neurons_hx
self.extra_layer = extra_layer
self.dropout_frac = dropout_frac
self.dropout = None
self.fc0 = None
self.fc1 = None
self.fc2 = None
self.fc3 = None
self.activation = None
self.criterion = None
self.initModel()
def initModel(self):
"""Define the layers of the model."""
self.dropout = nn.Dropout(self.dropout_frac)
self.fc0 = nn.Linear(self.n_dim, self.neurons_h1)
self.fc1 = nn.Linear(self.neurons_h1, self.neurons_hx)
if self.extra_layer:
self.fc2 = nn.Linear(self.neurons_hx, self.neurons_hx)
self.fc3 = nn.Linear(self.neurons_hx, self.n_class)
if self.is_reg:
# loss function for regression
self.criterion = nn.MSELoss()
elif self.n_class == 1:
# loss and activation function of output layer for binary classification
self.criterion = nn.BCELoss()
self.activation = nn.Sigmoid()
else:
# loss and activation function of output layer for multi-class classification
self.criterion = nn.CrossEntropyLoss()
self.activation = nn.Softmax(dim=1)
def set_params(self, **params) -> "STFullyConnected":
"""Set parameters and re-initialize model.
Args:
**params: parameters to be set
Returns:
self (STFullyConnected): the model itself
"""
super().set_params(**params)
self.initModel()
return self
def forward(self, X, is_train=False) -> torch.Tensor:
"""Forward pass of the model (invoked when the instance is called as a function).
Args:
X (FloatTensor):
m X n FloatTensor, m is the No. of samples, n is
the No. of features.
is_train (bool, optional):
whether the call happens during training (True, dropout is applied) or
just for prediction (False)
Returns:
y (FloatTensor): m X l FloatTensor, m is the No. of samples,
l is the No. of classes
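Example:
A minimal sketch, assuming a constructed `model`; the input tensor shape
is an illustrative assumption::

    X = torch.rand(8, model.n_dim)
    y = model(X)  # equivalent to model.forward(X), with is_train=False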
"""
y = f.relu(self.fc0(X))
if is_train:
y = self.dropout(y)
y = f.relu(self.fc1(y))
if self.extra_layer:
if is_train:
y = self.dropout(y)
y = f.relu(self.fc2(y))
if is_train:
y = self.dropout(y)
if self.is_reg:
y = self.fc3(y)
else:
y = self.activation(self.fc3(y))
return y