"""Benchmark all the algorithms in the repo."""
# Copyright (C) 2020 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions
# and limitations under the License.
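
# Note: the benchmark sweep is configured via `tools/benchmarking/benchmark_params.yaml`
# (loaded in `compute_on_cpu`, `distribute_over_gpus` and `distribute` below).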
import functools
import io
import logging
import math
import multiprocessing
import sys
import time
import warnings
from concurrent.futures import ProcessPoolExecutor, as_completed
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Dict, List, Union, cast
import torch
from omegaconf import DictConfig, ListConfig, OmegaConf
from pytorch_lightning import Trainer, seed_everything
from utils import convert_to_openvino, upload_to_wandb, write_metrics
from anomalib.config import get_configurable_parameters, update_input_size_config
from anomalib.data import get_datamodule
from anomalib.models import get_model
from anomalib.utils.loggers import configure_logger
from anomalib.utils.sweep import (
get_meta_data,
get_openvino_throughput,
get_run_config,
get_sweep_callbacks,
get_torch_throughput,
set_in_nested_config,
)

warnings.filterwarnings("ignore")

logger = logging.getLogger(__name__)
configure_logger()
pl_logger = logging.getLogger(__file__)
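# Silence noisy third-party loggers so that only benchmark progress and errors are printed.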
for logger_name in ["pytorch_lightning", "torchmetrics", "os"]:
logging.getLogger(logger_name).setLevel(logging.ERROR)


def hide_output(func):
    """Decorator to hide the output of the wrapped function.

    Args:
        func (function): Function whose output is hidden.

    Raises:
        Exception: In case the execution of the wrapped function fails, the captured output is raised as an exception.

    Returns:
        The return value of the wrapped function.
    """

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # Redirect stdout to an in-memory buffer so the wrapped call prints nothing.
        std_out = sys.stdout
        sys.stdout = buf = io.StringIO()
        try:
            value = func(*args, **kwargs)
        except Exception as exp:
            # Surface the captured output alongside the original error.
            raise Exception(buf.getvalue()) from exp
        finally:
            # Restore stdout even when the wrapped call raises.
            sys.stdout = std_out
        return value

    return wrapper


@hide_output
def get_single_model_metrics(model_config: Union[DictConfig, ListConfig], openvino_metrics: bool = False) -> Dict:
    """Collects metrics for `model_name` and returns a dict of results.

    Args:
        model_config (DictConfig, ListConfig): Configuration for the run.
        openvino_metrics (bool): If True, converts the model to OpenVINO format and gathers inference metrics.

    Returns:
        Dict: Collection of all the metrics such as time taken, throughput and performance scores.
    """
with TemporaryDirectory() as project_path:
model_config.project.path = project_path
datamodule = get_datamodule(model_config)
model = get_model(model_config)
callbacks = get_sweep_callbacks()
trainer = Trainer(**model_config.trainer, logger=None, callbacks=callbacks)
start_time = time.time()
trainer.fit(model=model, datamodule=datamodule)
        # compute the training time
        training_time = time.time() - start_time
# Creating new variable is faster according to https://stackoverflow.com/a/4330829
start_time = time.time()
# get test results
test_results = trainer.test(model=model, datamodule=datamodule)
# get testing time
testing_time = time.time() - start_time
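        # Collect model meta data and measure native PyTorch inference throughput on the test set.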
meta_data = get_meta_data(model, model_config.model.input_size)
throughput = get_torch_throughput(model_config, model, datamodule.test_dataloader().dataset, meta_data)
# Get OpenVINO metrics
openvino_throughput = float("nan")
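        # OpenVINO throughput stays NaN unless OpenVINO metrics are requested.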
if openvino_metrics:
# Create dirs for openvino model export
openvino_export_path = project_path / Path("exported_models")
openvino_export_path.mkdir(parents=True, exist_ok=True)
convert_to_openvino(model, openvino_export_path, model_config.model.input_size)
openvino_throughput = get_openvino_throughput(
model_config, openvino_export_path, datamodule.test_dataloader().dataset, meta_data
)
# arrange the data
data = {
"Training Time (s)": training_time,
"Testing Time (s)": testing_time,
"Inference Throughput (fps)": throughput,
"OpenVINO Inference Throughput (fps)": openvino_throughput,
}
for key, val in test_results[0].items():
data[key] = float(val)
return data


def compute_on_cpu():
    """Compute all run configurations over a single CPU."""
sweep_config = OmegaConf.load("tools/benchmarking/benchmark_params.yaml")
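    # Run every configuration sequentially; device id 0 selects the CPU.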
for run_config in get_run_config(sweep_config.grid_search):
model_metrics = sweep(run_config, 0, sweep_config.seed, False)
write_metrics(model_metrics, sweep_config.writer)


def compute_on_gpu(
run_configs: Union[DictConfig, ListConfig],
device: int,
seed: int,
writers: List[str],
compute_openvino: bool = False,
):
"""Go over each run config and collect the result.
Args:
run_configs (Union[DictConfig, ListConfig]): List of run configurations.
device (int): The GPU id used for running the sweep.
seed (int): Fix a seed.
writers (List[str]): Destinations to write to.
compute_openvino (bool, optional): Compute OpenVINO throughput. Defaults to False.
"""
for run_config in run_configs:
if isinstance(run_config, (DictConfig, ListConfig)):
model_metrics = sweep(run_config, device, seed, compute_openvino)
write_metrics(model_metrics, writers)
else:
raise ValueError(
f"Expecting `run_config` of type DictConfig or ListConfig. Got {type(run_config)} instead."
)


def distribute_over_gpus():
"""Distribute metric collection over all available GPUs. This is done by splitting the list of configurations."""
sweep_config = OmegaConf.load("tools/benchmarking/benchmark_params.yaml")
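    # One worker process per GPU; the "spawn" start method is used because CUDA
    # cannot be re-initialised in forked subprocesses.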
with ProcessPoolExecutor(
max_workers=torch.cuda.device_count(), mp_context=multiprocessing.get_context("spawn")
) as executor:
run_configs = list(get_run_config(sweep_config.grid_search))
jobs = []
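        # Split the run configs into contiguous chunks, one chunk per available GPU.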
for device_id, run_split in enumerate(
range(0, len(run_configs), math.ceil(len(run_configs) / torch.cuda.device_count()))
):
jobs.append(
executor.submit(
compute_on_gpu,
run_configs[run_split : run_split + math.ceil(len(run_configs) / torch.cuda.device_count())],
device_id + 1,
sweep_config.seed,
sweep_config.writer,
sweep_config.compute_openvino,
)
)
for job in jobs:
try:
job.result()
except Exception as exc:
raise Exception(f"Error occurred while computing benchmark on device {job}") from exc


def distribute():
    """Run all CPU experiments in a single process and distribute GPU experiments over all available GPUs."""
sweep_config = OmegaConf.load("tools/benchmarking/benchmark_params.yaml")
devices = sweep_config.hardware
if not torch.cuda.is_available() and "gpu" in devices:
pl_logger.warning("Config requested GPU benchmarking but torch could not detect any cuda enabled devices")
elif {"cpu", "gpu"}.issubset(devices):
# Create process for gpu and cpu
with ProcessPoolExecutor(max_workers=2, mp_context=multiprocessing.get_context("spawn")) as executor:
jobs = [executor.submit(compute_on_cpu), executor.submit(distribute_over_gpus)]
for job in as_completed(jobs):
try:
job.result()
except Exception as exception:
raise Exception(f"Error occurred while computing benchmark on device {job}") from exception
elif "cpu" in devices:
compute_on_cpu()
elif "gpu" in devices:
distribute_over_gpus()
if "wandb" in sweep_config.writer:
upload_to_wandb(team="anomalib")


def sweep(
    run_config: Union[DictConfig, ListConfig], device: int = 0, seed: int = 42, convert_openvino: bool = False
) -> Dict[str, Union[float, str]]:
    """Go over all the values mentioned in the `grid_search` parameter of the benchmarking config.

    Args:
        run_config (Union[DictConfig, ListConfig]): Configuration for the current run.
        device (int, optional): Id of the device on which the model is trained. 0 selects the CPU; n > 0 selects GPU n-1. Defaults to 0.
        seed (int, optional): Seed used to make the run reproducible. Defaults to 42.
        convert_openvino (bool, optional): Whether to convert the model to openvino format. Defaults to False.

    Returns:
        Dict[str, Union[float, str]]: Dictionary containing the metrics gathered from the sweep.
    """
seed_everything(seed, workers=True)
# This assumes that `model_name` is always present in the sweep config.
model_config = get_configurable_parameters(model_name=run_config.model_name)
model_config.project.seed = seed
model_config = cast(DictConfig, model_config) # placate mypy
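    # Override the default model config with the parameters of the current grid-search run.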
for param in run_config.keys():
# grid search keys are always assumed to be strings
param = cast(str, param) # placate mypy
set_in_nested_config(model_config, param.split("."), run_config[param]) # type: ignore
# convert image size to tuple in case it was updated by run config
model_config = update_input_size_config(model_config)
# Set device in config. 0 - cpu, [0], [1].. - gpu id
model_config.trainer.gpus = 0 if device == 0 else [device - 1]
if run_config.model_name in ["patchcore", "cflow"]:
        convert_openvino = False  # `torch.cdist` is not supported by ONNX opset 11
# TODO Remove this line when issue #40 is fixed https://github.com/openvinotoolkit/anomalib/issues/40
if model_config.model.input_size != (224, 224):
return {} # go to next run
# Run benchmarking for current config
model_metrics = get_single_model_metrics(model_config=model_config, openvino_metrics=convert_openvino)
output = f"One sweep run complete for model {model_config.model.name}"
output += f" On category {model_config.dataset.category}" if model_config.dataset.category is not None else ""
output += str(model_metrics)
logger.info(output)
# Append configuration of current run to the collected metrics
for key, value in run_config.items():
# Skip adding model name to the dataframe
if key != "model_name":
model_metrics[key] = value
# Add device name to list
model_metrics["device"] = "gpu" if device > 0 else "cpu"
model_metrics["model_name"] = run_config.model_name
return model_metrics


if __name__ == "__main__":
# Benchmarking entry point.
    # Spawn multiple processes: one for the cpu and one per gpu available in the system.
# The idea is to distribute metrics collection over all the available devices.
logger.info("Benchmarking started πŸƒβ€β™‚οΈ. This will take a while ⏲ depending on your configuration.")
distribute()
logger.info("Finished gathering results ⚑")