Source code for bird_cloud_gnn.gnn_model

"""Module for creating GCN class"""

import os

import dgl
import numpy as np
import torch
from dgl.dataloading import GraphDataLoader
from dgl.nn.pytorch.conv import GraphConv
from torch import nn
from torch import optim
from torch.nn.modules import Module
from tqdm import tqdm


# Select PyTorch as the DGL backend through the DGLBACKEND environment variable.
# NOTE(review): this assignment runs after `import dgl` above -- confirm DGL
# still honours it at this point, or move it ahead of the import.
os.environ["DGLBACKEND"] = "pytorch"


class GCN(nn.Module):
    """Graph Convolutional Network construction module.

    An n-layer GCN is built from the number of input features and a list of
    layer specifications. Every entry adds one ``GraphConv`` layer, optionally
    followed by an activation module. The per-node output of the last layer is
    averaged over the nodes of each graph to give one prediction per graph.

    Args:
        in_feats (int): the number of input features
        layers_data (list): list of ``(size, activation)`` tuples, where
            ``size`` is the output size of the layer and ``activation`` is a
            ``torch.nn.modules.Module`` or ``None``

    Attributes:
        in_feats (int): the number of input features
        layers (nn.ModuleList): graph convolutions interleaved with activations
        name (str): name of the model, derived from the layer sizes/activations
        num_classes (int): output size of the last layer; it should correspond
            to the number of classes we're predicting

    Methods:
        oneline_description(): Description of the model to uniquely identify it
            in logs.
        forward(g, in_feats): Computes the output of the model.
        fit(train_dataloader, learning_rate=0.01, num_epochs=20): Train the
            model.
        evaluate(test_dataloader): Evaluate model.
        fit_and_evaluate(train_dataloader, test_dataloader, callback=None,
            learning_rate=0.01, num_epochs=20, sch_explr_gamma=0.99,
            sch_multisteplr_milestones=None, sch_multisteplr_gamma=0.1):
            Fit the model while evaluating every epoch.
        infer(dataset, batch_size=1024): Predict a label for every graph of a
            dataset.
    """

    def __init__(self, in_feats: int, layers_data: list):
        """Build the stack of graph convolutions and activations.

        Args:
            in_feats: the number of input features
            layers_data: list of ``(size, activation)`` tuples; ``activation``
                may be ``None`` to add a convolution without activation

        Raises:
            ValueError: if ``layers_data`` is empty.
            AssertionError: if an activation is neither ``None`` nor a
                ``torch.nn.modules.Module``.
        """
        super().__init__()
        if not layers_data:
            # Without at least one layer there is no output size to record.
            raise ValueError(
                "layers_data must contain at least one (size, activation) tuple."
            )

        self.in_feats = in_feats
        self.layers = nn.ModuleList()
        self.name = ""
        for size, activation in layers_data:
            # Validate before touching self.layers so a bad entry does not
            # leave a half-registered layer behind.
            if activation is not None:
                assert isinstance(
                    activation, Module
                ), "Each tuples should contain a size (int) and a torch.nn.modules.Module."
            self.layers.append(GraphConv(in_feats, size))
            self.name = self.name + f"{in_feats}-{size}_"
            in_feats = size  # For the next layer
            if activation is not None:
                self.layers.append(activation)
                # Keep only the class name of the activation, e.g. "ReLU".
                self.name = self.name + repr(activation).split("(", 1)[0] + "_"
        # The last size should correspond to the number of classes we're
        # predicting.
        self.num_classes = size

    def oneline_description(self):
        """Description of the model to uniquely identify it in logs"""
        return "-".join(["in_", f"{self.name}", "mean-out"])

    def forward(self, g, in_feats):
        """Compute the output of the model.

        Args:
            g: graph (or batch of graphs) handed to every graph convolution
            in_feats: input node features

        Returns:
            The result of ``dgl.mean_nodes`` applied to the output of the last
            layer, which is stored on the graph as node data ``"h"``.
        """
        for layer in self.layers:
            # Only graph convolutions need the graph; every other registered
            # module is an activation acting on the features alone.
            if isinstance(layer, GraphConv):
                in_feats = layer(g, in_feats)
            else:
                in_feats = layer(in_feats)
        g.ndata["h"] = in_feats
        return dgl.mean_nodes(g, "h")

    def fit(self, train_dataloader, learning_rate=0.01, num_epochs=20):
        """Train the model.

        Args:
            train_dataloader: iterable yielding ``(batched_graph, labels)``
                pairs; node features are read from ``batched_graph.ndata["x"]``
            learning_rate (float, optional): Learning rate passed to the Adam
                optimizer. Defaults to 0.01.
            num_epochs (int, optional): Number of epochs of training.
                Defaults to 20.
        """
        self.train()
        optimizer = optim.Adam(self.parameters(), lr=learning_rate)
        for _ in range(num_epochs):
            for batched_graph, labels in train_dataloader:
                pred = self(batched_graph, batched_graph.ndata["x"].float())
                loss = nn.functional.cross_entropy(pred, labels)
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

    def evaluate(self, test_dataloader):
        """Evaluate model.

        Args:
            test_dataloader: iterable yielding ``(batched_graph, labels)``
                pairs; node features are read from ``batched_graph.ndata["x"]``

        Returns:
            accuracy: fraction of graphs whose predicted class equals the label

        Raises:
            AssertionError: if the model output does not have
                ``num_classes`` columns.
            ZeroDivisionError: if the data loader yields no samples.
        """
        self.eval()
        num_correct = 0
        num_tests = 0
        with torch.no_grad():  # no gradients are needed for evaluation
            for batched_graph, labels in test_dataloader:
                pred = self(batched_graph, batched_graph.ndata["x"].float())
                # One score per class is expected; compare the size of the
                # class axis, not the tensor rank.
                assert (
                    pred.shape[-1] == self.num_classes
                ), "Model output size does not match num_classes."
                num_correct += (pred.argmax(1) == labels).sum().item()
                num_tests += len(labels)

        accuracy = num_correct / num_tests
        return accuracy

    def _run_epoch(self, dataloader, optimizer=None):
        """Run one pass over ``dataloader`` and accumulate loss and counts.

        With an ``optimizer`` the model is put in training mode and updated
        after every batch; without one it is put in evaluation mode and
        gradients are disabled.

        Returns:
            dict with the summed batch losses (``"loss"``), the number of
            correct predictions (``"correct"``), the number of samples
            (``"total"``) and, filled only when ``num_classes == 2``, the
            counts ``"false_positive"`` and ``"false_negative"``.
        """
        training = optimizer is not None
        self.train(training)
        totals = {
            "loss": 0.0,
            "correct": 0,
            "total": 0,
            "false_positive": 0,
            "false_negative": 0,
        }
        with torch.set_grad_enabled(training):
            for batched_graph, labels in dataloader:
                pred = self(batched_graph, batched_graph.ndata["x"].float())
                loss = nn.functional.cross_entropy(pred, labels)
                predicted = pred.argmax(1)
                totals["loss"] += loss.item()
                totals["correct"] += (predicted == labels).sum().item()
                totals["total"] += len(labels)
                if self.num_classes == 2:
                    wrong = predicted != labels
                    totals["false_positive"] += (
                        (wrong & (predicted == 1)).sum().item()
                    )
                    totals["false_negative"] += (
                        (wrong & (predicted == 0)).sum().item()
                    )
                if training:
                    optimizer.zero_grad()
                    loss.backward()
                    optimizer.step()
        return totals

    # pylint: disable=too-many-arguments
    def fit_and_evaluate(
        self,
        train_dataloader,
        test_dataloader,
        callback=None,
        learning_rate=0.01,
        num_epochs=20,
        sch_explr_gamma=0.99,
        sch_multisteplr_milestones=None,
        sch_multisteplr_gamma=0.1,
    ):
        """Fit the model while evaluating every epoch.

        Args:
            train_dataloader: Data loader to train set.
            test_dataloader: Data loader to test set.
            callback (callable, optional): Callback function. If defined, it
                receives a dict that stores "Loss/train", "Accuracy/train",
                "Loss/test", "Accuracy/test", and "epoch" of a single epoch,
                plus the learning rate of every parameter group and the
                weights of every graph convolution. For two-class models the
                false positive/negative rates are included as well. The same
                dict object is reused and updated every epoch. To send a stop
                signal, return True. Defaults to None.
            learning_rate (float, optional): Learning rate. Defaults to 0.01.
            num_epochs (int, optional): Number of training epochs.
                Defaults to 20.
            sch_explr_gamma (float): The exponential decay rate of the
                learning rate.
            sch_multisteplr_milestones (list): epoch numbers where the
                learning rate is decreased by a factor of
                sch_multisteplr_gamma. If None, a single milestone at
                ``min(num_epochs, 100)`` is used.
            sch_multisteplr_gamma (float): If a stepped decay of the learning
                rate is taken, the multiplication factor.

        Raises:
            ZeroDivisionError: if a data loader yields no samples.
        """
        if sch_multisteplr_milestones is None:
            sch_multisteplr_milestones = [min(num_epochs, 100)]
        optimizer = optim.Adam(self.parameters(), lr=learning_rate)
        schedulers = [
            optim.lr_scheduler.ExponentialLR(optimizer, gamma=sch_explr_gamma),
            optim.lr_scheduler.MultiStepLR(
                optimizer,
                milestones=sch_multisteplr_milestones,
                gamma=sch_multisteplr_gamma,
            ),
        ]
        binary = self.num_classes == 2
        epoch_values = {}
        # The context manager closes the bar on normal exit, early stop and
        # exceptions alike.
        with tqdm(total=num_epochs) as progress_bar:
            for epoch in range(num_epochs):
                epoch_values["epoch"] = epoch

                train = self._run_epoch(train_dataloader, optimizer)
                epoch_values["Loss/train"] = train["loss"]
                epoch_values["Accuracy/train"] = train["correct"] / train["total"]
                if binary:
                    epoch_values["FalseNegativeRate/train"] = (
                        train["false_negative"] / train["total"]
                    )
                    epoch_values["FalsePositiveRate/train"] = (
                        train["false_positive"] / train["total"]
                    )

                test = self._run_epoch(test_dataloader)
                epoch_values["Loss/test"] = test["loss"]
                epoch_values["Accuracy/test"] = test["correct"] / test["total"]
                for i, pg in enumerate(optimizer.param_groups):
                    epoch_values[f"LearningRate/ParGrp{i}"] = pg["lr"]
                # to visualise distribution of tensors
                for i, layer in enumerate(self.layers):
                    if isinstance(layer, GraphConv):
                        epoch_values[f"Layer/conv{i}"] = layer.weight.detach()
                if binary:
                    epoch_values["FalseNegativeRate/test"] = (
                        test["false_negative"] / test["total"]
                    )
                    epoch_values["FalsePositiveRate/test"] = (
                        test["false_positive"] / test["total"]
                    )

                progress_bar.set_postfix({"Epoch": epoch})
                progress_bar.update(1)

                for scheduler in schedulers:
                    scheduler.step()

                if callback is not None:
                    user_request_stop = callback(epoch_values)
                    if user_request_stop is True:  # Check for explicit True
                        break

    def infer(self, dataset, batch_size=1024):
        """Using the model do inference on a dataset.

        Args:
            dataset: dataset of graphs handed to ``GraphDataLoader``; inference
                is done for each graph, in dataset order
            batch_size (int, optional): number of graphs per batch.
                Defaults to 1024.

        Returns:
            labels: A numpy array with inferred labels for each graph. The
            array is float64 because accumulation starts from an empty float
            array.
        """
        self.eval()
        dataloader = GraphDataLoader(
            shuffle=False, dataset=dataset, batch_size=batch_size, drop_last=False
        )
        # Seeding with an empty float array keeps the historical result dtype
        # and makes the no-batch case return an empty array.
        chunks = [np.array([])]
        with torch.no_grad():  # no gradients are needed for inference
            for batched_graph, _ in dataloader:
                pred = self(batched_graph, batched_graph.ndata["x"].float())
                chunks.append(pred.argmax(1).numpy())
        # Concatenate once instead of re-copying the growing array per batch.
        return np.concatenate(chunks)