import gradio as gr
import plotly
# %% ../nbs/00_benchmark.ipynb 5
import torch
import time
from codecarbon import OfflineEmissionsTracker
import numpy as np
import os
from thop import profile, clever_format
from tqdm import tqdm  # console tqdm; the notebook variant requires a Jupyter front end
from torchprofile import profile_macs
# %% ../nbs/00_benchmark.ipynb 7
def get_model_size(model, temp_path="temp_model.pth"):
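    """Return the model's size on disk in bytes by saving its state_dict to a temporary file."""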
torch.save(model.state_dict(), temp_path)
model_size = os.path.getsize(temp_path)
os.remove(temp_path)
return model_size
# %% ../nbs/00_benchmark.ipynb 8
def get_num_parameters(model):
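    """Return the number of trainable parameters in the model."""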
return sum(p.numel() for p in model.parameters() if p.requires_grad)
# %% ../nbs/00_benchmark.ipynb 11
@torch.inference_mode()
def evaluate_cpu_speed(model, dummy_input, warmup_rounds=5, test_rounds=20):
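    """Time CPU inference after a warm-up phase.

    Returns the mean latency (ms), latency standard deviation (ms), and throughput
    (inferences per second) over `test_rounds` forward passes of `dummy_input`.
    """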
device = torch.device("cpu")
model.eval()
model.to(device)
dummy_input = dummy_input.to(device)
# Warm up CPU
for _ in range(warmup_rounds):
_ = model(dummy_input)
# Measure Latency
latencies = []
for _ in range(test_rounds):
start_time = time.perf_counter()
_ = model(dummy_input)
end_time = time.perf_counter()
latencies.append(end_time - start_time)
latencies = np.array(latencies) * 1000 # Convert to milliseconds
mean_latency = np.mean(latencies)
std_latency = np.std(latencies)
# Measure Throughput
throughput = dummy_input.size(0) * 1000 / mean_latency # Inferences per second
return mean_latency, std_latency, throughput
# %% ../nbs/00_benchmark.ipynb 13
@torch.inference_mode()
def get_model_macs(model, inputs) -> int:
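    """Return the number of multiply-accumulate operations (MACs) for one forward pass."""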
return profile_macs(model, inputs)
# %% ../nbs/00_benchmark.ipynb 16
@torch.inference_mode()
def evaluate_emissions(model, dummy_input, warmup_rounds=5, test_rounds=20):
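    """Estimate the carbon footprint of CPU inference with CodeCarbon's offline tracker.

    Returns the average emissions (kg CO2e) and energy consumed (kWh) per inference,
    averaged over `test_rounds` forward passes of `dummy_input`.
    """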
device = torch.device("cpu")
model.eval()
model.to(device)
dummy_input = dummy_input.to(device)
    # Warm up CPU
for _ in range(warmup_rounds):
_ = model(dummy_input)
    # Measure emissions and energy over the test rounds
tracker = OfflineEmissionsTracker(country_iso_code="USA")
tracker.start()
for _ in range(test_rounds):
_ = model(dummy_input)
tracker.stop()
total_emissions = tracker.final_emissions
total_energy_consumed = tracker.final_emissions_data.energy_consumed
# Calculate average emissions and energy consumption per inference
average_emissions_per_inference = total_emissions / test_rounds
average_energy_per_inference = total_energy_consumed / test_rounds
return average_emissions_per_inference, average_energy_per_inference
# %% ../nbs/00_benchmark.ipynb 18
@torch.inference_mode()
def benchmark(model, dummy_input):
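    """Run the full benchmark suite (disk size, parameters, CPU speed, MACs, emissions)
    on `model` with `dummy_input`, print a summary, and return the results as a dict."""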
# Model Size
    print('Measuring model size on disk...')
disk_size = get_model_size(model)
#num_parameters = get_num_parameters(model)
# CPU Speed
    print('Measuring CPU latency and throughput...')
cpu_latency, cpu_std_latency, cpu_throughput = evaluate_cpu_speed(model, dummy_input)
# Model MACs
#macs = get_model_macs(model, dummy_input)
    print('Counting MACs and parameters...')
macs, params = profile(model, inputs=(dummy_input, ))
macs, num_parameters = clever_format([macs, params], "%.3f")
    # Emissions
    print('Measuring carbon emissions and energy consumption...')
avg_emissions, avg_energy = evaluate_emissions(model, dummy_input)
# Print results
print(f"Model Size: {disk_size / 1e6:.2f} MB (disk), {num_parameters} parameters")
print(f"CPU Latency: {cpu_latency:.3f} ms (± {cpu_std_latency:.3f} ms)")
print(f"CPU Throughput: {cpu_throughput:.2f} inferences/sec")
print(f"Model MACs: {macs}")
print(f"Average Carbon Emissions per Inference: {avg_emissions*1e3:.6f} gCO2e")
print(f"Average Energy Consumption per Inference: {avg_energy*1e3:.6f} Wh")
return {
'disk_size': disk_size,
'num_parameters': num_parameters,
'cpu_latency': cpu_latency,
'cpu_throughput': cpu_throughput,
'macs': macs,
'avg_emissions': avg_emissions,
'avg_energy': avg_energy
}
def parse_metric_value(value_str):
"""Convert string values with units (M, G) to float"""
if isinstance(value_str, (int, float)):
return float(value_str)
value_str = str(value_str)
if 'G' in value_str:
return float(value_str.replace('G', '')) * 1000 # Convert G to M
elif 'M' in value_str:
return float(value_str.replace('M', '')) # Keep in M
elif 'K' in value_str:
return float(value_str.replace('K', '')) / 1000 # Convert K to M
else:
return float(value_str)
def create_radar_plot(benchmark_results):
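    """Build a radar (polar) plot of the benchmark results.

    Each metric is min-max normalized against a fixed reference range so that
    different units can share one radial axis; hover text shows the raw values.
    """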
import plotly.graph_objects as go
# Define metrics with icons, hover text format, and units
metrics = {
'💾': { # Storage icon
'value': benchmark_results['disk_size'] / 1e6,
'hover_format': 'Model Size: {:.2f} MB',
'unit': 'MB'
},
'🧮': { # Calculator icon for parameters
'value': parse_metric_value(benchmark_results['num_parameters']),
'hover_format': 'Parameters: {:.2f}M',
'unit': 'M'
},
'⏱️': { # Clock icon for latency
'value': benchmark_results['cpu_latency'],
'hover_format': 'Latency: {:.2f} ms',
'unit': 'ms'
},
        '⚡': { # Lightning bolt for MACs (parse_metric_value returns millions of MACs)
            'value': parse_metric_value(benchmark_results['macs']),
            'hover_format': 'MACs: {:.2f}M',
            'unit': 'M'
        },
'🔋': { # Battery icon for energy
'value': benchmark_results['avg_energy'] * 1e6,
'hover_format': 'Energy: {:.3f} mWh',
'unit': 'mWh'
}
}
# Find min and max values for each metric
reference_values = {
'💾': {'min': 0, 'max': max(metrics['💾']['value'], 1000)}, # Model size (MB)
'🧮': {'min': 0, 'max': max(metrics['🧮']['value'], 50)}, # Parameters (M)
'⏱️': {'min': 0, 'max': max(metrics['⏱️']['value'], 200)}, # Latency (ms)
        '⚡': {'min': 0, 'max': max(metrics['⚡']['value'], 5000)}, # MACs (M)
'🔋': {'min': 0, 'max': max(metrics['🔋']['value'], 10)} # Energy (mWh)
}
# Normalize values and create hover text
normalized_values = []
hover_texts = []
labels = []
for icon, metric in metrics.items():
# Min-max normalization
normalized_value = (metric['value'] - reference_values[icon]['min']) / \
(reference_values[icon]['max'] - reference_values[icon]['min'])
normalized_values.append(normalized_value)
# Create hover text with actual value
hover_texts.append(metric['hover_format'].format(metric['value']))
labels.append(icon)
# Add first values again to close the polygon
normalized_values.append(normalized_values[0])
hover_texts.append(hover_texts[0])
labels.append(labels[0])
fig = go.Figure()
fig.add_trace(go.Scatterpolar(
r=normalized_values,
theta=labels,
fill='toself',
name='Model Metrics',
hovertext=hover_texts,
hoverinfo='text',
line=dict(color='#FF8C00'), # Bright orange color
fillcolor='rgba(255, 140, 0, 0.3)' # Semi-transparent orange
))
fig.update_layout(
polar=dict(
radialaxis=dict(
visible=True,
range=[0, 1],
showticklabels=False, # Hide radial axis labels
gridcolor='rgba(128, 128, 128, 0.5)', # Semi-transparent grey grid lines
linecolor='rgba(128, 128, 128, 0.5)' # Semi-transparent grey axis lines
),
angularaxis=dict(
tickfont=dict(size=24), # Icon labels
gridcolor='rgba(128, 128, 128, 0.5)' # Semi-transparent grey grid lines
),
bgcolor='rgba(0,0,0,0)' # Transparent background
),
showlegend=False,
margin=dict(t=100, b=100, l=100, r=100),
paper_bgcolor='rgba(0,0,0,0)', # Transparent background
plot_bgcolor='rgba(0,0,0,0)' # Transparent background
)
return fig
def benchmark_interface(model_name):
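    """Gradio callback: build the selected torchvision model, benchmark it on a
    single 224x224 RGB input, and return the radar plot."""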
import torchvision.models as models
    # Build only the selected model; `weights=None` replaces the deprecated `pretrained=False`
    model_builders = {
        'ResNet18': models.resnet18,
        'ResNet50': models.resnet50,
        'MobileNetV2': models.mobilenet_v2,
        'EfficientNet-B0': models.efficientnet_b0,
        'VGG16': models.vgg16,
        'DenseNet121': models.densenet121
    }
    model = model_builders[model_name](weights=None)
dummy_input = torch.randn(1, 3, 224, 224)
# Run benchmark
results = benchmark(model, dummy_input)
# Create radar plot
plot = create_radar_plot(results)
return plot
available_models = ['ResNet18', 'ResNet50', 'MobileNetV2', 'EfficientNet-B0', 'VGG16', 'DenseNet121']
iface = gr.Interface(
fn=benchmark_interface,
inputs=[
gr.Dropdown(choices=available_models, label="Select Model", value='ResNet18')
],
outputs=[
gr.Plot(label="Model Benchmark Results")
],
)
iface.launch()