File size: 8,241 Bytes
c8c12e9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
"""Test Dataset."""

import os

import numpy as np
import pytest

from anomalib.config import update_input_size_config
from anomalib.data import (
    BTechDataModule,
    FolderDataModule,
    MVTecDataModule,
    get_datamodule,
)
from anomalib.pre_processing.transforms import Denormalize, ToNumpy
from tests.helpers.config import get_test_configurable_parameters
from tests.helpers.dataset import TestDataset, get_dataset_path


@pytest.fixture(autouse=True)
def mvtec_data_module():
    datamodule = MVTecDataModule(
        root=get_dataset_path(dataset="MVTec"),
        category="leather",
        image_size=(256, 256),
        train_batch_size=1,
        test_batch_size=1,
        num_workers=0,
    )
    datamodule.prepare_data()
    datamodule.setup()

    return datamodule


@pytest.fixture(autouse=True)
def btech_data_module():
    """Create BTech Data Module."""
    datamodule = BTechDataModule(
        root=get_dataset_path(dataset="BTech"),
        category="01",
        image_size=(256, 256),
        train_batch_size=1,
        test_batch_size=1,
        num_workers=0,
    )
    datamodule.prepare_data()
    datamodule.setup()

    return datamodule


@pytest.fixture(autouse=True)
def folder_data_module():
    """Create Folder Data Module."""
    root = get_dataset_path(dataset="bottle")
    datamodule = FolderDataModule(
        root=root,
        normal_dir="good",
        abnormal_dir="broken_large",
        mask_dir=os.path.join(root, "ground_truth/broken_large"),
        task="segmentation",
        split_ratio=0.2,
        seed=0,
        image_size=(256, 256),
        train_batch_size=32,
        test_batch_size=32,
        num_workers=8,
        create_validation_set=True,
    )
    datamodule.setup()

    return datamodule


@pytest.fixture(autouse=True)
def data_sample(mvtec_data_module):
    _, data = next(enumerate(mvtec_data_module.train_dataloader()))
    return data


class TestMVTecDataModule:
    """Test MVTec AD Data Module."""

    def test_batch_size(self, mvtec_data_module):
        """test_mvtec_datamodule [summary]"""
        _, train_data_sample = next(enumerate(mvtec_data_module.train_dataloader()))
        _, val_data_sample = next(enumerate(mvtec_data_module.val_dataloader()))
        assert train_data_sample["image"].shape[0] == 1
        assert val_data_sample["image"].shape[0] == 1

    def test_val_and_test_dataloaders_has_mask_and_gt(self, mvtec_data_module):
        """Test Validation and Test dataloaders should return filenames, image, mask and label."""
        _, val_data = next(enumerate(mvtec_data_module.val_dataloader()))
        _, test_data = next(enumerate(mvtec_data_module.test_dataloader()))

        assert sorted(["image_path", "mask_path", "image", "label", "mask"]) == sorted(val_data.keys())
        assert sorted(["image_path", "mask_path", "image", "label", "mask"]) == sorted(test_data.keys())


class TestBTechDataModule:
    """Test BTech Data Module."""

    def test_batch_size(self, btech_data_module):
        """Test batch size."""
        _, train_data_sample = next(enumerate(btech_data_module.train_dataloader()))
        _, val_data_sample = next(enumerate(btech_data_module.val_dataloader()))
        assert train_data_sample["image"].shape[0] == 1
        assert val_data_sample["image"].shape[0] == 1

    def test_val_and_test_dataloaders_has_mask_and_gt(self, btech_data_module):
        """Test Validation and Test dataloaders should return filenames, image, mask and label."""
        _, val_data = next(enumerate(btech_data_module.val_dataloader()))
        _, test_data = next(enumerate(btech_data_module.test_dataloader()))

        assert sorted(["image_path", "mask_path", "image", "label", "mask"]) == sorted(val_data.keys())
        assert sorted(["image_path", "mask_path", "image", "label", "mask"]) == sorted(test_data.keys())


class TestFolderDataModule:
    """Test Folder Data Module."""

    def test_batch_size(self, folder_data_module):
        """Test batch size."""
        _, train_data_sample = next(enumerate(folder_data_module.train_dataloader()))
        _, val_data_sample = next(enumerate(folder_data_module.val_dataloader()))
        assert train_data_sample["image"].shape[0] == 16
        assert val_data_sample["image"].shape[0] == 12

    def test_val_and_test_dataloaders_has_mask_and_gt(self, folder_data_module):
        """Test Validation and Test dataloaders should return filenames, image, mask and label."""
        _, val_data = next(enumerate(folder_data_module.val_dataloader()))
        _, test_data = next(enumerate(folder_data_module.test_dataloader()))

        assert sorted(["image_path", "mask_path", "image", "label", "mask"]) == sorted(val_data.keys())
        assert sorted(["image_path", "mask_path", "image", "label", "mask"]) == sorted(test_data.keys())


class TestDenormalize:
    """Test Denormalize Util."""

    def test_denormalize_image_pixel_values(self, data_sample):
        """Test Denormalize denormalizes tensor into [0, 256] range."""
        denormalized_sample = Denormalize().__call__(data_sample["image"].squeeze())
        assert denormalized_sample.min() >= 0 and denormalized_sample.max() <= 256

    def test_denormalize_return_numpy(self, data_sample):
        """Denormalize should return a numpy array."""
        denormalized_sample = Denormalize()(data_sample["image"].squeeze())
        assert isinstance(denormalized_sample, np.ndarray)

    def test_denormalize_channel_order(self, data_sample):
        """Denormalize should return a numpy array of order [HxWxC]"""
        denormalized_sample = Denormalize().__call__(data_sample["image"].squeeze())
        assert len(denormalized_sample.shape) == 3 and denormalized_sample.shape[-1] == 3

    def test_representation(self):
        """Test Denormalize representation should return string
        Denormalize()"""
        assert str(Denormalize()) == "Denormalize()"


class TestToNumpy:
    """Test ToNumpy whether it properly converts tensor into numpy array."""

    def test_to_numpy_image_pixel_values(self, data_sample):
        """Test ToNumpy should return an array whose pixels in the range of [0,
        256]"""
        array = ToNumpy()(data_sample["image"])
        assert array.min() >= 0 and array.max() <= 256

    def test_to_numpy_converts_tensor_to_np_array(self, data_sample):
        """ToNumpy returns a numpy array."""
        array = ToNumpy()(data_sample["image"])
        assert isinstance(array, np.ndarray)

    def test_to_numpy_channel_order(self, data_sample):
        """ToNumpy() should return a numpy array of order [HxWxC]"""
        array = ToNumpy()(data_sample["image"])
        assert len(array.shape) == 3 and array.shape[-1] == 3

    def test_one_channel_images(self, data_sample):
        """One channel tensor should be converted to HxW np array."""
        data = data_sample["image"][:, 0, :, :].unsqueeze(0)
        array = ToNumpy()(data)
        assert len(array.shape) == 2

    def test_representation(self):
        """Test ToNumpy() representation should return string `ToNumpy()`"""
        assert str(ToNumpy()) == "ToNumpy()"


class TestConfigToDataModule:
    """Tests that check if the dataset parameters in the config achieve the desired effect."""

    @pytest.mark.parametrize(
        ["input_size", "effective_image_size"],
        [
            (512, (512, 512)),
            ((245, 276), (245, 276)),
            ((263, 134), (263, 134)),
            ((267, 267), (267, 267)),
        ],
    )
    @TestDataset(num_train=20, num_test=10)
    def test_image_size(self, input_size, effective_image_size, category="shapes", path=None):
        """Test if the image size parameter works as expected."""
        configurable_parameters = get_test_configurable_parameters(dataset_path=path, model_name="stfpm")
        configurable_parameters.dataset.category = category
        configurable_parameters.dataset.image_size = input_size
        configurable_parameters = update_input_size_config(configurable_parameters)

        data_module = get_datamodule(configurable_parameters)
        data_module.setup()
        assert iter(data_module.train_dataloader()).__next__()["image"].shape[-2:] == effective_image_size