# leaderboard-viewer/src/streamlit_app.py
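"""Streamlit viewer for the grounding benchmark leaderboard.

Streams per-model result JSON files from the Hugging Face dataset
mlfoundations-cua-dev/leaderboard, keeps the best checkpoint per base model,
and charts accuracy against a set of hard-coded baseline scores.
"""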
import os
# Set HF_HOME before importing huggingface_hub so downloads are cached under src/data_cache
os.environ["HF_HOME"] = "src/data_cache"
import streamlit as st
import pandas as pd
import altair as alt
from huggingface_hub import HfApi, HfFileSystem
import json
from pathlib import Path
from typing import Dict, List, Optional
import numpy as np
# Page config
st.set_page_config(
    page_title="Grounding Benchmark Leaderboard",
    page_icon="🎯",
    layout="wide"
)
# Constants
REPO_ID = "mlfoundations-cua-dev/leaderboard"
GROUNDING_PATH = "grounding"
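# Assumed repository layout, inferred from the path parsing in
# fetch_leaderboard_data() below (hypothetical example paths):
#   grounding/<dataset>/results_<model>.json           -> single result file
#   grounding/<dataset>/<base_model>/<checkpoint>.json  -> per-checkpoint results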
# Baselines for different datasets
BASELINES = {
    "screenspot-v2": {
        "Qwen2-VL-7B": {
            "desktop_text": 52.01, "desktop_icon": 44.98, "web_text": 33.04, "web_icon": 21.84, "overall": 37.96
        },
        "UI-TARS-2B": {
            "desktop_text": 90.7, "desktop_icon": 68.6, "web_text": 87.2, "web_icon": 84.7, "overall": 82.8
        },
        "UI-TARS-7B": {
            "desktop_text": 95.4, "desktop_icon": 87.8, "web_text": 93.8, "web_icon": 91.6, "overall": 92.2
        },
        "UI-TARS-72B": {
            "desktop_text": 91.2, "desktop_icon": 87.8, "web_text": 87.7, "web_icon": 86.3, "overall": 88.3
        }
    },
    "screenspot-pro": {
        "Qwen2.5-VL-3B-Instruct": {"overall": 16.1},
        "Qwen2.5-VL-7B-Instruct": {"overall": 26.8},
        "Qwen2.5-VL-72B-Instruct": {"overall": 53.3},
        "UI-TARS-2B": {"overall": 27.7},
        "UI-TARS-7B": {"overall": 35.7},
        "UI-TARS-72B": {"overall": 38.1}
    },
    "showdown-clicks": {
        "Qwen2.5-VL-72B-Instruct": {"overall": 24.78},
        "UI-TARS-72B-SFT": {"overall": 54.4},
        "Molmo-72B-0924": {"overall": 54.76}
    }
}
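# Baseline scores are percentages on the same 0-100 scale as the computed
# overall_accuracy, so evaluated models and baselines share one chart axis.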
@st.cache_data(ttl=300) # Cache for 5 minutes
def fetch_leaderboard_data():
    """Fetch all grounding results from HuggingFace leaderboard by streaming JSON files."""
    api = HfApi()
    fs = HfFileSystem()
    try:
        # List all files in the grounding directory
        files = api.list_repo_files(repo_id=REPO_ID, repo_type="dataset")
        grounding_files = [f for f in files if f.startswith(f"{GROUNDING_PATH}/") and f.endswith(".json")]
        results = []
        # Create progress bar for loading
        progress_bar = st.progress(0)
        status_text = st.empty()
        for idx, file_path in enumerate(grounding_files):
            try:
                # Update progress
                progress = (idx + 1) / len(grounding_files)
                progress_bar.progress(progress)
                status_text.text(f"Loading {idx + 1}/{len(grounding_files)} files...")
                # Stream the JSON file content directly from HuggingFace
                file_url = f"datasets/{REPO_ID}/{file_path}"
                # Read the file content directly without downloading
                with fs.open(file_url, 'r') as f:
                    data = json.load(f)
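                # Assumed shape of each result file (only the keys read below are
                # required; anything else is ignored):
                # {
                #   "metadata": {"model_checkpoint", "evaluation_timestamp",
                #                "checkpoint_steps", "training_loss"},
                #   "metrics": {"accuracy": 0.0-1.0, "total": <int>},
                #   "detailed_results": {"by_ui_type", "by_dataset_type", "by_file"}
                # }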
                # Extract only the necessary information
                metadata = data.get("metadata", {})
                metrics = data.get("metrics", {})
                detailed_results = data.get("detailed_results", {})
                # Parse the file path to get dataset and model info
                path_parts = file_path.split('/')
                dataset_name = path_parts[1] if len(path_parts) > 1 else "unknown"
                # Get model name from metadata or path
                model_checkpoint = metadata.get("model_checkpoint", "")
                model_name = model_checkpoint.split('/')[-1]
                base_model_name = None
                is_checkpoint = False
                # Handle checkpoint names
                if not model_name and len(path_parts) > 2:
                    # Check if it's a checkpoint subdirectory structure
                    if len(path_parts) > 3 and path_parts[2] != path_parts[3]:
                        # Format: grounding/dataset/base_model/checkpoint.json
                        base_model_name = path_parts[2]
                        checkpoint_file = path_parts[3].replace(".json", "")
                        model_name = f"{base_model_name}/{checkpoint_file}"
                        is_checkpoint = True
                    else:
                        # Regular format: grounding/dataset/results_modelname.json
                        model_name = path_parts[2].replace("results_", "").replace(".json", "")
                        base_model_name = model_name
                # Check if model name indicates a checkpoint
                if 'checkpoint-' in model_name:
                    is_checkpoint = True
                    if not base_model_name:
                        # Extract base model name from full path
                        if '/' in model_name:
                            parts = model_name.split('/')
                            base_model_name = parts[0]
                        else:
                            # Try to get from model_checkpoint path
                            checkpoint_parts = model_checkpoint.split('/')
                            if len(checkpoint_parts) > 1:
                                base_model_name = checkpoint_parts[-2]
                # Extract UI type results if available
                ui_type_results = detailed_results.get("by_ui_type", {})
                dataset_type_results = detailed_results.get("by_dataset_type", {})
                results_by_file = detailed_results.get("by_file", {})
                # Create a compact result entry (only keep what we need for visualization)
                result_entry = {
                    "dataset": dataset_name,
                    "model": model_name,
                    "base_model": base_model_name or model_name,
                    "is_checkpoint": is_checkpoint,
                    "model_path": model_checkpoint,
                    "overall_accuracy": metrics.get("accuracy", 0) * 100,  # Convert to percentage
                    "total_samples": metrics.get("total", 0),
                    "timestamp": metadata.get("evaluation_timestamp", ""),
                    "checkpoint_steps": metadata.get("checkpoint_steps"),
                    "training_loss": metadata.get("training_loss"),
                    "ui_type_results": ui_type_results,
                    "dataset_type_results": dataset_type_results,
                    "results_by_file": results_by_file
                }
                results.append(result_entry)
            except Exception as e:
                st.warning(f"Error loading {file_path}: {str(e)}")
                continue
        # Clear progress indicators
        progress_bar.empty()
        status_text.empty()
        # Create DataFrame
        df = pd.DataFrame(results)
        # Process checkpoints: for each base model, find the best checkpoint
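        # A '*' is appended to models whose best-scoring checkpoint is not their
        # final checkpoint (also surfaced via the is_best_not_last flag).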
        if not df.empty:
            # Group by dataset and base_model
            grouped = df.groupby(['dataset', 'base_model'])
            # For each group, find the best checkpoint
            best_models = []
            for (dataset, base_model), group in grouped:
                if len(group) > 1:
                    # Multiple entries for this model (likely checkpoints)
                    best_idx = group['overall_accuracy'].idxmax()
                    best_row = group.loc[best_idx].copy()
                    # Check if the best is the last checkpoint
                    checkpoint_steps = group[group['checkpoint_steps'].notna()]['checkpoint_steps'].sort_values()
                    if len(checkpoint_steps) > 0:
                        last_checkpoint_steps = checkpoint_steps.iloc[-1]
                        best_checkpoint_steps = best_row['checkpoint_steps']
                        if pd.notna(best_checkpoint_steps) and best_checkpoint_steps != last_checkpoint_steps:
                            # Best checkpoint is not the last one, add asterisk
                            best_row['model'] = best_row['model'] + '*'
                            best_row['is_best_not_last'] = True
                        else:
                            best_row['is_best_not_last'] = False
                    # Store all checkpoints for this model
                    best_row['all_checkpoints'] = group.to_dict('records')
                    best_models.append(best_row)
                else:
                    # Single entry for this model
                    row = group.iloc[0].copy()
                    row['is_best_not_last'] = False
                    row['all_checkpoints'] = [row.to_dict()]
                    best_models.append(row)
            # Create new dataframe with best models
            df_best = pd.DataFrame(best_models)
            return df_best
        return df
    except Exception as e:
        st.error(f"Error fetching leaderboard data: {str(e)}")
        return pd.DataFrame()
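# The frame returned above holds one row per (dataset, base_model) pair with the
# fields of result_entry plus is_best_not_last and all_checkpoints.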
def parse_ui_type_metrics(df: pd.DataFrame, dataset_filter: str) -> pd.DataFrame:
    """Parse UI type metrics from the results dataframe."""
    metrics_list = []
    for _, row in df.iterrows():
        if row['dataset'] != dataset_filter:
            continue
        model = row['model']
        ui_results = row.get('ui_type_results', {})
        dataset_type_results = row.get('dataset_type_results', {})
        results_by_file = row.get('results_by_file', {})
        # For ScreenSpot datasets
        if 'screenspot' in dataset_filter.lower():
            # Check if we have desktop/web breakdown in results_by_file
            desktop_file = None
            web_file = None
            for filename, file_results in results_by_file.items():
                if 'desktop' in filename.lower():
                    desktop_file = file_results
                elif 'web' in filename.lower():
                    web_file = file_results
            if desktop_file and web_file:
                # We have desktop/web breakdown
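                # Per-split accuracy = correct / total * 100; max(total, 1) guards
                # against division by zero on empty splits.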
                desktop_text = desktop_file.get('by_ui_type', {}).get('text', {}).get('correct', 0) / max(desktop_file.get('by_ui_type', {}).get('text', {}).get('total', 1), 1) * 100
                desktop_icon = desktop_file.get('by_ui_type', {}).get('icon', {}).get('correct', 0) / max(desktop_file.get('by_ui_type', {}).get('icon', {}).get('total', 1), 1) * 100
                web_text = web_file.get('by_ui_type', {}).get('text', {}).get('correct', 0) / max(web_file.get('by_ui_type', {}).get('text', {}).get('total', 1), 1) * 100
                web_icon = web_file.get('by_ui_type', {}).get('icon', {}).get('correct', 0) / max(web_file.get('by_ui_type', {}).get('icon', {}).get('total', 1), 1) * 100
                # Calculate averages
                desktop_avg = (desktop_text + desktop_icon) / 2 if (desktop_text > 0 or desktop_icon > 0) else 0
                web_avg = (web_text + web_icon) / 2 if (web_text > 0 or web_icon > 0) else 0
                text_avg = (desktop_text + web_text) / 2 if (desktop_text > 0 or web_text > 0) else 0
                icon_avg = (desktop_icon + web_icon) / 2 if (desktop_icon > 0 or web_icon > 0) else 0
                # For screenspot-v2, calculate the overall as average of desktop and web
                if dataset_filter == 'screenspot-v2':
                    overall = (desktop_avg + web_avg) / 2 if (desktop_avg > 0 or web_avg > 0) else row['overall_accuracy']
                else:
                    overall = row['overall_accuracy']
                metrics_list.append({
                    'model': model,
                    'desktop_text': desktop_text,
                    'desktop_icon': desktop_icon,
                    'web_text': web_text,
                    'web_icon': web_icon,
                    'desktop_avg': desktop_avg,
                    'web_avg': web_avg,
                    'text_avg': text_avg,
                    'icon_avg': icon_avg,
                    'overall': overall,
                    'is_best_not_last': row.get('is_best_not_last', False),
                    'all_checkpoints': row.get('all_checkpoints', [])
                })
            elif 'text' in ui_results and 'icon' in ui_results:
                # Simple text/icon structure without desktop/web breakdown
                text_acc = (ui_results.get('text', {}).get('correct', 0) / max(ui_results.get('text', {}).get('total', 1), 1)) * 100
                icon_acc = (ui_results.get('icon', {}).get('correct', 0) / max(ui_results.get('icon', {}).get('total', 1), 1)) * 100
                metrics_list.append({
                    'model': model,
                    'text': text_acc,
                    'icon': icon_acc,
                    'overall': row['overall_accuracy'],
                    'is_best_not_last': row.get('is_best_not_last', False),
                    'all_checkpoints': row.get('all_checkpoints', [])
                })
            else:
                # Try to get from dataset_type_results if available
                found_data = False
                for dataset_key in dataset_type_results:
                    if 'screenspot' in dataset_key.lower():
                        dataset_data = dataset_type_results[dataset_key]
                        if 'by_ui_type' in dataset_data:
                            ui_data = dataset_data['by_ui_type']
                            text_data = ui_data.get('text', {})
                            icon_data = ui_data.get('icon', {})
                            text_acc = (text_data.get('correct', 0) / max(text_data.get('total', 1), 1)) * 100
                            icon_acc = (icon_data.get('correct', 0) / max(icon_data.get('total', 1), 1)) * 100
                            metrics_list.append({
                                'model': model,
                                'text': text_acc,
                                'icon': icon_acc,
                                'overall': row['overall_accuracy'],
                                'is_best_not_last': row.get('is_best_not_last', False),
                                'all_checkpoints': row.get('all_checkpoints', [])
                            })
                            found_data = True
                            break
                if not found_data:
                    # No UI type data available, just use overall
                    metrics_list.append({
                        'model': model,
                        'overall': row['overall_accuracy'],
                        'is_best_not_last': row.get('is_best_not_last', False),
                        'all_checkpoints': row.get('all_checkpoints', [])
                    })
        else:
            # For non-screenspot datasets, just pass through overall accuracy
            metrics_list.append({
                'model': model,
                'overall': row['overall_accuracy'],
                'is_best_not_last': row.get('is_best_not_last', False),
                'all_checkpoints': row.get('all_checkpoints', [])
            })
    return pd.DataFrame(metrics_list)
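# The metrics frame always includes 'model', 'overall', 'is_best_not_last', and
# 'all_checkpoints'; per-split columns (desktop_text, ..., icon_avg, or text/icon)
# appear only when the underlying results provide that breakdown.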
def create_bar_chart(data: pd.DataFrame, metric: str, title: str):
    """Create a bar chart for a specific metric."""
    # Prepare data for the chart
    chart_data = []
    # Add model results
    for _, row in data.iterrows():
        if metric in row and row[metric] > 0:
            chart_data.append({
                'Model': row['model'],
                'Score': row[metric],
                'Type': 'Evaluated'
            })
    # Add baselines if available
    dataset = st.session_state.get('selected_dataset', '')
    if dataset in BASELINES:
        for baseline_name, baseline_metrics in BASELINES[dataset].items():
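            # Map the chart metric to a baseline key (e.g. 'desktop_avg' -> 'desktop');
            # baselines that do not report the resulting key are simply skipped.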
            metric_key = metric.replace('_avg', '').replace('avg', 'overall')
            if metric_key in baseline_metrics:
                chart_data.append({
                    'Model': baseline_name,
                    'Score': baseline_metrics[metric_key],
                    'Type': 'Baseline'
                })
    if not chart_data:
        return None
    df_chart = pd.DataFrame(chart_data)
    # Create the bar chart
    chart = alt.Chart(df_chart).mark_bar().encode(
        x=alt.X('Model:N',
                sort=alt.EncodingSortField(field='Score', order='descending'),
                axis=alt.Axis(labelAngle=-45)),
        y=alt.Y('Score:Q',
                scale=alt.Scale(domain=[0, 100]),
                axis=alt.Axis(title='Score (%)')),
        color=alt.Color('Type:N',
                        scale=alt.Scale(domain=['Evaluated', 'Baseline'],
                                        range=['#4ECDC4', '#FFA726'])),
        tooltip=['Model', 'Score', 'Type']
    ).properties(
        title=title,
        width=500,  # Increased from 400
        height=400  # Increased from 300
    )
    # Add value labels
    text = chart.mark_text(
        align='center',
        baseline='bottom',
        dy=-5
    ).encode(
        text=alt.Text('Score:Q', format='.1f')
    )
    return chart + text
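# create_bar_chart() returns a layered bars-plus-labels chart, or None when neither
# evaluated models nor baselines have data for the requested metric.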
def main():
    st.title("🎯 Grounding Benchmark Leaderboard")
    st.markdown("Visualization of model performance on grounding benchmarks")
    # Fetch data
    with st.spinner("Loading leaderboard data..."):
        df = fetch_leaderboard_data()
    if df.empty:
        st.warning("No data available in the leaderboard.")
        return
    # Sidebar filters
    st.sidebar.header("Filters")
    # Dataset filter
    datasets = sorted(df['dataset'].unique())
    selected_dataset = st.sidebar.selectbox("Select Dataset", datasets)
    st.session_state['selected_dataset'] = selected_dataset
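    # create_bar_chart() reads this session_state key to pick matching BASELINES.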
    # Filter data
    filtered_df = df[df['dataset'] == selected_dataset]
    # Model filter (optional)
    models = ['All'] + sorted(filtered_df['model'].unique())
    selected_model = st.sidebar.selectbox("Select Model", models)
    if selected_model != 'All':
        filtered_df = filtered_df[filtered_df['model'] == selected_model]
    # Main content
    st.header(f"Results for {selected_dataset}")
    # Overall metrics
    col1, col2, col3 = st.columns(3)
    with col1:
        st.metric("Models Evaluated", len(filtered_df))
    with col2:
        if not filtered_df.empty:
            best_acc = filtered_df['overall_accuracy'].max()
            best_model = filtered_df[filtered_df['overall_accuracy'] == best_acc]['model'].iloc[0]
            st.metric("Best Overall Accuracy", f"{best_acc:.1f}%", help=f"Model: {best_model}")
    with col3:
        total_samples = filtered_df['total_samples'].sum()
        st.metric("Total Samples Evaluated", f"{total_samples:,}")
    # Parse UI type metrics
    ui_metrics_df = parse_ui_type_metrics(filtered_df, selected_dataset)
    # Add metric selector for screenspot datasets
    selected_metric = 'overall'  # Default metric
    if not ui_metrics_df.empty and 'screenspot' in selected_dataset.lower():
        # Metric selector dropdown
        if selected_dataset == 'screenspot-v2':
            metric_options = {
                'overall': 'Overall Average (Desktop + Web) / 2',
                'desktop_avg': 'Desktop Average',
                'web_avg': 'Web Average',
                'desktop_text': 'Desktop (Text)',
                'desktop_icon': 'Desktop (Icon)',
                'web_text': 'Web (Text)',
                'web_icon': 'Web (Icon)',
                'text_avg': 'Text Average',
                'icon_avg': 'Icon Average'
            }
        elif selected_dataset in ['screenspot-pro', 'showdown-clicks']:
            # For screenspot-pro and showdown-clicks, only show overall average
            metric_options = {
                'overall': 'Overall Average'
            }
        else:
            metric_options = {
                'overall': 'Overall Average',
                'desktop_avg': 'Desktop Average',
                'web_avg': 'Web Average',
                'text_avg': 'Text Average',
                'icon_avg': 'Icon Average'
            }
        selected_metric = st.selectbox(
            "Select metric to visualize:",
            options=list(metric_options.keys()),
            format_func=lambda x: metric_options[x],
            key="metric_selector"
        )
        # Add note about asterisks
        if any(ui_metrics_df['is_best_not_last']):
            st.info("An asterisk (*) next to a model name means its best checkpoint is not its last checkpoint.")
        # Create single chart for selected metric
        chart = create_bar_chart(ui_metrics_df, selected_metric, metric_options[selected_metric])
        if chart:
            st.altair_chart(chart, use_container_width=True)
        else:
            st.warning(f"No data available for {metric_options[selected_metric]}")
    else:
        # For non-ScreenSpot datasets, show a simple bar chart
        chart_data = filtered_df[['model', 'overall_accuracy']].copy()
        chart_data.columns = ['Model', 'Accuracy']
        chart = alt.Chart(chart_data).mark_bar().encode(
            x=alt.X('Model:N', sort='-y', axis=alt.Axis(labelAngle=-45)),
            y=alt.Y('Accuracy:Q', scale=alt.Scale(domain=[0, 100])),
            tooltip=['Model', 'Accuracy']
        ).properties(
            width=800,
            height=400
        )
        st.altair_chart(chart, use_container_width=True)
if __name__ == "__main__":
    main()
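# Run locally with (assumed invocation): streamlit run src/streamlit_app.py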