"""Benchmark all the algorithms in the repo.""" | |
# Copyright (C) 2020 Intel Corporation | |
# | |
# Licensed under the Apache License, Version 2.0 (the "License"); | |
# you may not use this file except in compliance with the License. | |
# You may obtain a copy of the License at | |
# | |
# http://www.apache.org/licenses/LICENSE-2.0 | |
# | |
# Unless required by applicable law or agreed to in writing, | |
# software distributed under the License is distributed on an "AS IS" BASIS, | |
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
# See the License for the specific language governing permissions | |
# and limitations under the License. | |
import functools
import io
import logging
import math
import multiprocessing
import sys
import time
import warnings
from concurrent.futures import ProcessPoolExecutor, as_completed
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Dict, List, Union, cast

import torch
from omegaconf import DictConfig, ListConfig, OmegaConf
from pytorch_lightning import Trainer, seed_everything
from utils import convert_to_openvino, upload_to_wandb, write_metrics

from anomalib.config import get_configurable_parameters, update_input_size_config
from anomalib.data import get_datamodule
from anomalib.models import get_model
from anomalib.utils.loggers import configure_logger
from anomalib.utils.sweep import (
    get_meta_data,
    get_openvino_throughput,
    get_run_config,
    get_sweep_callbacks,
    get_torch_throughput,
    set_in_nested_config,
)
warnings.filterwarnings("ignore")

logger = logging.getLogger(__name__)
configure_logger()
pl_logger = logging.getLogger(__file__)
for logger_name in ["pytorch_lightning", "torchmetrics", "os"]:
    logging.getLogger(logger_name).setLevel(logging.ERROR)


def hide_output(func):
    """Decorator to hide the output of a function.

    Args:
        func (function): Function whose stdout is hidden.

    Raises:
        Exception: In case the execution of the function fails, an exception carrying the captured output is raised.

    Returns:
        Return value of the wrapped function.
    """

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        std_out = sys.stdout
        sys.stdout = buf = io.StringIO()
        try:
            value = func(*args, **kwargs)
        except Exception as exp:
            raise Exception(buf.getvalue()) from exp
        finally:
            # Restore stdout even if the wrapped function raised.
            sys.stdout = std_out
        return value

    return wrapper
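
# Usage sketch for `hide_output` (illustrative only, not part of the benchmark flow): any noisy
# callable can be wrapped so that its stdout is suppressed unless it raises, in which case the
# captured output is attached to the raised exception.
#
#     @hide_output
#     def _noisy_step() -> int:
#         print("progress bars and other chatter end up in the buffer")
#         return 42
#
#     _noisy_step()  # returns 42, prints nothing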


@hide_output
def get_single_model_metrics(model_config: Union[DictConfig, ListConfig], openvino_metrics: bool = False) -> Dict:
    """Collect metrics for the model in `model_config` and return them as a dict.

    Args:
        model_config (Union[DictConfig, ListConfig]): Configuration for the run.
        openvino_metrics (bool): If True, converts the model to OpenVINO format and gathers inference metrics.

    Returns:
        Dict: Collection of all the metrics such as time taken, throughput and performance scores.
    """
    with TemporaryDirectory() as project_path:
        model_config.project.path = project_path
        datamodule = get_datamodule(model_config)
        model = get_model(model_config)

        callbacks = get_sweep_callbacks()

        trainer = Trainer(**model_config.trainer, logger=None, callbacks=callbacks)

        start_time = time.time()
        trainer.fit(model=model, datamodule=datamodule)

        # get training time
        training_time = time.time() - start_time

        # Creating a new variable is faster according to https://stackoverflow.com/a/4330829
        start_time = time.time()

        # get test results
        test_results = trainer.test(model=model, datamodule=datamodule)

        # get testing time
        testing_time = time.time() - start_time

        meta_data = get_meta_data(model, model_config.model.input_size)

        throughput = get_torch_throughput(model_config, model, datamodule.test_dataloader().dataset, meta_data)

        # Get OpenVINO metrics
        openvino_throughput = float("nan")
        if openvino_metrics:
            # Create dirs for openvino model export
            openvino_export_path = project_path / Path("exported_models")
            openvino_export_path.mkdir(parents=True, exist_ok=True)
            convert_to_openvino(model, openvino_export_path, model_config.model.input_size)
            openvino_throughput = get_openvino_throughput(
                model_config, openvino_export_path, datamodule.test_dataloader().dataset, meta_data
            )

        # arrange the data
        data = {
            "Training Time (s)": training_time,
            "Testing Time (s)": testing_time,
            "Inference Throughput (fps)": throughput,
            "OpenVINO Inference Throughput (fps)": openvino_throughput,
        }
        for key, val in test_results[0].items():
            data[key] = float(val)

    return data
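
# Illustrative call (model name and numbers are examples, not measured results):
#
#     config = get_configurable_parameters(model_name="padim")
#     metrics = get_single_model_metrics(config, openvino_metrics=False)
#     # metrics -> {"Training Time (s)": 123.4, "Testing Time (s)": 5.6,
#     #             "Inference Throughput (fps)": 78.9,
#     #             "OpenVINO Inference Throughput (fps)": nan, ...test metrics...}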


def compute_on_cpu():
    """Compute all run configurations over a single CPU."""
    sweep_config = OmegaConf.load("tools/benchmarking/benchmark_params.yaml")
    for run_config in get_run_config(sweep_config.grid_search):
        model_metrics = sweep(run_config, 0, sweep_config.seed, False)
        write_metrics(model_metrics, sweep_config.writer)
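
# `compute_on_cpu`, `distribute_over_gpus` and `distribute` all read
# `tools/benchmarking/benchmark_params.yaml`. Judging from the fields accessed in this module,
# a minimal sweep file looks roughly like the sketch below (values are illustrative;
# `grid_search` keys are dotted paths into the model config):
#
#     seed: 42
#     compute_openvino: false
#     hardware: [cpu, gpu]
#     writer: [csv]
#     grid_search:
#       model_name: [padim, stfpm]
#       dataset.category: [bottle]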


def compute_on_gpu(
    run_configs: Union[DictConfig, ListConfig],
    device: int,
    seed: int,
    writers: List[str],
    compute_openvino: bool = False,
):
    """Go over each run config and collect the results.

    Args:
        run_configs (Union[DictConfig, ListConfig]): List of run configurations.
        device (int): The GPU id used for running the sweep.
        seed (int): Fix a seed.
        writers (List[str]): Destinations to write to.
        compute_openvino (bool, optional): Compute OpenVINO throughput. Defaults to False.
    """
    for run_config in run_configs:
        if isinstance(run_config, (DictConfig, ListConfig)):
            model_metrics = sweep(run_config, device, seed, compute_openvino)
            write_metrics(model_metrics, writers)
        else:
            raise ValueError(
                f"Expecting `run_config` of type DictConfig or ListConfig. Got {type(run_config)} instead."
            )


def distribute_over_gpus():
    """Distribute metric collection over all available GPUs. This is done by splitting the list of configurations."""
    sweep_config = OmegaConf.load("tools/benchmarking/benchmark_params.yaml")
    with ProcessPoolExecutor(
        max_workers=torch.cuda.device_count(), mp_context=multiprocessing.get_context("spawn")
    ) as executor:
        run_configs = list(get_run_config(sweep_config.grid_search))
        # Split the run configurations into one contiguous chunk per GPU.
        chunk_size = math.ceil(len(run_configs) / torch.cuda.device_count())
        jobs = []
        for device_id, run_split in enumerate(range(0, len(run_configs), chunk_size)):
            jobs.append(
                executor.submit(
                    compute_on_gpu,
                    run_configs[run_split : run_split + chunk_size],
                    device_id + 1,
                    sweep_config.seed,
                    sweep_config.writer,
                    sweep_config.compute_openvino,
                )
            )
        for job in jobs:
            try:
                job.result()
            except Exception as exc:
                raise Exception(f"Error occurred while computing benchmark on device {job}") from exc


def distribute():
    """Run all CPU experiments in a single process and distribute GPU experiments over all available GPUs.

    The target devices are taken from the `hardware` field of the sweep config.
    """
    sweep_config = OmegaConf.load("tools/benchmarking/benchmark_params.yaml")
    devices = sweep_config.hardware
    if not torch.cuda.is_available() and "gpu" in devices:
        pl_logger.warning("Config requested GPU benchmarking but torch could not detect any cuda enabled devices")
    elif {"cpu", "gpu"}.issubset(devices):
        # Create one process for the cpu runs and one for the gpu runs
        with ProcessPoolExecutor(max_workers=2, mp_context=multiprocessing.get_context("spawn")) as executor:
            jobs = [executor.submit(compute_on_cpu), executor.submit(distribute_over_gpus)]
            for job in as_completed(jobs):
                try:
                    job.result()
                except Exception as exception:
                    raise Exception(f"Error occurred while computing benchmark on device {job}") from exception
    elif "cpu" in devices:
        compute_on_cpu()
    elif "gpu" in devices:
        distribute_over_gpus()

    if "wandb" in sweep_config.writer:
        upload_to_wandb(team="anomalib")


def sweep(
    run_config: Union[DictConfig, ListConfig], device: int = 0, seed: int = 42, convert_openvino: bool = False
) -> Dict[str, Union[float, str]]:
    """Run benchmarking for a single combination of values from the `grid_search` section of the benchmarking config.

    Args:
        run_config (Union[DictConfig, ListConfig]): Configuration for the current run.
        device (int, optional): Device on which the model is trained. 0 selects the CPU, n > 0 selects GPU n-1.
            Defaults to 0.
        seed (int, optional): Seed used for reproducibility. Defaults to 42.
        convert_openvino (bool, optional): Whether to convert the model to OpenVINO format. Defaults to False.

    Returns:
        Dict[str, Union[float, str]]: Dictionary containing the metrics gathered from the sweep.
    """
    seed_everything(seed, workers=True)
    # This assumes that `model_name` is always present in the sweep config.
    model_config = get_configurable_parameters(model_name=run_config.model_name)
    model_config.project.seed = seed

    model_config = cast(DictConfig, model_config)  # placate mypy
    for param in run_config.keys():
        # grid search keys are always assumed to be strings
        param = cast(str, param)  # placate mypy
        set_in_nested_config(model_config, param.split("."), run_config[param])  # type: ignore

    # convert image size to tuple in case it was updated by run config
    model_config = update_input_size_config(model_config)

    # Set device in config. 0 - cpu, [0], [1].. - gpu id
    model_config.trainer.gpus = 0 if device == 0 else [device - 1]

    if run_config.model_name in ["patchcore", "cflow"]:
        convert_openvino = False  # `torch.cdist` is not supported by onnx version 11
        # TODO Remove this line when issue #40 is fixed https://github.com/openvinotoolkit/anomalib/issues/40
        if model_config.model.input_size != (224, 224):
            return {}  # go to next run

    # Run benchmarking for current config
    model_metrics = get_single_model_metrics(model_config=model_config, openvino_metrics=convert_openvino)
    output = f"One sweep run complete for model {model_config.model.name}"
    output += f" on category {model_config.dataset.category}" if model_config.dataset.category is not None else ""
    output += f" {model_metrics}"
    logger.info(output)

    # Append configuration of current run to the collected metrics
    for key, value in run_config.items():
        # Skip adding model name to the dataframe
        if key != "model_name":
            model_metrics[key] = value

    # Add device name to the collected metrics
    model_metrics["device"] = "gpu" if device > 0 else "cpu"
    model_metrics["model_name"] = run_config.model_name

    return model_metrics
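
# Device mapping used above: device=0 -> trainer.gpus = 0 (CPU run), device=1 -> trainer.gpus = [0],
# device=2 -> trainer.gpus = [1], and so on; `distribute_over_gpus` passes device_id + 1 for this reason.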


if __name__ == "__main__":
    # Benchmarking entry point.
    # Spawn multiple processes: one for the cpu runs and one per gpu available in the system.
    # The idea is to distribute metrics collection over all the available devices.

    logger.info("Benchmarking started 🏃‍♂️. This will take a while ⏲ depending on your configuration.")
    distribute()
    logger.info("Finished gathering results ⚡")