"""Benchmark all the algorithms in the repo."""
# Copyright (C) 2020 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions
# and limitations under the License.

import functools
import io
import logging
import math
import multiprocessing
import sys
import time
import warnings
from concurrent.futures import ProcessPoolExecutor, as_completed
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Dict, List, Union, cast

import torch
from omegaconf import DictConfig, ListConfig, OmegaConf
from pytorch_lightning import Trainer, seed_everything
from utils import convert_to_openvino, upload_to_wandb, write_metrics

from anomalib.config import get_configurable_parameters, update_input_size_config
from anomalib.data import get_datamodule
from anomalib.models import get_model
from anomalib.utils.loggers import configure_logger
from anomalib.utils.sweep import (
    get_meta_data,
    get_openvino_throughput,
    get_run_config,
    get_sweep_callbacks,
    get_torch_throughput,
    set_in_nested_config,
)

warnings.filterwarnings("ignore")

logger = logging.getLogger(__name__)
configure_logger()
pl_logger = logging.getLogger(__file__)

for logger_name in ["pytorch_lightning", "torchmetrics", "os"]:
    logging.getLogger(logger_name).setLevel(logging.ERROR)


def hide_output(func):
    """Decorator to hide the output of the wrapped function.

    Args:
        func (function): Function whose output is hidden.

    Raises:
        Exception: In case the execution of the function fails, the captured output is
            attached to the re-raised exception.

    Returns:
        The value returned by the wrapped function.
    """

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        std_out = sys.stdout
        sys.stdout = buf = io.StringIO()
        try:
            value = func(*args, **kwargs)
        except Exception as exp:
            raise Exception(buf.getvalue()) from exp
        finally:
            # Restore stdout even when the wrapped function raises.
            sys.stdout = std_out
        return value

    return wrapper
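
# Illustrative usage (not part of the original script): any function decorated with
# ``@hide_output`` runs with its stdout redirected to an in-memory buffer, e.g.
#
#     @hide_output
#     def _noisy():  # hypothetical helper, for illustration only
#         print("this goes to the buffer, not the console")
#
# If the wrapped function raises, the captured output is attached to the re-raised
# exception so it is not lost.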


@hide_output
def get_single_model_metrics(model_config: Union[DictConfig, ListConfig], openvino_metrics: bool = False) -> Dict:
    """Collect metrics for the model defined in ``model_config`` and return a dict of results.

    Args:
        model_config (DictConfig, ListConfig): Configuration for the run.
        openvino_metrics (bool): If True, converts the model to OpenVINO format and gathers inference metrics.

    Returns:
        Dict: Collection of all the metrics such as time taken, throughput and performance scores.
    """
    with TemporaryDirectory() as project_path:
        model_config.project.path = project_path
        datamodule = get_datamodule(model_config)
        model = get_model(model_config)

        callbacks = get_sweep_callbacks()

        trainer = Trainer(**model_config.trainer, logger=None, callbacks=callbacks)

        start_time = time.time()
        trainer.fit(model=model, datamodule=datamodule)

        # get training time
        training_time = time.time() - start_time

        # Creating a new variable is faster according to https://stackoverflow.com/a/4330829
        start_time = time.time()

        # get test results
        test_results = trainer.test(model=model, datamodule=datamodule)

        # get testing time
        testing_time = time.time() - start_time

        meta_data = get_meta_data(model, model_config.model.input_size)

        throughput = get_torch_throughput(model_config, model, datamodule.test_dataloader().dataset, meta_data)

        # Get OpenVINO metrics
        openvino_throughput = float("nan")
        if openvino_metrics:
            # Create dirs for openvino model export
            openvino_export_path = project_path / Path("exported_models")
            openvino_export_path.mkdir(parents=True, exist_ok=True)
            convert_to_openvino(model, openvino_export_path, model_config.model.input_size)
            openvino_throughput = get_openvino_throughput(
                model_config, openvino_export_path, datamodule.test_dataloader().dataset, meta_data
            )

        # arrange the data
        data = {
            "Training Time (s)": training_time,
            "Testing Time (s)": testing_time,
            "Inference Throughput (fps)": throughput,
            "OpenVINO Inference Throughput (fps)": openvino_throughput,
        }
        for key, val in test_results[0].items():
            data[key] = float(val)

    return data
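
# Illustrative shape of the returned dictionary. The timing and throughput keys come from the
# code above; the remaining metric names depend on what ``trainer.test`` reports and are
# assumptions, not guaranteed keys:
#
#     {
#         "Training Time (s)": 142.3,
#         "Testing Time (s)": 12.7,
#         "Inference Throughput (fps)": 35.1,
#         "OpenVINO Inference Throughput (fps)": float("nan"),
#         "image_AUROC": 0.98,  # example test metric, name not guaranteed
#     }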


def compute_on_cpu():
    """Compute all run configurations over a single CPU."""
    sweep_config = OmegaConf.load("tools/benchmarking/benchmark_params.yaml")
    for run_config in get_run_config(sweep_config.grid_search):
        model_metrics = sweep(run_config, 0, sweep_config.seed, False)
        write_metrics(model_metrics, sweep_config.writer)
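
# A minimal sketch of what ``tools/benchmarking/benchmark_params.yaml`` might look like. Only the
# top-level keys read by this script (seed, compute_openvino, hardware, writer, grid_search) are
# shown; the concrete values and grid-search entries below are illustrative assumptions, not the
# shipped defaults.
#
#     seed: 42
#     compute_openvino: false
#     hardware:
#       - cpu
#       - gpu
#     writer:
#       - wandb
#     grid_search:
#       model_name:
#         - padim
#       dataset.category:
#         - bottle
#       model.input_size:
#         - [224, 224]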


def compute_on_gpu(
    run_configs: Union[DictConfig, ListConfig],
    device: int,
    seed: int,
    writers: List[str],
    compute_openvino: bool = False,
):
    """Go over each run config and collect the result.

    Args:
        run_configs (Union[DictConfig, ListConfig]): List of run configurations.
        device (int): The GPU id used for running the sweep.
        seed (int): Seed used to make the runs reproducible.
        writers (List[str]): Destinations to write to.
        compute_openvino (bool, optional): Compute OpenVINO throughput. Defaults to False.
    """
    for run_config in run_configs:
        if isinstance(run_config, (DictConfig, ListConfig)):
            model_metrics = sweep(run_config, device, seed, compute_openvino)
            write_metrics(model_metrics, writers)
        else:
            raise ValueError(
                f"Expecting `run_config` of type DictConfig or ListConfig. Got {type(run_config)} instead."
            )


def distribute_over_gpus():
    """Distribute metric collection over all available GPUs.

    This is done by splitting the list of run configurations into equally sized chunks.
    """
    sweep_config = OmegaConf.load("tools/benchmarking/benchmark_params.yaml")
    with ProcessPoolExecutor(
        max_workers=torch.cuda.device_count(), mp_context=multiprocessing.get_context("spawn")
    ) as executor:
        run_configs = list(get_run_config(sweep_config.grid_search))
        jobs = []
        # One chunk of run configs per visible GPU.
        chunk_size = math.ceil(len(run_configs) / torch.cuda.device_count())
        for device_id, run_split in enumerate(range(0, len(run_configs), chunk_size)):
            jobs.append(
                executor.submit(
                    compute_on_gpu,
                    run_configs[run_split : run_split + chunk_size],
                    device_id + 1,
                    sweep_config.seed,
                    sweep_config.writer,
                    sweep_config.compute_openvino,
                )
            )
        for job in jobs:
            try:
                job.result()
            except Exception as exc:
                raise Exception(f"Error occurred while computing benchmark in job {job}") from exc


def distribute():
    """Run all cpu experiments on a single process. Distribute gpu experiments over all available gpus.

    The devices to benchmark on are read from the `hardware` field of
    `tools/benchmarking/benchmark_params.yaml`.
    """
    sweep_config = OmegaConf.load("tools/benchmarking/benchmark_params.yaml")

    devices = sweep_config.hardware
    if not torch.cuda.is_available() and "gpu" in devices:
        pl_logger.warning("Config requested GPU benchmarking but torch could not detect any cuda enabled devices")
    elif {"cpu", "gpu"}.issubset(devices):
        # Create one process for cpu and one for gpu
        with ProcessPoolExecutor(max_workers=2, mp_context=multiprocessing.get_context("spawn")) as executor:
            jobs = [executor.submit(compute_on_cpu), executor.submit(distribute_over_gpus)]
            for job in as_completed(jobs):
                try:
                    job.result()
                except Exception as exception:
                    raise Exception(f"Error occurred while computing benchmark in job {job}") from exception
    elif "cpu" in devices:
        compute_on_cpu()
    elif "gpu" in devices:
        distribute_over_gpus()

    if "wandb" in sweep_config.writer:
        upload_to_wandb(team="anomalib")


def sweep(
    run_config: Union[DictConfig, ListConfig], device: int = 0, seed: int = 42, convert_openvino: bool = False
) -> Dict[str, Union[float, str]]:
    """Go over all the values mentioned in the `grid_search` parameter of the benchmarking config.

    Args:
        run_config (Union[DictConfig, ListConfig]): Configuration for the current run.
        device (int, optional): Id of the device on which the model is trained. 0 selects the cpu;
            any value N > 0 selects gpu N - 1. Defaults to 0.
        seed (int, optional): Seed passed to `seed_everything` for reproducibility. Defaults to 42.
        convert_openvino (bool, optional): Whether to convert the model to openvino format. Defaults to False.

    Returns:
        Dict[str, Union[float, str]]: Dictionary containing the metrics gathered from the sweep.
    """
    seed_everything(seed, workers=True)

    # This assumes that `model_name` is always present in the sweep config.
    model_config = get_configurable_parameters(model_name=run_config.model_name)
    model_config.project.seed = seed

    model_config = cast(DictConfig, model_config)  # placate mypy
    for param in run_config.keys():
        # grid search keys are always assumed to be strings
        param = cast(str, param)  # placate mypy
        set_in_nested_config(model_config, param.split("."), run_config[param])  # type: ignore

    # convert image size to tuple in case it was updated by run config
    model_config = update_input_size_config(model_config)

    # Set device in config. 0 - cpu, [0], [1].. - gpu id
    model_config.trainer.gpus = 0 if device == 0 else [device - 1]

    if run_config.model_name in ["patchcore", "cflow"]:
        convert_openvino = False  # `torch.cdist` is not supported by ONNX opset 11
        # TODO Remove this line when issue #40 is fixed https://github.com/openvinotoolkit/anomalib/issues/40
        if model_config.model.input_size != (224, 224):
            return {}  # go to next run

    # Run benchmarking for current config
    model_metrics = get_single_model_metrics(model_config=model_config, openvino_metrics=convert_openvino)
    output = f"One sweep run complete for model {model_config.model.name}"
    output += f" on category {model_config.dataset.category}" if model_config.dataset.category is not None else ""
    output += str(model_metrics)
    logger.info(output)

    # Append configuration of current run to the collected metrics
    for key, value in run_config.items():
        # Skip adding model name to the dataframe
        if key != "model_name":
            model_metrics[key] = value

    # Add device name to the collected metrics
    model_metrics["device"] = "gpu" if device > 0 else "cpu"
    model_metrics["model_name"] = run_config.model_name

    return model_metrics
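
# Illustrative example of how a single run config is applied (the model and category names below
# are hypothetical values, not taken from this file): given
#
#     run_config = {"model_name": "padim", "dataset.category": "bottle"}
#
# the loop above splits each dotted key on "." and calls, for the second entry,
# set_in_nested_config(model_config, ["dataset", "category"], "bottle"), i.e. grid-search keys are
# written into the corresponding nested entries of the model configuration before the run starts.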


if __name__ == "__main__":
    # Benchmarking entry point.
    # Spawn multiple processes, one for cpu and the rest for the number of gpus available in the system.
    # The idea is to distribute metrics collection over all the available devices.

    logger.info("Benchmarking started 🏃‍♂️. This will take a while ⏲ depending on your configuration.")
    distribute()
    logger.info("Finished gathering results ⚑")