"""Benchmark all the algorithms in the repo."""

# Copyright (C) 2020 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions
# and limitations under the License.


import functools
import io
import logging
import math
import multiprocessing
import sys
import time
import warnings
from concurrent.futures import ProcessPoolExecutor, as_completed
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Dict, List, Union, cast

import torch
from omegaconf import DictConfig, ListConfig, OmegaConf
from pytorch_lightning import Trainer, seed_everything
from utils import convert_to_openvino, upload_to_wandb, write_metrics

from anomalib.config import get_configurable_parameters, update_input_size_config
from anomalib.data import get_datamodule
from anomalib.models import get_model
from anomalib.utils.loggers import configure_logger
from anomalib.utils.sweep import (
    get_meta_data,
    get_openvino_throughput,
    get_run_config,
    get_sweep_callbacks,
    get_torch_throughput,
    set_in_nested_config,
)

warnings.filterwarnings("ignore")

logger = logging.getLogger(__name__)
configure_logger()
pl_logger = logging.getLogger(__file__)
for logger_name in ["pytorch_lightning", "torchmetrics", "os"]:
    logging.getLogger(logger_name).setLevel(logging.ERROR)


def hide_output(func):
    """Decorator to hide output of the function.

    Args:
        func (function): Function whose stdout output is suppressed.

    Raises:
        Exception: If the wrapped function fails, an exception containing its captured output is raised.

    Returns:
        The return value of the wrapped function.
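
    Example:
        A minimal illustrative sketch; ``noisy_helper`` is a hypothetical function used only
        to show the decorator and is not part of this module::

            @hide_output
            def noisy_helper() -> int:
                print("this never reaches the console")
                return 42

            assert noisy_helper() == 42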
    """

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        std_out = sys.stdout
        sys.stdout = buf = io.StringIO()
        try:
            value = func(*args, **kwargs)
        except Exception as exp:
            raise Exception(buf.getvalue()) from exp
        finally:
            # Restore stdout even if the wrapped function raised.
            sys.stdout = std_out
        return value

    return wrapper


@hide_output
def get_single_model_metrics(model_config: Union[DictConfig, ListConfig], openvino_metrics: bool = False) -> Dict:
    """Collects metrics for `model_name` and returns a dict of results.

    Args:
        model_config (DictConfig, ListConfig): Configuration for run
        openvino_metrics (bool): If True, converts the model to OpenVINO format and gathers inference metrics.

    Returns:
        Dict: Collection of all the metrics such as time taken, throughput and performance scores.
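
    Example:
        Illustrative sketch only; it assumes a ``padim`` config is resolvable by
        ``get_configurable_parameters`` and that the dataset it references is available locally::

            config = get_configurable_parameters(model_name="padim")
            metrics = get_single_model_metrics(config, openvino_metrics=False)
            print(metrics["Training Time (s)"])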
    """

    with TemporaryDirectory() as project_path:
        model_config.project.path = project_path
        datamodule = get_datamodule(model_config)
        model = get_model(model_config)

        callbacks = get_sweep_callbacks()

        trainer = Trainer(**model_config.trainer, logger=None, callbacks=callbacks)

        start_time = time.time()

        trainer.fit(model=model, datamodule=datamodule)

        # get training time
        training_time = time.time() - start_time

        # Creating new variable is faster according to https://stackoverflow.com/a/4330829
        start_time = time.time()
        # get test results
        test_results = trainer.test(model=model, datamodule=datamodule)

        # get testing time
        testing_time = time.time() - start_time

        meta_data = get_meta_data(model, model_config.model.input_size)

        throughput = get_torch_throughput(model_config, model, datamodule.test_dataloader().dataset, meta_data)

        # Get OpenVINO metrics
        openvino_throughput = float("nan")
        if openvino_metrics:
            # Create dirs for openvino model export
            openvino_export_path = project_path / Path("exported_models")
            openvino_export_path.mkdir(parents=True, exist_ok=True)
            convert_to_openvino(model, openvino_export_path, model_config.model.input_size)
            openvino_throughput = get_openvino_throughput(
                model_config, openvino_export_path, datamodule.test_dataloader().dataset, meta_data
            )

        # arrange the data
        data = {
            "Training Time (s)": training_time,
            "Testing Time (s)": testing_time,
            "Inference Throughput (fps)": throughput,
            "OpenVINO Inference Throughput (fps)": openvino_throughput,
        }
        for key, val in test_results[0].items():
            data[key] = float(val)

    return data


def compute_on_cpu():
    """Compute all run configurations over a sigle CPU."""
    sweep_config = OmegaConf.load("tools/benchmarking/benchmark_params.yaml")
    for run_config in get_run_config(sweep_config.grid_search):
        model_metrics = sweep(run_config, 0, sweep_config.seed, False)
        write_metrics(model_metrics, sweep_config.writer)


def compute_on_gpu(
    run_configs: Union[DictConfig, ListConfig],
    device: int,
    seed: int,
    writers: List[str],
    compute_openvino: bool = False,
):
    """Go over each run config and collect the result.

    Args:
        run_configs (Union[DictConfig, ListConfig]): List of run configurations.
        device (int): The GPU id used for running the sweep.
        seed (int): Fix a seed.
        writers (List[str]): Destinations to write to.
        compute_openvino (bool, optional): Compute OpenVINO throughput. Defaults to False.
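
    Example:
        Hedged sketch mirroring how ``distribute_over_gpus`` drives this function; it assumes
        ``tools/benchmarking/benchmark_params.yaml`` exists and that GPU 0 is free (``device=1``
        maps to GPU 0 inside ``sweep``)::

            sweep_config = OmegaConf.load("tools/benchmarking/benchmark_params.yaml")
            run_configs = list(get_run_config(sweep_config.grid_search))
            compute_on_gpu(run_configs, device=1, seed=sweep_config.seed, writers=sweep_config.writer)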
    """
    for run_config in run_configs:
        if isinstance(run_config, (DictConfig, ListConfig)):
            model_metrics = sweep(run_config, device, seed, compute_openvino)
            write_metrics(model_metrics, writers)
        else:
            raise ValueError(
                f"Expecting `run_config` of type DictConfig or ListConfig. Got {type(run_config)} instead."
            )


def distribute_over_gpus():
    """Distribute metric collection over all available GPUs. This is done by splitting the list of configurations."""
    sweep_config = OmegaConf.load("tools/benchmarking/benchmark_params.yaml")
    with ProcessPoolExecutor(
        max_workers=torch.cuda.device_count(), mp_context=multiprocessing.get_context("spawn")
    ) as executor:
        run_configs = list(get_run_config(sweep_config.grid_search))
        jobs = []
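        # Split the run configs into contiguous chunks of ceil(N / num_gpus) and submit one
        # chunk per GPU. Devices are passed as 1-based ids here; `sweep` maps 0 to CPU and
        # `device - 1` to the CUDA device index.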
        for device_id, run_split in enumerate(
            range(0, len(run_configs), math.ceil(len(run_configs) / torch.cuda.device_count()))
        ):
            jobs.append(
                executor.submit(
                    compute_on_gpu,
                    run_configs[run_split : run_split + math.ceil(len(run_configs) / torch.cuda.device_count())],
                    device_id + 1,
                    sweep_config.seed,
                    sweep_config.writer,
                    sweep_config.compute_openvino,
                )
            )
        for job in jobs:
            try:
                job.result()
            except Exception as exc:
                raise Exception(f"Error occurred while computing benchmark on device {job}") from exc


def distribute():
    """Run all cpu experiments on a single process. Distribute gpu experiments over all available gpus.

    Args:
        device_count (int, optional): If device count is 0, uses only cpu else spawn processes according
        to number of gpus available on the machine. Defaults to 0.
    """
    sweep_config = OmegaConf.load("tools/benchmarking/benchmark_params.yaml")
    devices = sweep_config.hardware
    if not torch.cuda.is_available() and "gpu" in devices:
        pl_logger.warning("Config requested GPU benchmarking but torch could not detect any cuda enabled devices")
    elif {"cpu", "gpu"}.issubset(devices):
        # Create process for gpu and cpu
        with ProcessPoolExecutor(max_workers=2, mp_context=multiprocessing.get_context("spawn")) as executor:
            jobs = [executor.submit(compute_on_cpu), executor.submit(distribute_over_gpus)]
            for job in as_completed(jobs):
                try:
                    job.result()
                except Exception as exception:
                    raise Exception(f"Error occurred while computing benchmark on device {job}") from exception
    elif "cpu" in devices:
        compute_on_cpu()
    elif "gpu" in devices:
        distribute_over_gpus()
    if "wandb" in sweep_config.writer:
        upload_to_wandb(team="anomalib")


def sweep(
    run_config: Union[DictConfig, ListConfig], device: int = 0, seed: int = 42, convert_openvino: bool = False
) -> Dict[str, Union[float, str]]:
    """Go over all the values mentioned in `grid_search` parameter of the benchmarking config.

    Args:
        run_config (Union[DictConfig, ListConfig]): Configuration for the current run.
        device (int, optional): Device on which the model is trained. 0 selects the CPU; a positive
            value selects GPU `device - 1`. Defaults to 0.
        seed (int, optional): Seed passed to `seed_everything` for reproducibility. Defaults to 42.
        convert_openvino (bool, optional): Whether to convert the model to OpenVINO format. Defaults to False.

    Returns:
        Dict[str, Union[float, str]]: Dictionary containing the metrics gathered from the sweep.
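
    Example:
        Illustrative sketch; it assumes the benchmark config at
        ``tools/benchmarking/benchmark_params.yaml`` exists and yields at least one run config::

            sweep_config = OmegaConf.load("tools/benchmarking/benchmark_params.yaml")
            first_run_config = next(iter(get_run_config(sweep_config.grid_search)))
            metrics = sweep(first_run_config, device=0, seed=42, convert_openvino=False)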
    """
    seed_everything(seed, workers=True)
    # This assumes that `model_name` is always present in the sweep config.
    model_config = get_configurable_parameters(model_name=run_config.model_name)
    model_config.project.seed = seed

    model_config = cast(DictConfig, model_config)  # placate mypy
    for param in run_config.keys():
        # grid search keys are always assumed to be strings
        param = cast(str, param)  # placate mypy
        set_in_nested_config(model_config, param.split("."), run_config[param])  # type: ignore

    # convert image size to tuple in case it was updated by run config
    model_config = update_input_size_config(model_config)

    # Set device in config. 0 - cpu, [0], [1].. - gpu id
    model_config.trainer.gpus = 0 if device == 0 else [device - 1]

    if run_config.model_name in ["patchcore", "cflow"]:
        convert_openvino = False  # `torch.cdist` is not supported by ONNX opset 11
        # TODO Remove this line when issue #40 is fixed https://github.com/openvinotoolkit/anomalib/issues/40
        if model_config.model.input_size != (224, 224):
            return {}  # go to next run

    # Run benchmarking for current config
    model_metrics = get_single_model_metrics(model_config=model_config, openvino_metrics=convert_openvino)
    output = f"One sweep run complete for model {model_config.model.name}"
    output += f" On category {model_config.dataset.category}" if model_config.dataset.category is not None else ""
    output += str(model_metrics)
    logger.info(output)

    # Append configuration of current run to the collected metrics
    for key, value in run_config.items():
        # Skip the model name here; it is added explicitly below
        if key != "model_name":
            model_metrics[key] = value

    # Add device name to list
    model_metrics["device"] = "gpu" if device > 0 else "cpu"
    model_metrics["model_name"] = run_config.model_name

    return model_metrics


if __name__ == "__main__":
    # Benchmarking entry point.
    # Spawn multiple processes one for cpu and rest for the number of gpus available in the system.
    # The idea is to distribute metrics collection over all the available devices.

    logger.info("Benchmarking started πŸƒβ€β™‚οΈ. This will take a while ⏲ depending on your configuration.")
    distribute()
    logger.info("Finished gathering results ⚑")