# Copyright (c) 2018-2022 Samet Akcay, Durham University, UK
# SPDX-License-Identifier: MIT
#
# Copyright (C) 2020-2022 Intel Corporation
# SPDX-License-Identifier: Apache-2.0
#
"""Torch models defining encoder, decoder, Generator and Discriminator.

Code adapted from https://github.com/samet-akcay/ganomaly.
"""
import math
from typing import Tuple

import torch
from torch import Tensor, nn

from anomalib.data.utils.image import pad_nextpow2
class Encoder(nn.Module):
    """Encoder Network.

    Maps an image to a latent representation via an input conv, optional
    extra conv layers, a strided-conv pyramid, and an optional final conv
    that projects down to the latent vector size.

    Args:
        input_size (Tuple[int, int]): Size of input image
        latent_vec_size (int): Size of latent vector z
        num_input_channels (int): Number of input channels in the image
        n_features (int): Number of features per convolution layer
        extra_layers (int): Number of extra layers since the network uses only a single encoder layer by default.
            Defaults to 0.
        add_final_conv_layer (bool): If True, add a final conv that maps the pyramid
            output to ``latent_vec_size`` channels. Defaults to True.
    """

    def __init__(
        self,
        input_size: Tuple[int, int],
        latent_vec_size: int,
        num_input_channels: int,
        n_features: int,
        extra_layers: int = 0,
        add_final_conv_layer: bool = True,
    ):
        super().__init__()

        self.input_layers = nn.Sequential()
        # NOTE(review): padding=4 with kernel 4 / stride 2 enlarges the
        # spatial size rather than halving it; kept as-is to stay
        # weight-compatible with upstream — confirm intent before changing.
        self.input_layers.add_module(
            f"initial-conv-{num_input_channels}-{n_features}",
            nn.Conv2d(num_input_channels, n_features, kernel_size=4, stride=2, padding=4, bias=False),
        )
        self.input_layers.add_module(f"initial-relu-{n_features}", nn.LeakyReLU(0.2, inplace=True))

        # Extra Layers: channel-preserving 3x3 conv blocks.
        self.extra_layers = nn.Sequential()
        for layer in range(extra_layers):
            self.extra_layers.add_module(
                f"extra-layers-{layer}-{n_features}-conv",
                nn.Conv2d(n_features, n_features, kernel_size=3, stride=1, padding=1, bias=False),
            )
            self.extra_layers.add_module(f"extra-layers-{layer}-{n_features}-batchnorm", nn.BatchNorm2d(n_features))
            self.extra_layers.add_module(f"extra-layers-{layer}-{n_features}-relu", nn.LeakyReLU(0.2, inplace=True))

        # Create pyramid features to reach latent vector: halve spatial size
        # and double channels until the pyramid dimension reaches 4.
        self.pyramid_features = nn.Sequential()
        pyramid_dim = min(*input_size) // 2  # Use the smaller dimension to create pyramid.
        while pyramid_dim > 4:
            in_features = n_features
            out_features = n_features * 2
            self.pyramid_features.add_module(
                f"pyramid-{in_features}-{out_features}-conv",
                nn.Conv2d(in_features, out_features, kernel_size=4, stride=2, padding=1, bias=False),
            )
            self.pyramid_features.add_module(f"pyramid-{out_features}-batchnorm", nn.BatchNorm2d(out_features))
            self.pyramid_features.add_module(f"pyramid-{out_features}-relu", nn.LeakyReLU(0.2, inplace=True))
            n_features = out_features
            pyramid_dim = pyramid_dim // 2

        # Final conv. Always define the attribute: the original code left it
        # unset when ``add_final_conv_layer`` was False, so ``forward`` raised
        # AttributeError instead of skipping the layer.
        self.final_conv_layer = None
        if add_final_conv_layer:
            self.final_conv_layer = nn.Conv2d(
                n_features,
                latent_vec_size,
                kernel_size=4,
                stride=1,
                padding=0,
                bias=False,
            )

    def forward(self, input_tensor: Tensor):
        """Return latent vectors."""
        output = self.input_layers(input_tensor)
        output = self.extra_layers(output)
        output = self.pyramid_features(output)
        if self.final_conv_layer is not None:
            output = self.final_conv_layer(output)
        return output
class Decoder(nn.Module):
    """Decoder Network.

    Inverse of the encoder: expands a latent vector back to an image through
    a transposed-conv input block, an inverse pyramid, optional extra layers
    and a final Tanh-activated transposed conv.

    Args:
        input_size (Tuple[int, int]): Size of input image
        latent_vec_size (int): Size of latent vector z
        num_input_channels (int): Number of input channels in the image
        n_features (int): Number of features per convolution layer
        extra_layers (int): Number of extra layers since the network uses only a single encoder layer by default.
            Defaults to 0.
    """

    def __init__(
        self,
        input_size: Tuple[int, int],
        latent_vec_size: int,
        num_input_channels: int,
        n_features: int,
        extra_layers: int = 0,
    ):
        super().__init__()

        # Calculate input channel size to recreate inverse pyramid: the top
        # of the pyramid mirrors the encoder's deepest feature width.
        exp_factor = math.ceil(math.log(min(input_size) // 2, 2)) - 2
        feats = n_features * (2**exp_factor)

        # CNN layer for latent vector input.
        self.latent_input = nn.Sequential()
        self.latent_input.add_module(
            f"initial-{latent_vec_size}-{feats}-convt",
            nn.ConvTranspose2d(
                latent_vec_size,
                feats,
                kernel_size=4,
                stride=1,
                padding=0,
                bias=False,
            ),
        )
        self.latent_input.add_module(f"initial-{feats}-batchnorm", nn.BatchNorm2d(feats))
        self.latent_input.add_module(f"initial-{feats}-relu", nn.ReLU(True))

        # Inverse pyramid: double spatial size and halve channels until the
        # tracked dimension (smaller input side) shrinks to 4.
        self.inverse_pyramid = nn.Sequential()
        spatial = min(*input_size) // 2
        while spatial > 4:
            half = feats // 2
            self.inverse_pyramid.add_module(
                f"pyramid-{feats}-{half}-convt",
                nn.ConvTranspose2d(
                    feats,
                    half,
                    kernel_size=4,
                    stride=2,
                    padding=1,
                    bias=False,
                ),
            )
            self.inverse_pyramid.add_module(f"pyramid-{half}-batchnorm", nn.BatchNorm2d(half))
            self.inverse_pyramid.add_module(f"pyramid-{half}-relu", nn.ReLU(True))
            feats = half
            spatial //= 2

        # Extra channel-preserving layers.
        self.extra_layers = nn.Sequential()
        for idx in range(extra_layers):
            self.extra_layers.add_module(
                f"extra-layers-{idx}-{feats}-conv",
                nn.Conv2d(feats, feats, kernel_size=3, stride=1, padding=1, bias=False),
            )
            self.extra_layers.add_module(
                f"extra-layers-{idx}-{feats}-batchnorm", nn.BatchNorm2d(feats)
            )
            self.extra_layers.add_module(
                f"extra-layers-{idx}-{feats}-relu", nn.LeakyReLU(0.2, inplace=True)
            )

        # Final upsampling to the image channel count, squashed into [-1, 1].
        self.final_layers = nn.Sequential()
        self.final_layers.add_module(
            f"final-{feats}-{num_input_channels}-convt",
            nn.ConvTranspose2d(
                feats,
                num_input_channels,
                kernel_size=4,
                stride=2,
                padding=1,
                bias=False,
            ),
        )
        self.final_layers.add_module(f"final-{num_input_channels}-tanh", nn.Tanh())

    def forward(self, input_tensor):
        """Return generated image."""
        expanded = self.latent_input(input_tensor)
        upsampled = self.inverse_pyramid(expanded)
        refined = self.extra_layers(upsampled)
        return self.final_layers(refined)
class Discriminator(nn.Module):
    """Discriminator.

    Made of only one encoder layer which takes x and x_hat to produce a score.

    Args:
        input_size (Tuple[int,int]): Input image size.
        num_input_channels (int): Number of image channels.
        n_features (int): Number of feature maps in each convolution layer.
        extra_layers (int, optional): Add extra intermediate layers. Defaults to 0.
    """

    def __init__(self, input_size: Tuple[int, int], num_input_channels: int, n_features: int, extra_layers: int = 0):
        super().__init__()
        base_encoder = Encoder(input_size, 1, num_input_channels, n_features, extra_layers)

        # Flatten the encoder into a single list of layers so its last conv
        # can be split off as the classification head.
        flat_layers = []
        for child in base_encoder.children():
            if isinstance(child, nn.Sequential):
                flat_layers.extend(child.children())
            else:
                flat_layers.append(child)

        self.features = nn.Sequential(*flat_layers[:-1])
        self.classifier = nn.Sequential(flat_layers[-1])
        self.classifier.add_module("Sigmoid", nn.Sigmoid())

    def forward(self, input_tensor):
        """Return class of object and features."""
        feature_maps = self.features(input_tensor)
        score = self.classifier(feature_maps)
        score = score.view(-1, 1).squeeze(1)
        return score, feature_maps
class Generator(nn.Module):
    """Generator model.

    Made of an encoder-decoder-encoder architecture.

    Args:
        input_size (Tuple[int,int]): Size of input data.
        latent_vec_size (int): Dimension of latent vector produced between the first encoder-decoder.
        num_input_channels (int): Number of channels in input image.
        n_features (int): Number of feature maps in each convolution layer.
        extra_layers (int, optional): Extra intermediate layers in the encoder/decoder. Defaults to 0.
        add_final_conv_layer (bool, optional): Add a final convolution layer in the decoder. Defaults to True.
    """

    def __init__(
        self,
        input_size: Tuple[int, int],
        latent_vec_size: int,
        num_input_channels: int,
        n_features: int,
        extra_layers: int = 0,
        add_final_conv_layer: bool = True,
    ):
        super().__init__()
        # Both encoders share the same configuration; the second one re-encodes
        # the reconstructed image.
        self.encoder1 = Encoder(
            input_size,
            latent_vec_size,
            num_input_channels,
            n_features,
            extra_layers,
            add_final_conv_layer,
        )
        self.decoder = Decoder(input_size, latent_vec_size, num_input_channels, n_features, extra_layers)
        self.encoder2 = Encoder(
            input_size,
            latent_vec_size,
            num_input_channels,
            n_features,
            extra_layers,
            add_final_conv_layer,
        )

    def forward(self, input_tensor):
        """Return generated image and the latent vectors."""
        first_latent = self.encoder1(input_tensor)
        reconstruction = self.decoder(first_latent)
        second_latent = self.encoder2(reconstruction)
        return reconstruction, first_latent, second_latent
class GanomalyModel(nn.Module):
    """Ganomaly Model.

    Args:
        input_size (Tuple[int,int]): Input dimension.
        num_input_channels (int): Number of input channels.
        n_features (int): Number of features layers in the CNNs.
        latent_vec_size (int): Size of autoencoder latent vector.
        extra_layers (int, optional): Number of extra layers for encoder/decoder. Defaults to 0.
        add_final_conv_layer (bool, optional): Add convolution layer at the end. Defaults to True.
        wadv (int, optional): Weight for adversarial loss. Defaults to 1.
        wcon (int, optional): Image regeneration weight. Defaults to 50.
        wenc (int, optional): Latent vector encoder weight. Defaults to 1.
    """

    def __init__(
        self,
        input_size: Tuple[int, int],
        num_input_channels: int,
        n_features: int,
        latent_vec_size: int,
        extra_layers: int = 0,
        add_final_conv_layer: bool = True,
        wadv: int = 1,
        wcon: int = 50,
        wenc: int = 1,
    ) -> None:
        super().__init__()
        self.generator: Generator = Generator(
            input_size=input_size,
            latent_vec_size=latent_vec_size,
            num_input_channels=num_input_channels,
            n_features=n_features,
            extra_layers=extra_layers,
            add_final_conv_layer=add_final_conv_layer,
        )
        self.discriminator: Discriminator = Discriminator(
            input_size=input_size,
            num_input_channels=num_input_channels,
            n_features=n_features,
            extra_layers=extra_layers,
        )
        # Apply the DCGAN init recursively: calling ``weights_init`` on the
        # container module alone was a no-op (its class name matches neither
        # "Conv" nor "BatchNorm"), and without ``@staticmethod`` the original
        # ``self.weights_init(...)`` call raised a TypeError.
        self.generator.apply(self.weights_init)
        self.discriminator.apply(self.weights_init)
        self.loss_enc = nn.SmoothL1Loss()
        self.loss_adv = nn.MSELoss()
        self.loss_con = nn.L1Loss()
        self.loss_bce = nn.BCELoss()
        self.wadv = wadv
        self.wcon = wcon
        self.wenc = wenc

    @staticmethod
    def weights_init(module: nn.Module) -> None:
        """Initialize DCGAN weights.

        Conv layers are drawn from N(0, 0.02); BatchNorm weights from
        N(1, 0.02) with biases zeroed. Intended for use with
        ``nn.Module.apply`` so it visits every submodule.

        Args:
            module (nn.Module): Module to initialize.
        """
        classname = module.__class__.__name__
        if classname.find("Conv") != -1:
            nn.init.normal_(module.weight.data, 0.0, 0.02)
        elif classname.find("BatchNorm") != -1:
            nn.init.normal_(module.weight.data, 1.0, 0.02)
            nn.init.constant_(module.bias.data, 0)

    def get_discriminator_loss(self, images: Tensor) -> Tensor:
        """Calculates loss for discriminator.

        Real images are pushed toward label 1, generated images toward 0.

        Args:
            images (Tensor): Input images.

        Returns:
            Tensor: Discriminator loss.
        """
        fake, _, _ = self.generator(images)
        pred_real, _ = self.discriminator(images)
        # Detach so this loss does not backpropagate into the generator.
        pred_fake, _ = self.discriminator(fake.detach())
        error_discriminator_real = self.loss_bce(
            pred_real, torch.ones(size=pred_real.shape, dtype=torch.float32, device=pred_real.device)
        )
        error_discriminator_fake = self.loss_bce(
            pred_fake, torch.zeros(size=pred_fake.shape, dtype=torch.float32, device=pred_fake.device)
        )
        loss_discriminator = (error_discriminator_fake + error_discriminator_real) * 0.5
        return loss_discriminator

    def get_generator_loss(self, images: Tensor) -> Tensor:
        """Calculates loss for generator.

        Weighted sum of adversarial, reconstruction and latent-encoding terms.

        Args:
            images (Tensor): Input images.

        Returns:
            Tensor: Generator loss.
        """
        fake, latent_i, latent_o = self.generator(images)
        pred_real, _ = self.discriminator(images)
        pred_fake, _ = self.discriminator(fake)
        error_enc = self.loss_enc(latent_i, latent_o)
        error_con = self.loss_con(images, fake)
        error_adv = self.loss_adv(pred_real, pred_fake)
        loss_generator = error_adv * self.wadv + error_con * self.wcon + error_enc * self.wenc
        return loss_generator

    def forward(self, batch: Tensor) -> Tensor:
        """Get scores for batch.

        Args:
            batch (Tensor): Images

        Returns:
            Tensor: Regeneration scores.
        """
        padded_batch = pad_nextpow2(batch)
        self.generator.eval()
        _, latent_i, latent_o = self.generator(padded_batch)
        # Anomaly score: mean squared distance between the two latent codes.
        return torch.mean(torch.pow((latent_i - latent_o), 2), dim=1).view(-1)  # convert nx1x1 to n