File size: 18,751 Bytes
a01ef8c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright (c) 2022 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# SPDX-License-Identifier: Apache-2.0
#

import pytest
import numpy

from unittest import mock
from unittest.mock import ANY, MagicMock, patch
from sklearn import decomposition

from tlt.models import model_factory
from tlt.utils.types import FrameworkType, UseCaseType

try:
    from tlt.models.image_anomaly_detection.pytorch_image_anomaly_detection_model import extract_features, pca, get_feature_extraction_model  # noqa: E501
except ModuleNotFoundError:
    print("WARNING: Unable to import PytorchImageAnomolyDetectionModel. Pytorch may not be installed")

# This is necessary to protect from import errors when testing in a pytorch only environment
# True when imports are successful, False when imports are unsuccessful
torch_env = True

try:
    # Do torch specific imports in a try/except to prevent pytest test loading from failing when running in a TF env
    import torch
    import torch.nn as nn
except ModuleNotFoundError:
    print("WARNING: Unable to import torch. Torch may not be installed")
    torch_env = False

try:
    # Do torch specific imports in a try/except to prevent pytest test loading from failing when running in a TF env
    from tlt.models.image_classification.torchvision_image_classification_model import TorchvisionImageClassificationModel  # noqa: E501
    from tlt.datasets.image_classification.torchvision_image_classification_dataset import TorchvisionImageClassificationDataset   # noqa: E501
    from tlt.datasets.image_classification.pytorch_custom_image_classification_dataset import \
        PyTorchCustomImageClassificationDataset  # noqa: E501
    from tlt.models.text_classification.pytorch_hf_text_classification_model import PyTorchHFTextClassificationModel  # noqa: E501
except ModuleNotFoundError:
    print("WARNING: Unable to import TorchvisionImageClassificationModel and TorchvisionImageClassificationDataset. "
          "Torch may not be installed")

try:
    from tlt.models.image_anomaly_detection.torchvision_image_anomaly_detection_model import \
        TorchvisionImageAnomalyDetectionModel
except ModuleNotFoundError:
    print("WARNING: Unable to import TorchvisionImageAnomalyDetectionModel and "
          "PyTorchCustomImageAnomalyDetectionDataset. Torch may not be installed")

try:
    from tlt.datasets.text_classification.hf_text_classification_dataset import HFTextClassificationDataset  # noqa: F401, E501
except ModuleNotFoundError:
    print("WARNING: Unable to import HFTextClassificationDataset. Hugging Face's `transformers` API may not \
           be installed in the current env")


@pytest.mark.pytorch
def test_torchvision_efficientnet_b0():
    """
    Checks that an efficientnet_b0 model can be obtained from the model factory
    for the pytorch framework and is backed by the torchvision model class
    """
    model = model_factory.get_model('efficientnet_b0', 'pytorch')
    assert model.model_name == 'efficientnet_b0'
    assert type(model) == TorchvisionImageClassificationModel


@pytest.mark.pytorch
def test_get_supported_models():
    """
    Calls get_supported_models() and verifies that the returned dictionary has
    a key for every use case, then spot-checks a couple of known models and
    their framework/model-hub metadata.
    """
    supported = model_factory.get_supported_models()

    # Every use case must be present as a key
    for use_case in UseCaseType:
        assert str(use_case) in supported.keys()

    # Spot-check known models
    image_cls_models = supported[str(UseCaseType.IMAGE_CLASSIFICATION)]
    assert 'efficientnet_b0' in image_cls_models
    assert 'resnet50' in supported[str(UseCaseType.IMAGE_ANOMALY_DETECTION)]

    # The efficientnet_b0 entry should have pytorch info pointing at torchvision
    model_info = image_cls_models['efficientnet_b0']
    assert str(FrameworkType.PYTORCH) in model_info
    assert model_info[str(FrameworkType.PYTORCH)]['model_hub'] == 'torchvision'


@pytest.mark.pytorch
@pytest.mark.parametrize('framework,use_case',
                         [['tensorflow', None],
                          ['pytorch', None],
                          [None, 'image_classification'],
                          [None, 'question_answering'],
                          ['tensorflow', 'image_classification'],
                          ['pytorch', 'text_classification'],
                          ['pytorch', 'question_answering'],
                          ['pytorch', 'image_anomaly_detection']])
def test_get_supported_models_with_filter(framework, use_case):
    """
    Tests getting the dictionary of supported models while filtering by framework and/or use case.
    Checks to ensure that keys for the expected use cases are there. If filtering by framework, then the test will
    also check to make sure we only have models for the specified framework.
    """
    supported = model_factory.get_supported_models(framework, use_case)

    if use_case is None:
        # No use case filter: expect one key per known use case
        assert len(supported.keys()) == len(UseCaseType)
        for uc in UseCaseType:
            assert str(uc) in supported.keys()
    else:
        # Filtered: the dictionary contains only the requested use case
        assert len(supported.keys()) == 1
        assert use_case in supported

    # When filtering by framework, no other framework may appear in any model entry
    if framework is not None:
        for models_for_use_case in supported.values():
            for framework_entries in models_for_use_case.values():
                assert len(framework_entries.keys()) == 1
                assert framework in framework_entries


@pytest.mark.pytorch
@pytest.mark.parametrize('bad_framework',
                         ['tensorflowers',
                          'python',
                          'torch',
                          'fantastic-potato'])
def test_get_supported_models_bad_framework(bad_framework):
    """
    Ensure that the proper error is raised when a bad framework is passed in
    """
    with pytest.raises(ValueError) as e:
        model_factory.get_supported_models(bad_framework)
    # Bug fix: the assert must sit OUTSIDE the `with` block. Statements after
    # the raising call inside `pytest.raises` never execute, so the original
    # indented assert was dead code. Also check e.value (the exception itself)
    # rather than the ExceptionInfo wrapper.
    assert "Unsupported framework: {}".format(bad_framework) in str(e.value)


@pytest.mark.pytorch
@pytest.mark.parametrize('bad_use_case',
                         ['tensorflow',
                          'imageclassification',
                          'python',
                          'fantastic-potato'])
def test_get_supported_models_bad_use_case(bad_use_case):
    """
    Ensure that the proper error is raised when a bad use case is passed in
    """
    with pytest.raises(ValueError) as e:
        model_factory.get_supported_models(use_case=bad_use_case)
    # Bug fix: the assert must sit OUTSIDE the `with` block. Statements after
    # the raising call inside `pytest.raises` never execute, so the original
    # indented assert was dead code. Also check e.value (the exception itself)
    # rather than the ExceptionInfo wrapper.
    assert "Unsupported use case: {}".format(bad_use_case) in str(e.value)


@pytest.mark.pytorch
def test_torchvision_efficientnet_b0_train():
    """
    Tests calling train on a torchvision efficientnet_b0 model with a mock dataset, model, and optimizer.

    Three scenarios are exercised:
      1. do_eval=True with a validation subset present -> model.eval() is called once
      2. do_eval=False -> model.eval() is never called
      3. do_eval=True but validation_subset is None -> model.eval() is never called
    """
    model = model_factory.get_model('efficientnet_b0', 'pytorch')
    # Prevent checkpoint files from being written during the test
    model._generate_checkpoints = False

    with patch('tlt.datasets.image_classification.torchvision_image_classification_dataset.TorchvisionImageClassificationDataset') as mock_dataset:  # noqa: E501
        with patch('tlt.models.image_classification.torchvision_image_classification_model.'
                   'TorchvisionImageClassificationModel._get_hub_model') as mock_get_hub_model:
            mock_dataset.train_subset = [1, 2, 3]
            mock_dataset.validation_subset = [4, 5, 6]
            # Spoof the class so type checks inside train() accept the mock dataset
            mock_dataset.__class__ = TorchvisionImageClassificationDataset
            mock_model = MagicMock()
            mock_optimizer = MagicMock()
            expected_return_value_model = mock_model
            expected_return_value_history_val = {'Acc': [0.0], 'Loss': [0.0], 'Val Acc': [0.0], 'Val Loss': [0.0]}
            expected_return_value_history_no_val = {'Acc': [0.0], 'Loss': [0.0]}

            def mock_to(device):
                # train() is expected to move the model to the CPU device
                assert device == torch.device("cpu")
                return expected_return_value_model

            def mock_train():
                return None

            # Replace .to() and .train() so the mock model behaves like a torch module
            mock_model.to = mock_to
            mock_model.train = mock_train
            mock_get_hub_model.return_value = (mock_model, mock_optimizer)

            # Train and eval (eval should be called)
            return_val = model.train(mock_dataset, output_dir="/tmp/output/pytorch", do_eval=True, lr_decay=False)
            assert return_val == expected_return_value_history_val
            mock_model.eval.assert_called_once()

            # Train without eval (eval should not be called)
            mock_model.eval.reset_mock()
            return_val = model.train(mock_dataset, output_dir="/tmp/output/pytorch", do_eval=False, lr_decay=False)
            assert return_val == expected_return_value_history_no_val
            mock_model.eval.assert_not_called()

            # Try to train with eval, but no validation subset (eval should not be called)
            mock_dataset.validation_subset = None
            mock_model.eval.reset_mock()
            return_val = model.train(mock_dataset, output_dir="/tmp/output/pytorch", do_eval=True, lr_decay=False)
            assert return_val == expected_return_value_history_no_val
            mock_model.eval.assert_not_called()


@pytest.mark.pytorch
def test_bert_train():
    """
    Trains a distilbert-base-uncased HF text classification model against a
    mocked dataset, with and without validation, and checks the history that
    train() returns in each case.
    """
    model = model_factory.get_model('distilbert-base-uncased', 'pytorch')
    assert type(model) == PyTorchHFTextClassificationModel
    with patch('tlt.datasets.text_classification.hf_text_classification_dataset.HFTextClassificationDataset') as dataset_mock:  # noqa: E501
        dataset_mock.__class__ = HFTextClassificationDataset
        dataset_mock.train_subset = ['1', '2', '3']
        dataset_mock.validation_subset = ['4', '5', '6']

        # Scenario 1: Call train without validation -- history carries training
        # metrics and runtime stats, but no 'Val ...' entries
        history = model.train(dataset_mock, output_dir="/tmp/output/pytorch", do_eval=False, lr_decay=False)
        assert history['Acc'] == [0.0]
        assert history['Loss'] == [0.0]
        assert 'train_runtime' in history
        assert 'train_samples_per_second' in history
        assert 'Val Acc' not in history
        assert 'Val Loss' not in history

        # Scenario 2: Call train with validation -- validation metrics appear
        dataset_mock.validation_loader.__class__ = HFTextClassificationDataset
        history = model.train(dataset_mock, output_dir="/tmp/output/pytorch", do_eval=True, lr_decay=False)
        assert history['Acc'] == [0.0]
        assert history['Loss'] == [0.0]
        assert history['Val Acc'] == [0.0]
        assert history['Val Loss'] == [0.0]
        assert 'train_runtime' in history
        assert 'train_samples_per_second' in history


@pytest.mark.pytorch
def test_resnet50_anomaly_extract_pca():
    """
    Tests the resnet50 anomaly detection model: extracts 'layer3' features from
    5 random images and runs PCA on a tensor, checking the PCA object type and
    its configured variance ratio.
    """
    model = model_factory.get_model(model_name="resnet50", framework="pytorch", use_case="anomaly_detection")
    assert type(model) == TorchvisionImageAnomalyDetectionModel

    # Call extract_features and PCA on 5 randomly generated images
    data = torch.rand(5, 3, 225, 225)  # NCHW
    resnet_model = get_feature_extraction_model(model._model, 'layer3')
    features = extract_features(resnet_model, data, layer_name='layer3', pooling=['avg', 2])
    assert isinstance(features, torch.Tensor)
    assert len(features) == 5

    # NOTE(review): torch.empty returns uninitialized memory and this tensor is
    # never filled from `features`, so pca() operates on arbitrary values; the
    # NaN guard below appears to exist because of that. Confirm this is intended.
    data_mats_orig = torch.empty((features.shape[1], len(data))).to('cpu')

    # Skip the rest of the test if the tensor contains any NaNs, due to flaky behavior
    if not numpy.isnan(data_mats_orig).any():
        with torch.no_grad():
            components = pca(data_mats_orig, 0.97)
        # pca() is expected to return a fitted sklearn PCA keeping 97% variance
        assert type(components) == decomposition._pca.PCA
        assert components.n_components == 0.97


# This is necessary to protect from import errors when testing in a pytorch only environment
if torch_env:
    @pytest.mark.pytorch
    @pytest.mark.parametrize('model_name,use_case,dataset_type,optimizer,loss',
                             [['efficientnet_b0', 'image_classification', PyTorchCustomImageClassificationDataset,
                               torch.optim.Adam, torch.nn.L1Loss],
                              ['resnet18', 'image_classification', PyTorchCustomImageClassificationDataset,
                               torch.optim.AdamW, torch.nn.MSELoss],
                              ['custom', 'image_classification', PyTorchCustomImageClassificationDataset,
                               torch.optim.SGD, torch.nn.L1Loss],
                              ['distilbert-base-uncased', 'text_classification', HFTextClassificationDataset,
                               torch.optim.Adam, torch.nn.MSELoss]])
    def test_pytorch_optimizer_loss(model_name, use_case, dataset_type, optimizer, loss):
        """
        Tests initializing and training a model with configurable optimizers and loss functions.

        Verifies that the optimizer/loss classes given to get_model()/load_model()
        are stored on the model and that the instantiated objects after train()
        match those classes. The 'custom' case exercises load_model() with a
        user-defined nn.Module.
        """

        # Define a model (small CNN with a 3-class output, used for the 'custom' case)
        class Net(nn.Module):
            def __init__(self):
                super().__init__()
                self.conv1 = nn.Conv2d(3, 6, 5)
                self.pool = nn.MaxPool2d(2, 2)
                self.conv2 = nn.Conv2d(6, 16, 5)
                self.fc1 = nn.Linear(16 * 5 * 5, 120)
                self.fc2 = nn.Linear(120, 84)
                self.fc3 = nn.Linear(84, 3)

            def forward(self, x):
                x = self.pool(nn.functional.relu(self.conv1(x)))
                x = self.pool(nn.functional.relu(self.conv2(x)))
                x = torch.flatten(x, 1)
                x = nn.functional.relu(self.fc1(x))
                x = nn.functional.relu(self.fc2(x))
                x = self.fc3(x)
                return x

        net = Net()

        # 'custom' loads the user-defined Net; otherwise fetch a catalog model
        if model_name == 'custom':
            model = model_factory.load_model(model_name, net, 'pytorch', use_case, optimizer=optimizer, loss=loss)
        else:
            model = model_factory.get_model(model_name, 'pytorch', optimizer=optimizer, loss=loss)

        # Disable checkpointing and stub out the fit loop so train() only does setup
        model._generate_checkpoints = False
        model._fit = MagicMock()
        assert model._optimizer_class == optimizer
        assert model._loss_class == loss
        assert type(model._loss) == loss

        # Mock dataset spoofing the expected dataset class for this use case
        mock_dataset = MagicMock()
        mock_dataset.__class__ = dataset_type
        mock_dataset.class_names = ['a', 'b', 'c']
        mock_dataset.train_subset = [1, 2, 3]
        mock_dataset.validation_subset = [4, 5, 6]

        # Train is called and optimizer and loss objects should match the input types
        model.train(mock_dataset, output_dir="/tmp/output/pytorch")
        assert model._optimizer_class == optimizer
        assert type(model._optimizer) == optimizer
        assert model._loss_class == loss
        assert type(model._loss) == loss


# This is necessary to protect from import errors when testing in a pytorch only environment
if torch_env:
    @pytest.mark.pytorch
    @pytest.mark.parametrize('model_name,optimizer',
                             [['efficientnet_b0', 1],
                              ['resnet18', 'foo'],
                              ['distilbert-base-uncased', torch.nn.MSELoss]])
    def test_pytorch_optimizer_wrong_type(model_name, optimizer):
        """
        Tests that an exception is thrown when the input optimizer is the wrong type.

        Covers an int, a string, and a loss class (not an optimizer) -- all of
        which get_model() must reject with a TypeError.
        """
        with pytest.raises(TypeError):
            model_factory.get_model(model_name, 'pytorch', optimizer=optimizer)


@pytest.mark.pytorch
@patch('tlt.models.text_classification.pytorch_hf_text_classification_model.torch.optim.AdamW')
@patch('tlt.models.text_classification.pytorch_hf_text_classification_model.Trainer')
@patch('tlt.models.text_classification.pytorch_hf_text_classification_model.ModelDownloader')
def test_pytorch_hf_text_classification_trainer_return_values(mock_downloader, mock_trainer, mock_optimizer):
    """
    Tests the PyTorch Text Classification model with the Hugging Face Trainer to verify that the value returned
    by Trainer.train() is returned by the model.train() method
    """

    model = model_factory.get_model(model_name='bert-base-cased', framework='pytorch')

    # Build a mock dataset that spoofs the HF text classification dataset class
    dataset = MagicMock()
    dataset.__class__ = HFTextClassificationDataset
    dataset.class_names = ['a', 'b', 'c']
    dataset.train_subset = [1, 2, 3]
    dataset.validation_subset = [4, 5, 6]

    # Sentinel value that the mocked Trainer.train() will hand back
    sentinel = "a"
    mock_trainer().train.return_value = sentinel

    result = model.train(dataset, output_dir="/tmp", use_trainer=True, seed=10)

    # The trainer must be run exactly once and its result passed through
    assert mock_trainer().train.call_count == 1
    assert result == sentinel


@pytest.mark.pytorch
@patch('tlt.models.text_classification.pytorch_hf_text_classification_model.torch.optim.AdamW')
@patch('tlt.models.text_classification.pytorch_hf_text_classification_model.Trainer')
@patch('tlt.models.text_classification.pytorch_hf_text_classification_model.ModelDownloader')
def test_pytorch_hf_text_classification_trainer_without_val_subset(mock_downloader, mock_trainer, mock_optimizer):
    """
    Tests the PyTorch Text Classification model with the Hugging Face Trainer is able to run evaluation with a test
    subset when a validation subset does not exist.
    """

    model = model_factory.get_model(model_name='bert-base-cased', framework='pytorch')

    mock_dataset = MagicMock()
    mock_dataset.__class__ = HFTextClassificationDataset
    mock_dataset.class_names = ['a', 'b', 'c']
    mock_dataset.train_subset = [1, 2, 3]
    mock_dataset.test_subset = [4, 5, 6]
    # Make every access to validation_subset raise, simulating a dataset that
    # has no validation split (PropertyMock applies at the type level)
    type(mock_dataset).validation_subset = mock.PropertyMock(side_effect=ValueError)

    # Sanity-check that the property really raises before exercising train()
    with pytest.raises(ValueError):
        mock_dataset.validation_subset

    model.train(mock_dataset, output_dir="/tmp", use_trainer=True, seed=10)
    # train() should fall back to the test subset ([4, 5, 6]) for eval_dataset
    mock_trainer.assert_called_with(model=model._model, args=ANY, train_dataset=[1, 2, 3], eval_dataset=[4, 5, 6],
                                    compute_metrics=ANY, tokenizer=ANY)