# opencv_zoo/benchmark/benchmark.py
# Author: Yuantao Feng
# (history: "Add SFace for face recognition (#3)", commit 9d96bb5)
import os
import argparse
import yaml
import tqdm
import numpy as np
import cv2 as cv
from models import MODELS
# Command-line interface: the only option is the path to a YAML config
# describing the benchmark (data, metric, backend/target) and the model.
parser = argparse.ArgumentParser("Benchmarks for OpenCV Zoo.")
parser.add_argument('--cfg', '-c', type=str,
                    help='Benchmarking on the given config.')
# Parsed once at import time; `args.cfg` is consumed in the __main__ block.
args = parser.parse_args()
class Timer:
    '''Measures per-call wall time of a code section using cv.TickMeter.

    Call start()/stop() around each timed call; getResult() reduces the
    recorded times (skipping the first `warmup` records) to one value.

    Args:
        warmup (int): number of leading records to drop in getResult().
        reduction (str): 'median' or 'gmean'; anything else raises
            NotImplementedError from getResult().
    '''
    def __init__(self, warmup=0, reduction='median'):
        self._warmup = warmup
        self._reduction = reduction
        self._tm = cv.TickMeter()
        self._time_record = []  # per-call elapsed times, in milliseconds
        self._calls = 0

    def start(self):
        self._tm.start()

    def stop(self):
        self._tm.stop()
        self._calls += 1
        self._time_record.append(self._tm.getTimeMilli())
        self._tm.reset()

    def reset(self):
        '''Discard all recorded times; the TickMeter itself is already idle.'''
        self._time_record = []
        self._calls = 0

    def getResult(self):
        '''Reduce recorded times (excluding warmup records) to a single ms value.'''
        records = self._time_record[self._warmup:]
        if self._reduction == 'median':
            return self._getMedian(records)
        elif self._reduction == 'gmean':
            return self._getGMean(records)
        else:
            raise NotImplementedError('Unsupported reduction: {}'.format(self._reduction))

    def _getMedian(self, records):
        '''Return the median time in ms.

        Fix: the middle element(s) must come from the SORTED records; the
        previous implementation indexed into the chronological sequence.
        Returns 0.0 for an empty record list instead of raising IndexError.
        '''
        if not records:
            return 0.0
        ordered = sorted(records)
        l = len(ordered)
        mid = l // 2
        if l % 2 == 0:
            return (ordered[mid] + ordered[mid - 1]) / 2
        return ordered[mid]

    def _getGMean(self, records, drop_largest=3):
        '''Return the mean time in ms after dropping the `drop_largest`
        slowest records (a trimmed arithmetic mean; the name is historical).

        Fixes: the previous implementation sorted the records but then
        ignored the sorted list (dropping the first chronological records
        instead of the largest), and divided by `self._calls - drop_largest`
        where `_calls` still counted warmup calls already sliced off of
        `records`. Returns 0.0 when nothing remains after trimming.
        '''
        kept = sorted(records, reverse=True)[drop_largest:]
        if not kept:
            return 0.0
        return sum(kept) / len(kept)
class Data:
    '''Wraps a directory of benchmark images (and optional label files).

    kwargs:
        path (str): directory containing the images. Required.
        files (list[str]): image filenames relative to `path`. If empty or
            None, all .jpg/.png files in `path` are used.
        useLabel (bool): if True, load `<stem>.txt` next to each image via
            np.loadtxt and return it alongside the image from __getitem__.
    '''
    # Suffixes recognized when auto-discovering files. Both are exactly
    # 4 characters, which _load_label relies on (filename[:-4]).
    _IMAGE_SUFFIXES = ('.jpg', '.png')

    def __init__(self, **kwargs):
        self._path = kwargs.pop('path', None)
        assert self._path, 'Benchmark[\'data\'][\'path\'] cannot be empty.'

        self._files = kwargs.pop('files', None)
        if not self._files:
            print('Benchmark[\'data\'][\'files\'] is empty, loading all images by default.')
            # Fix: match real suffixes including the dot; the previous
            # endswith('jpg') also matched names like 'xjpg'. lower() also
            # accepts upper-case extensions ('.JPG').
            self._files = [f for f in os.listdir(self._path)
                           if f.lower().endswith(self._IMAGE_SUFFIXES)]

        self._use_label = kwargs.pop('useLabel', False)
        if self._use_label:
            self._labels = self._load_label()

    def _load_label(self):
        '''Return {filename: ndarray} loaded from `<stem>.txt` per image.'''
        labels = dict.fromkeys(self._files, None)
        for filename in self._files:
            # filename[:-4] strips the 4-char extension to get the stem.
            labels[filename] = np.loadtxt(os.path.join(self._path, '{}.txt'.format(filename[:-4])), ndmin=2)
        return labels

    def __len__(self):
        # Added so len(data) and progress bars work; iteration still uses
        # the __getitem__/IndexError protocol as before.
        return len(self._files)

    def __getitem__(self, idx):
        '''Return (filename, image) or (filename, image, labels).'''
        image = cv.imread(os.path.join(self._path, self._files[idx]))
        if self._use_label:
            return self._files[idx], image, self._labels[self._files[idx]]
        else:
            return self._files[idx], image
class Metric:
    '''Runs timed inference for a model and reduces the measurements.

    kwargs:
        sizes: list of [w, h] input sizes; defaults to the image's own size.
        warmup (int): leading runs excluded from the reduction (default 3).
        repeat (int): measured runs per size/bbox (default 10).
        batchSize (int): reserved for batched inference (currently unused).
        reduction (str): 'median' or 'gmean', see Timer.
    '''
    def __init__(self, **kwargs):
        self._sizes = kwargs.pop('sizes', None)
        self._warmup = kwargs.pop('warmup', 3)
        self._repeat = kwargs.pop('repeat', 10)
        assert self._warmup < self._repeat, 'The value of warmup must be smaller than the value of repeat.'
        self._batch_size = kwargs.pop('batchSize', 1)
        self._reduction = kwargs.pop('reduction', 'median')
        self._timer = Timer(self._warmup, self._reduction)

    def getReduction(self):
        return self._reduction

    def forward(self, model, *args, **kwargs):
        '''Benchmark `model.infer` and return {label: latency_ms}.

        With a single positional arg (an image), inference is timed at each
        configured size; with (image, bboxes), inference is timed per bbox.
        '''
        img = args[0]
        h, w, _ = img.shape
        if not self._sizes:
            # Fall back to the image's own size. NOTE: this caches the first
            # image's size for later calls, matching the prior behavior.
            self._sizes = [[w, h]]

        results = dict()
        if len(args) == 1:
            for size in self._sizes:
                # Fix: reset per size so earlier sizes' records do not leak
                # into later results (reset previously ran once per forward).
                self._timer.reset()
                img_r = cv.resize(img, size)
                model.setInputSize(size)
                # TODO: batched inference
                # input_data = [img] * self._batch_size
                input_data = img_r
                for _ in range(self._repeat + self._warmup):
                    self._timer.start()
                    model.infer(input_data)
                    self._timer.stop()
                results[str(size)] = self._timer.getResult()
        else:
            # TODO: batched inference
            # input_data = [args] * self._batch_size
            bboxes = args[1]
            for idx, bbox in enumerate(bboxes):
                # Fix: reset per bbox, for the same reason as above.
                self._timer.reset()
                for _ in range(self._repeat + self._warmup):
                    self._timer.start()
                    model.infer(img, bbox)
                    self._timer.stop()
                results['bbox{}'.format(idx)] = self._timer.getResult()
        return results
class Benchmark:
    '''Drives a full benchmark run: dataset, metric, and DNN backend/target.

    kwargs (from the YAML 'Benchmark' section):
        data (dict): forwarded to Data; must include 'path'.
        metric (dict): forwarded to Metric.
        backend (str): 'default', 'opencv' or 'cuda' (default 'default').
        target (str): 'cpu', 'cuda' or 'cuda_fp16' (default 'cpu').

    Raises KeyError for unknown backend/target names.
    '''
    def __init__(self, **kwargs):
        self._data_dict = kwargs.pop('data', None)
        assert self._data_dict, 'Benchmark[\'data\'] cannot be empty and must have path and files.'
        self._data = Data(**self._data_dict)

        self._metric_dict = kwargs.pop('metric', None)
        self._metric = Metric(**self._metric_dict)

        # Commented-out entries are backends/targets not validated for the
        # zoo yet; uncomment as support is verified.
        backend_id = kwargs.pop('backend', 'default')
        available_backends = dict(
            default=cv.dnn.DNN_BACKEND_DEFAULT,
            # halide=cv.dnn.DNN_BACKEND_HALIDE,
            # inference_engine=cv.dnn.DNN_BACKEND_INFERENCE_ENGINE,
            opencv=cv.dnn.DNN_BACKEND_OPENCV,
            # vkcom=cv.dnn.DNN_BACKEND_VKCOM,
            cuda=cv.dnn.DNN_BACKEND_CUDA
        )
        self._backend = available_backends[backend_id]

        target_id = kwargs.pop('target', 'cpu')
        available_targets = dict(
            cpu=cv.dnn.DNN_TARGET_CPU,
            # opencl=cv.dnn.DNN_TARGET_OPENCL,
            # opencl_fp16=cv.dnn.DNN_TARGET_OPENCL_FP16,
            # myriad=cv.dnn.DNN_TARGET_MYRIAD,
            # vulkan=cv.dnn.DNN_TARGET_VULKAN,
            # fpga=cv.dnn.DNN_TARGET_FPGA,
            cuda=cv.dnn.DNN_TARGET_CUDA,
            cuda_fp16=cv.dnn.DNN_TARGET_CUDA_FP16,
            # hddl=cv.dnn.DNN_TARGET_HDDL
        )
        self._target = available_targets[target_id]

        # filename -> {label: latency_ms}, filled by run().
        self._benchmark_results = dict()

    def run(self, model):
        '''Benchmark `model` on every image in the dataset.'''
        model.setBackend(self._backend)
        model.setTarget(self._target)
        # Data yields (filename, image[, labels]); iteration stops when
        # __getitem__ raises IndexError.
        for data in self._data:
            self._benchmark_results[data[0]] = self._metric.forward(model, *data[1:])

    def printResults(self):
        '''Print the per-image latencies collected by run().

        Fix: removed a `total_latency` accumulator that was computed but
        never used.
        '''
        for imgName, results in self._benchmark_results.items():
            print(' image: {}'.format(imgName))
            for key, latency in results.items():
                print(' {}, latency ({}): {:.4f} ms'.format(key, self._metric.getReduction(), latency))
def build_from_cfg(cfg, registery):
    '''Look up cfg['name'] in the registry and instantiate it with the
    remaining entries of `cfg` as keyword arguments.

    Note: pops 'name' from `cfg`, mutating the caller's dict.
    '''
    cls = registery.get(cfg.pop('name'))
    return cls(**cfg)
def prepend_pythonpath(cfg, key1, key2):
    '''Prepend $PYTHONPATH to cfg[key1][key2] when that path is relative.

    Absolute paths are left untouched. Fixes: uses os.path.isabs instead of
    startswith('/') (portable to Windows), and only reads PYTHONPATH when it
    is actually needed, so absolute paths work even with PYTHONPATH unset.
    Raises KeyError if PYTHONPATH is unset and the path is relative,
    matching the original behavior.
    '''
    path = cfg[key1][key2]
    if os.path.isabs(path):
        return
    cfg[key1][key2] = os.path.join(os.environ['PYTHONPATH'], path)
if __name__ == '__main__':
    # Load the benchmark configuration (YAML only).
    assert args.cfg.endswith('yaml'), 'Currently support configs of yaml format only.'
    with open(args.cfg, 'r') as f:
        cfg = yaml.safe_load(f)

    # prepend PYTHONPATH to each path
    # (paths in the config appear to be relative to a root the caller
    # exposes via PYTHONPATH — TODO confirm against the configs/docs)
    prepend_pythonpath(cfg['Benchmark'], key1='data', key2='path')
    prepend_pythonpath(cfg, key1='Model', key2='modelPath')

    # Instantiate benchmarking
    benchmark = Benchmark(**cfg['Benchmark'])

    # Instantiate model from the registry using the 'Model' section.
    model = build_from_cfg(cfg=cfg['Model'], registery=MODELS)

    # Run benchmarking
    print('Benchmarking {}:'.format(model.name))
    benchmark.run(model)
    benchmark.printResults()