#!/usr/bin/env python3
"""
Utility Script containing functions to be used for training
Author: Shilpaj Bhalerao
"""
# Standard Library Imports
import math

# Third-Party Imports
import numpy as np
import matplotlib.pyplot as plt
import torch
from torchsummary import summary
from torchvision import transforms
from pytorch_grad_cam import GradCAM
from pytorch_grad_cam.utils.image import show_cam_on_image
import torch.optim as optim
import torch.nn.functional as F
from torch_lr_finder import LRFinder


def get_summary(model, input_size: tuple) -> None:
    """
    Function to print the summary of the model architecture
    :param model: Object of model architecture class
    :param input_size: Input data shape (Channels, Height, Width)
    """
    use_cuda = torch.cuda.is_available()
    device = torch.device("cuda" if use_cuda else "cpu")
    network = model.to(device)
    summary(network, input_size=input_size)


def get_misclassified_data(model, device, test_loader):
    """
    Function to run the model on the test set and return misclassified images
    :param model: Network Architecture
    :param device: CPU/GPU
    :param test_loader: DataLoader for test set
    """
    # Prepare the model for evaluation, i.e. disable dropout and batch-norm updates
    model.eval()
    model.to(device)

    # List to store misclassified images
    misclassified_data = []

    # Disable gradient tracking for inference
    with torch.no_grad():
        # Extract images, labels in a batch
        for data, target in test_loader:

            # Migrate the data to the device
            data, target = data.to(device), target.to(device)

            # Extract single image, label from the batch
            for image, label in zip(data, target):

                # Add batch dimension to the image
                image = image.unsqueeze(0)

                # Get the model prediction on the image
                output = model(image)

                # Convert the output from one-hot encoding to a class index
                pred = output.argmax(dim=1, keepdim=True)

                # If prediction is incorrect, append the data
                if pred != label:
                    misclassified_data.append((image, label, pred))
    return misclassified_data


# -------------------- GradCam --------------------
def display_gradcam_output(data: list,
                           classes: list[str],
                           inv_normalize: transforms.Normalize,
                           model,
                           target_layers,
                           targets=None,
                           number_of_samples: int = 10,
                           transparency: float = 0.60):
    """
    Function to visualize GradCam output on the data
    :param data: List[Tuple(image, label, prediction)]
    :param classes: Name of classes in the dataset
    :param inv_normalize: Inverse-normalization transform built from the dataset mean and standard deviation
    :param model: Model architecture
    :param target_layers: Layers on which GradCam should be executed
    :param targets: Classes to be focused on for GradCam
    :param number_of_samples: Number of images to plot
    :param transparency: Weight of the original image when mixed with the activations
    """
    # Plot configuration
    fig = plt.figure(figsize=(10, 10))
    x_count = 5
    y_count = 1 if number_of_samples <= 5 else math.ceil(number_of_samples / x_count)

    # Create an object for GradCam
    cam = GradCAM(model=model, target_layers=target_layers, use_cuda=True)

    # Iterate over the specified number of images
    for i in range(number_of_samples):
        plt.subplot(y_count, x_count, i + 1)
        input_tensor = data[i][0]

        # Get the activations of the layer for the image
        grayscale_cam = cam(input_tensor=input_tensor, targets=targets)
        grayscale_cam = grayscale_cam[0, :]

        # Get back the original image and clip to [0, 1] as required by show_cam_on_image
        img = input_tensor.squeeze(0).to('cpu')
        img = inv_normalize(img)
        rgb_img = np.clip(img.permute(1, 2, 0).numpy(), 0, 1)

        # Mix the activations on the original image
        visualization = show_cam_on_image(rgb_img, grayscale_cam, use_rgb=True, image_weight=transparency)

        # Display the images on the plot
        plt.imshow(visualization)
        plt.title("Correct: " + classes[data[i][1].item()] + '\n' + 'Output: ' + classes[data[i][2].item()])
        plt.xticks([])
        plt.yticks([])
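# A minimal usage sketch for the two helpers above (assumptions: a trained
# `model`, a `test_loader`, a `classes` list of label names, CIFAR-10
# normalization statistics, and that the model's last convolutional block is
# reachable as `model.layer3` -- adjust these names to your own setup):
#
#     device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
#     misclassified_data = get_misclassified_data(model, device, test_loader)
#     inv_normalize = transforms.Normalize(
#         mean=[-0.4914 / 0.2470, -0.4822 / 0.2435, -0.4465 / 0.2616],
#         std=[1 / 0.2470, 1 / 0.2435, 1 / 0.2616]
#     )
#     display_gradcam_output(misclassified_data, classes, inv_normalize, model,
#                            target_layers=[model.layer3], number_of_samples=10)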
def get_optimizer(model, lr, momentum=0, weight_decay=0, optimizer_type='SGD'):
    """Method to get an optimizer object. Used to update weights.

    Args:
        model (Object): Neural network model
        lr (float): Value of learning rate
        momentum (float): Value of momentum (used by SGD)
        weight_decay (float): Value of weight decay
        optimizer_type (str): Type of optimizer, 'SGD' or 'ADAM'

    Returns:
        object: Object of optimizer class to update weights
    """
    if optimizer_type == 'SGD':
        optimizer = optim.SGD(model.parameters(), lr=lr, momentum=momentum, weight_decay=weight_decay)
    elif optimizer_type == 'ADAM':
        optimizer = optim.Adam(model.parameters(), lr=lr, weight_decay=weight_decay)
    else:
        raise ValueError(f"Unsupported optimizer type: {optimizer_type}")
    return optimizer


def get_StepLR_scheduler(optimizer, step_size, gamma):
    """Method to get an object of the StepLR scheduler class. Used to update the learning rate.

    Args:
        optimizer (Object): Object of optimizer
        step_size (int): Period of learning rate decay, in epochs
        gamma (float): Multiplicative factor of learning rate decay

    Returns:
        object: Object of StepLR class to update learning rate
    """
    scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=step_size, gamma=gamma, verbose=True)
    return scheduler


def get_ReduceLROnPlateau_scheduler(optimizer, factor, patience):
    """Method to get an object of the ReduceLROnPlateau scheduler class. Used to update the learning rate.

    Args:
        optimizer (Object): Object of optimizer
        factor (float): Multiplicative factor of learning rate decay
        patience (int): Number of epochs with no improvement to wait before reducing the learning rate

    Returns:
        object: Object of ReduceLROnPlateau class to update learning rate
    """
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, factor=factor, patience=patience, verbose=True)
    return scheduler


def get_OneCycleLR_scheduler(optimizer, max_lr, epochs, steps_per_epoch, max_at_epoch, anneal_strategy,
                             div_factor, final_div_factor):
    """Method to get an object of the OneCycleLR scheduler class. Used to update the learning rate.

    Args:
        optimizer (Object): Object of optimizer
        max_lr (float): Maximum learning rate to reach during training
        epochs (int): Total number of epochs
        steps_per_epoch (int): Total steps in an epoch
        max_at_epoch (int): Epoch at which to reach the maximum learning rate
        anneal_strategy (str): Strategy to interpolate between minimum and maximum lr ('cos' or 'linear')
        div_factor (int): Divisive factor to calculate the initial learning rate
        final_div_factor (int): Divisive factor to calculate the minimum learning rate

    Returns:
        object: Object of OneCycleLR class to update learning rate
    """
    scheduler = optim.lr_scheduler.OneCycleLR(optimizer,
                                              max_lr=max_lr,
                                              epochs=epochs,
                                              steps_per_epoch=steps_per_epoch,
                                              pct_start=max_at_epoch / epochs,
                                              anneal_strategy=anneal_strategy,
                                              div_factor=div_factor,
                                              final_div_factor=final_div_factor)
    return scheduler


def get_criterion(loss_type='cross_entropy'):
    """Method to get loss calculation criterion

    Args:
        loss_type (str): Type of loss, 'nll_loss' or 'cross_entropy'

    Returns:
        object: Function used to calculate the loss
    """
    if loss_type == 'nll_loss':
        criterion = F.nll_loss
    elif loss_type == 'cross_entropy':
        criterion = F.cross_entropy
    else:
        raise ValueError(f"Unsupported loss type: {loss_type}")
    return criterion
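# A minimal usage sketch for the helpers above (the hyperparameter values are
# illustrative placeholders, not tuned; `model` and `train_loader` are assumed):
#
#     optimizer = get_optimizer(model, lr=0.01, momentum=0.9, optimizer_type='SGD')
#     criterion = get_criterion('cross_entropy')
#     scheduler = get_OneCycleLR_scheduler(optimizer, max_lr=0.1, epochs=24,
#                                          steps_per_epoch=len(train_loader),
#                                          max_at_epoch=5, anneal_strategy='linear',
#                                          div_factor=100, final_div_factor=100)
#
# Note: OneCycleLR expects scheduler.step() after every batch, while StepLR
# steps once per epoch and ReduceLROnPlateau needs a metric, e.g.
# scheduler.step(val_loss).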
def get_learning_rate(model, optimizer, criterion, trainloader):
    """Method to find a learning rate using the LR finder

    Args:
        model (Object): Object of model
        optimizer (Object): Object of optimizer class
        criterion (Object): Loss function
        trainloader (Object): Object of dataloader class

    Returns:
        float: Learning rate suggested by the LR finder
    """
    # Create object and perform the range test
    lr_finder = LRFinder(model, optimizer, criterion)
    lr_finder.range_test(trainloader, end_lr=100, num_iter=100)

    # Plot the result and store the suggested lr
    _, suggested_lr = lr_finder.plot(suggest_lr=True)

    # Reset the model and optimizer to their initial state
    lr_finder.reset()
    return suggested_lr
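# A minimal usage sketch (assumptions: `model` and `train_loader` exist, and
# the optimizer is created with a small starting lr since range_test sweeps
# up to end_lr=100; lr_finder.reset() restores the cached model/optimizer
# state afterwards):
#
#     optimizer = get_optimizer(model, lr=1e-7, momentum=0.9, optimizer_type='SGD')
#     criterion = get_criterion('cross_entropy')
#     suggested_lr = get_learning_rate(model, optimizer, criterion, train_loader)
#     print(f"Suggested learning rate: {suggested_lr}")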