julien.blanchon
add app
c8c12e9
"""PyTorch model for DFM model implementation."""
# Copyright (C) 2020 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions
# and limitations under the License.
import math
import torch
import torch.nn.functional as F
import torchvision
from torch import Tensor, nn
from anomalib.models.components import PCA, DynamicBufferModule, FeatureExtractor
class SingleClassGaussian(DynamicBufferModule):
    """Gaussian density estimate fitted to a collection of feature vectors.

    The fitted state (mean vector, left singular vectors and singular values
    of the centered data) is kept in registered buffers.
    """

    def __init__(self):
        super().__init__()

        # Empty placeholders; ``fit`` replaces them with the fitted statistics.
        for buffer_name in ("mean_vec", "u_mat", "sigma_mat"):
            self.register_buffer(buffer_name, Tensor())

        self.mean_vec: Tensor
        self.u_mat: Tensor
        self.sigma_mat: Tensor

    def fit(self, dataset: Tensor) -> None:
        """Estimate the Gaussian parameters from ``dataset``.

        The covariance matrix ``C = X.X^T`` is never formed explicitly.
        The centered, scaled data is decomposed as ``X = U.S.V^T`` instead,
        which gives ``C = U.S^2.U^T``. Only ``U`` and ``S`` are stored, so the
        log-likelihood can be evaluated without a full matrix inversion.

        Args:
            dataset (Tensor): Data to fit. Samples are laid out along dim 1
                (the mean is taken over dim 1 and ``shape[1]`` is used as the
                sample count).
        """
        sample_count = dataset.shape[1]
        mean = dataset.mean(dim=1)
        self.mean_vec = mean

        # Center each row and scale by 1/sqrt(N) before the decomposition.
        scaled = (dataset - mean.reshape(-1, 1)) / math.sqrt(sample_count)
        left_vectors, singular_values, _ = torch.linalg.svd(scaled, full_matrices=False)
        self.u_mat = left_vectors
        self.sigma_mat = singular_values

    def score_samples(self, features: Tensor) -> Tensor:
        """Compute the NLL (negative log likelihood) scores.

        Args:
            features (Tensor): Semantic features on which density modeling is
                performed; one sample per row (the score is reduced over dim 1).

        Returns:
            Tensor: One score per sample.
        """
        # Project the centered features onto U and divide by the singular values.
        whitened = (features - self.mean_vec) @ (self.u_mat / self.sigma_mat)
        squared_norm = (whitened * whitened).sum(dim=1)
        log_term = 2 * self.sigma_mat.log().sum()
        return squared_norm + log_term

    def forward(self, dataset: Tensor) -> None:
        """Fit the model to ``dataset``; equivalent to calling :meth:`fit`.

        Args:
            dataset (Tensor): Input dataset.
        """
        self.fit(dataset)
class DFMModel(nn.Module):
    """Model for the DFM algorithm.

    Args:
        backbone (str): Name of the model constructor looked up in ``torchvision.models``
            and used as the pre-trained backbone.
        layer (str): Layer from which to extract features.
        pooling_kernel_size (int): Kernel size of the average pooling applied to the
            extracted feature maps. Pooling is skipped unless the value is greater than 1.
        n_comps (float, optional): Ratio from which number of components for PCA are calculated. Defaults to 0.97.
        score_type (str, optional): Scoring type. Options are `fre` and `nll`. Defaults to "fre".
    """

    def __init__(
        self,
        backbone: str,
        layer: str,
        pooling_kernel_size: int,
        n_comps: float = 0.97,
        score_type: str = "fre",
    ):
        super().__init__()
        self.backbone = getattr(torchvision.models, backbone)
        self.pooling_kernel_size = pooling_kernel_size
        self.n_components = n_comps
        self.pca_model = PCA(n_components=self.n_components)
        self.gaussian_model = SingleClassGaussian()
        self.score_type = score_type
        self.feature_extractor = FeatureExtractor(backbone=self.backbone(pretrained=True), layers=[layer]).eval()

    def fit(self, dataset: Tensor) -> None:
        """Fit a PCA transformation and a Gaussian model to dataset.

        Args:
            dataset (Tensor): Input dataset to fit the model.
        """
        self.pca_model.fit(dataset)
        features_reduced = self.pca_model.transform(dataset)
        # The Gaussian model is fitted on the transpose of the reduced features.
        self.gaussian_model.fit(features_reduced.T)

    def score(self, features: Tensor) -> Tensor:
        """Compute scores.

        Scores are either PCA-based feature reconstruction error (FRE) scores or
        the Gaussian density-based NLL scores.

        Args:
            features (Tensor): Semantic features on which PCA and density modeling is performed.

        Returns:
            Tensor: Scores, one per row of ``features``.

        Raises:
            ValueError: If ``self.score_type`` is neither ``"nll"`` nor ``"fre"``.
        """
        feats_projected = self.pca_model.transform(features)
        if self.score_type == "nll":
            return self.gaussian_model.score_samples(feats_projected)
        if self.score_type == "fre":
            feats_reconstructed = self.pca_model.inverse_transform(feats_projected)
            return torch.sum(torch.square(features - feats_reconstructed), dim=1)
        raise ValueError(f"unsupported score type: {self.score_type}")

    def get_features(self, batch: Tensor) -> Tensor:
        """Extract features from the pretrained network.

        Each extracted feature map is optionally average-pooled and then
        flattened to one row per batch element.

        Args:
            batch (Tensor): Image batch.

        Returns:
            Tensor: Tensor containing extracted features. It carries no gradient.
        """
        self.feature_extractor.eval()
        # The result never carries gradients, so do not build an autograd graph
        # for the backbone pass only to discard it afterwards.
        with torch.no_grad():
            extracted = self.feature_extractor(batch)
            flattened = []
            for feature_map in extracted.values():
                batch_size = len(feature_map)
                if self.pooling_kernel_size > 1:
                    feature_map = F.avg_pool2d(input=feature_map, kernel_size=self.pooling_kernel_size)
                # ``reshape`` also handles non-contiguous feature maps, where ``view`` raises.
                flattened.append(feature_map.reshape(batch_size, -1))
            return torch.cat(flattened)

    def forward(self, batch: Tensor) -> Tensor:
        """Compute scores from input images.

        Args:
            batch (Tensor): Input images.

        Returns:
            Tensor: Scores.
        """
        feature_vector = self.get_features(batch)
        return self.score(feature_vector.view(feature_vector.shape[:2]))