"""PyTorch model for DFM model implementation.""" # Copyright (C) 2020 Intel Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions # and limitations under the License. import math import torch import torch.nn.functional as F import torchvision from torch import Tensor, nn from anomalib.models.components import PCA, DynamicBufferModule, FeatureExtractor class SingleClassGaussian(DynamicBufferModule): """Model Gaussian distribution over a set of points.""" def __init__(self): super().__init__() self.register_buffer("mean_vec", Tensor()) self.register_buffer("u_mat", Tensor()) self.register_buffer("sigma_mat", Tensor()) self.mean_vec: Tensor self.u_mat: Tensor self.sigma_mat: Tensor def fit(self, dataset: Tensor) -> None: """Fit a Gaussian model to dataset X. Covariance matrix is not calculated directly using: ``C = X.X^T`` Instead, it is represented in terms of the Singular Value Decomposition of X: ``X = U.S.V^T`` Hence, ``C = U.S^2.U^T`` This simplifies the calculation of the log-likelihood without requiring full matrix inversion. Args: dataset (Tensor): Input dataset to fit the model. """ num_samples = dataset.shape[1] self.mean_vec = torch.mean(dataset, dim=1) data_centered = (dataset - self.mean_vec.reshape(-1, 1)) / math.sqrt(num_samples) self.u_mat, self.sigma_mat, _ = torch.linalg.svd(data_centered, full_matrices=False) def score_samples(self, features: Tensor) -> Tensor: """Compute the NLL (negative log likelihood) scores. Args: features (Tensor): semantic features on which density modeling is performed. Returns: nll (Tensor): Torch tensor of scores """ features_transformed = torch.matmul(features - self.mean_vec, self.u_mat / self.sigma_mat) nll = torch.sum(features_transformed * features_transformed, dim=1) + 2 * torch.sum(torch.log(self.sigma_mat)) return nll def forward(self, dataset: Tensor) -> None: """Provides the same functionality as `fit`. Transforms the input dataset based on singular values calculated earlier. Args: dataset (Tensor): Input dataset """ self.fit(dataset) class DFMModel(nn.Module): """Model for the DFM algorithm. Args: backbone (str): Pre-trained model backbone. layer (str): Layer from which to extract features. pool (int): _description_ n_comps (float, optional): Ratio from which number of components for PCA are calculated. Defaults to 0.97. score_type (str, optional): Scoring type. Options are `fre` and `nll`. Defaults to "fre". """ def __init__( self, backbone: str, layer: str, pooling_kernel_size: int, n_comps: float = 0.97, score_type: str = "fre" ): super().__init__() self.backbone = getattr(torchvision.models, backbone) self.pooling_kernel_size = pooling_kernel_size self.n_components = n_comps self.pca_model = PCA(n_components=self.n_components) self.gaussian_model = SingleClassGaussian() self.score_type = score_type self.feature_extractor = FeatureExtractor(backbone=self.backbone(pretrained=True), layers=[layer]).eval() def fit(self, dataset: Tensor) -> None: """Fit a pca transformation and a Gaussian model to dataset. Args: dataset (Tensor): Input dataset to fit the model. """ self.pca_model.fit(dataset) features_reduced = self.pca_model.transform(dataset) self.gaussian_model.fit(features_reduced.T) def score(self, features: Tensor) -> Tensor: """Compute scores. Scores are either PCA-based feature reconstruction error (FRE) scores or the Gaussian density-based NLL scores Args: features (torch.Tensor): semantic features on which PCA and density modeling is performed. Returns: score (Tensor): numpy array of scores """ feats_projected = self.pca_model.transform(features) if self.score_type == "nll": score = self.gaussian_model.score_samples(feats_projected) elif self.score_type == "fre": feats_reconstructed = self.pca_model.inverse_transform(feats_projected) score = torch.sum(torch.square(features - feats_reconstructed), dim=1) else: raise ValueError(f"unsupported score type: {self.score_type}") return score def get_features(self, batch: Tensor) -> Tensor: """Extract features from the pretrained network. Args: batch (Tensor): Image batch. Returns: Tensor: Tensor containing extracted features. """ self.feature_extractor.eval() features = self.feature_extractor(batch) for layer in features: batch_size = len(features[layer]) if self.pooling_kernel_size > 1: features[layer] = F.avg_pool2d(input=features[layer], kernel_size=self.pooling_kernel_size) features[layer] = features[layer].view(batch_size, -1) features = torch.cat(list(features.values())).detach() return features def forward(self, batch: Tensor) -> Tensor: """Computer score from input images. Args: batch (Tensor): Input images Returns: Tensor: Scores """ feature_vector = self.get_features(batch) return self.score(feature_vector.view(feature_vector.shape[:2]))