import os

# Set HF_HOME before importing huggingface_hub so the hub cache lives in src/data_cache
os.environ["HF_HOME"] = "src/data_cache"

import streamlit as st
import pandas as pd
import altair as alt
from huggingface_hub import HfApi, HfFileSystem
import json
from pathlib import Path
from typing import Dict, List, Optional
import numpy as np

# Page config
st.set_page_config(
    page_title="Grounding Benchmark Leaderboard",
    page_icon="🎯",
    layout="wide"
)

# Constants
REPO_ID = "mlfoundations-cua-dev/leaderboard"
GROUNDING_PATH = "grounding"
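
# Repository layout assumed by the path parsing in fetch_leaderboard_data() below:
#   grounding/<dataset>/results_<model>.json            -> a single evaluation run
#   grounding/<dataset>/<base_model>/<checkpoint>.json  -> one file per training checkpoint
# Each JSON file is expected to provide "metadata", "metrics", "detailed_results" and
# "sample_results"; missing keys fall back to empty defaults in the loader.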

# Baselines for different datasets
BASELINES = {
    "screenspot-v2": {
        "Qwen2-VL-7B": {
            "desktop_text": 52.01,
            "desktop_icon": 44.98,
            "web_text": 33.04,
            "web_icon": 21.84,
            "overall": 37.96
        },
        "UI-TARS-2B": {
            "desktop_text": 90.7,
            "desktop_icon": 68.6,
            "web_text": 87.2,
            "web_icon": 84.7,
            "overall": 82.8
        },
        "UI-TARS-7B": {
            "desktop_text": 95.4,
            "desktop_icon": 87.8,
            "web_text": 93.8,
            "web_icon": 91.6,
            "overall": 92.2
        },
        "UI-TARS-72B": {
            "desktop_text": 91.2,
            "desktop_icon": 87.8,
            "web_text": 87.7,
            "web_icon": 86.3,
            "overall": 88.3
        }
    },
    "screenspot-pro": {
        "Qwen2.5-VL-3B-Instruct": {
            "overall": 16.1
        },
        "Qwen2.5-VL-7B-Instruct": {
            "overall": 26.8
        },
        "Qwen2.5-VL-72B-Instruct": {
            "overall": 53.3
        },
        "UI-TARS-2B": {
            "overall": 27.7
        },
        "UI-TARS-7B": {
            "overall": 35.7
        },
        "UI-TARS-72B": {
            "overall": 38.1
        }
    }
}
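
# Baseline numbers are split-level accuracies in percent. create_bar_chart() only draws a
# baseline bar when the chart's metric key exists for that baseline, so datasets with
# overall-only baselines (screenspot-pro) show fewer reference bars than screenspot-v2.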


@st.cache_data(ttl=300)  # Cache for 5 minutes
def fetch_leaderboard_data():
    """Fetch all grounding results from HuggingFace leaderboard by streaming JSON files."""
    api = HfApi()
    fs = HfFileSystem()

    try:
        # List all files in the grounding directory
        files = api.list_repo_files(repo_id=REPO_ID, repo_type="dataset")
        grounding_files = [f for f in files if f.startswith(f"{GROUNDING_PATH}/") and f.endswith(".json")]

        results = []

        # Create progress bar for loading
        progress_bar = st.progress(0)
        status_text = st.empty()

        for idx, file_path in enumerate(grounding_files):
            try:
                # Update progress
                progress = (idx + 1) / len(grounding_files)
                progress_bar.progress(progress)
                status_text.text(f"Loading {idx + 1}/{len(grounding_files)} files...")

                # Stream the JSON file content directly from HuggingFace
                file_url = f"datasets/{REPO_ID}/{file_path}"

                # Read the file content directly without downloading
                with fs.open(file_url, 'r') as f:
                    data = json.load(f)

                # Extract only the necessary information
                metadata = data.get("metadata", {})
                metrics = data.get("metrics", {})
                detailed_results = data.get("detailed_results", {})

                # Parse the file path to get dataset and model info
                path_parts = file_path.split('/')
                dataset_name = path_parts[1] if len(path_parts) > 1 else "unknown"

                # Get model name from metadata or path
                model_checkpoint = metadata.get("model_checkpoint", "")
                model_name = model_checkpoint.split('/')[-1]
                base_model_name = None
                is_checkpoint = False

                # Handle checkpoint names
                if not model_name and len(path_parts) > 2:
                    # Check if it's a checkpoint subdirectory structure
                    if len(path_parts) > 3 and path_parts[2] != path_parts[3]:
                        # Format: grounding/dataset/base_model/checkpoint.json
                        base_model_name = path_parts[2]
                        checkpoint_file = path_parts[3].replace(".json", "")
                        model_name = f"{base_model_name}/{checkpoint_file}"
                        is_checkpoint = True
                    else:
                        # Regular format: grounding/dataset/results_modelname.json
                        model_name = path_parts[2].replace("results_", "").replace(".json", "")
                        base_model_name = model_name

                # Check if model name indicates a checkpoint
                if 'checkpoint-' in model_name:
                    is_checkpoint = True
                    if not base_model_name:
                        # Extract base model name from full path
                        if '/' in model_name:
                            parts = model_name.split('/')
                            base_model_name = parts[0]
                        else:
                            # Try to get from model_checkpoint path
                            checkpoint_parts = model_checkpoint.split('/')
                            if len(checkpoint_parts) > 1:
                                base_model_name = checkpoint_parts[-2]

                # Extract UI type results if available
                ui_type_results = detailed_results.get("by_ui_type", {})
                dataset_type_results = detailed_results.get("by_dataset_type", {})

                # Create a compact result entry (only keep what we need for visualization)
                result_entry = {
                    "dataset": dataset_name,
                    "model": model_name,
                    "base_model": base_model_name or model_name,
                    "is_checkpoint": is_checkpoint,
                    "model_path": model_checkpoint,
                    "overall_accuracy": metrics.get("accuracy", 0) * 100,  # Convert to percentage
                    "total_samples": metrics.get("total", 0),
                    "timestamp": metadata.get("evaluation_timestamp", ""),
                    "checkpoint_steps": metadata.get("checkpoint_steps"),
                    "training_loss": metadata.get("training_loss"),
                    "ui_type_results": ui_type_results,
                    "dataset_type_results": dataset_type_results,
                    # Store minimal sample results for inspection
                    "sample_results_summary": {
                        "total_samples": len(data.get("sample_results", [])),
                        "first_5_samples": data.get("sample_results", [])[:5]
                    }
                }
                results.append(result_entry)
            except Exception as e:
                st.warning(f"Error loading {file_path}: {str(e)}")
                continue

        # Clear progress indicators
        progress_bar.empty()
        status_text.empty()

        # Create DataFrame
        df = pd.DataFrame(results)

        # Process checkpoints: for each base model, find the best checkpoint
        if not df.empty:
            # Group by dataset and base_model
            grouped = df.groupby(['dataset', 'base_model'])

            # For each group, find the best checkpoint
            best_models = []
            for (dataset, base_model), group in grouped:
                if len(group) > 1:
                    # Multiple entries for this model (likely checkpoints)
                    best_idx = group['overall_accuracy'].idxmax()
                    best_row = group.loc[best_idx].copy()

                    # Check if the best is the last checkpoint
                    checkpoint_steps = group[group['checkpoint_steps'].notna()]['checkpoint_steps'].sort_values()
                    if len(checkpoint_steps) > 0:
                        last_checkpoint_steps = checkpoint_steps.iloc[-1]
                        best_checkpoint_steps = best_row['checkpoint_steps']
                        if pd.notna(best_checkpoint_steps) and best_checkpoint_steps != last_checkpoint_steps:
                            # Best checkpoint is not the last one, add asterisk
                            best_row['model'] = best_row['model'] + '*'
                            best_row['is_best_not_last'] = True
                        else:
                            best_row['is_best_not_last'] = False

                    # Store all checkpoints for this model
                    best_row['all_checkpoints'] = group.to_dict('records')
                    best_models.append(best_row)
                else:
                    # Single entry for this model
                    row = group.iloc[0].copy()
                    row['is_best_not_last'] = False
                    row['all_checkpoints'] = [row.to_dict()]
                    best_models.append(row)

            # Create new dataframe with best models
            df_best = pd.DataFrame(best_models)
            return df_best

        return df
    except Exception as e:
        st.error(f"Error fetching leaderboard data: {str(e)}")
        return pd.DataFrame()
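
# Shape of each row returned above (illustrative values, inferred from result_entry):
#   {
#       "dataset": "screenspot-v2",
#       "model": "my_model/checkpoint-500*",   # '*' marks a best-but-not-last checkpoint
#       "base_model": "my_model",
#       "overall_accuracy": 87.3,              # percent
#       "ui_type_results": {"desktop_text": {"correct": ..., "total": ...}, ...},
#       "all_checkpoints": [...],              # every run recorded for this base model
#   }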


def parse_ui_type_metrics(df: pd.DataFrame, dataset_filter: str) -> pd.DataFrame:
    """Parse UI type metrics from the results dataframe."""
    metrics_list = []

    for _, row in df.iterrows():
        if row['dataset'] != dataset_filter:
            continue

        model = row['model']
        ui_results = row['ui_type_results']

        # For ScreenSpot datasets, we have desktop/web and text/icon
        if 'screenspot' in dataset_filter.lower():
            # Calculate individual metrics
            desktop_text = ui_results.get('desktop_text', {}).get('correct', 0) / max(ui_results.get('desktop_text', {}).get('total', 1), 1) * 100
            desktop_icon = ui_results.get('desktop_icon', {}).get('correct', 0) / max(ui_results.get('desktop_icon', {}).get('total', 1), 1) * 100
            web_text = ui_results.get('web_text', {}).get('correct', 0) / max(ui_results.get('web_text', {}).get('total', 1), 1) * 100
            web_icon = ui_results.get('web_icon', {}).get('correct', 0) / max(ui_results.get('web_icon', {}).get('total', 1), 1) * 100

            # Calculate averages
            desktop_avg = (desktop_text + desktop_icon) / 2 if (desktop_text > 0 or desktop_icon > 0) else 0
            web_avg = (web_text + web_icon) / 2 if (web_text > 0 or web_icon > 0) else 0
            text_avg = (desktop_text + web_text) / 2 if (desktop_text > 0 or web_text > 0) else 0
            icon_avg = (desktop_icon + web_icon) / 2 if (desktop_icon > 0 or web_icon > 0) else 0

            # For screenspot-v2, calculate the overall as average of desktop and web
            if dataset_filter == 'screenspot-v2':
                overall = (desktop_avg + web_avg) / 2 if (desktop_avg > 0 or web_avg > 0) else 0
            else:
                overall = row['overall_accuracy']

            metrics_list.append({
                'model': model,
                'desktop_text': desktop_text,
                'desktop_icon': desktop_icon,
                'web_text': web_text,
                'web_icon': web_icon,
                'desktop_avg': desktop_avg,
                'web_avg': web_avg,
                'text_avg': text_avg,
                'icon_avg': icon_avg,
                'overall': overall,
                'is_best_not_last': row.get('is_best_not_last', False),
                'all_checkpoints': row.get('all_checkpoints', [])
            })

    return pd.DataFrame(metrics_list)
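
# The parser above assumes "by_ui_type" entries of the form
#   {"desktop_text": {"correct": <int>, "total": <int>}, "desktop_icon": {...},
#    "web_text": {...}, "web_icon": {...}}
# A missing split falls back to 0 correct out of max(total, 1), i.e. it contributes 0%
# instead of raising a KeyError or dividing by zero.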


def create_bar_chart(data: pd.DataFrame, metric: str, title: str):
    """Create a bar chart for a specific metric."""
    # Prepare data for the chart
    chart_data = []

    # Add model results
    for _, row in data.iterrows():
        if metric in row and row[metric] > 0:
            chart_data.append({
                'Model': row['model'],
                'Score': row[metric],
                'Type': 'Evaluated'
            })

    # Add baselines if available
    dataset = st.session_state.get('selected_dataset', '')
    if dataset in BASELINES:
        for baseline_name, baseline_metrics in BASELINES[dataset].items():
            # Map the chart metric onto baseline keys; the '*_avg' aggregates have no
            # baseline entry and are simply skipped, while 'overall' and the per-split
            # metrics (e.g. 'web_text') match directly
            metric_key = metric.replace('_avg', '').replace('avg', 'overall')
            if metric_key in baseline_metrics:
                chart_data.append({
                    'Model': baseline_name,
                    'Score': baseline_metrics[metric_key],
                    'Type': 'Baseline'
                })

    if not chart_data:
        return None

    df_chart = pd.DataFrame(chart_data)

    # Create the bar chart
    chart = alt.Chart(df_chart).mark_bar().encode(
        x=alt.X('Model:N',
                sort=alt.EncodingSortField(field='Score', order='descending'),
                axis=alt.Axis(labelAngle=-45)),
        y=alt.Y('Score:Q',
                scale=alt.Scale(domain=[0, 100]),
                axis=alt.Axis(title='Score (%)')),
        color=alt.Color('Type:N',
                        scale=alt.Scale(domain=['Evaluated', 'Baseline'],
                                        range=['#4ECDC4', '#FFA726'])),
        tooltip=['Model', 'Score', 'Type']
    ).properties(
        title=title,
        width=400,
        height=300
    )

    # Add value labels
    text = chart.mark_text(
        align='center',
        baseline='bottom',
        dy=-5
    ).encode(
        text=alt.Text('Score:Q', format='.1f')
    )

    return chart + text
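
# Note: the baseline lookup above reads 'selected_dataset' from st.session_state instead of
# taking the dataset as a parameter; main() stores the sidebar selection before any chart is
# rendered, so the lookup always reflects the current choice.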


def main():
    st.title("🎯 Grounding Benchmark Leaderboard")
    st.markdown("Visualization of model performance on grounding benchmarks")

    # Fetch data
    with st.spinner("Loading leaderboard data..."):
        df = fetch_leaderboard_data()

    if df.empty:
        st.warning("No data available in the leaderboard.")
        return

    # Sidebar filters
    st.sidebar.header("Filters")

    # Dataset filter
    datasets = sorted(df['dataset'].unique())
    selected_dataset = st.sidebar.selectbox("Select Dataset", datasets)
    st.session_state['selected_dataset'] = selected_dataset

    # Filter data
    filtered_df = df[df['dataset'] == selected_dataset]

    # Model filter (optional)
    models = ['All'] + sorted(filtered_df['model'].unique())
    selected_model = st.sidebar.selectbox("Select Model", models)

    if selected_model != 'All':
        filtered_df = filtered_df[filtered_df['model'] == selected_model]

    # Main content
    st.header(f"Results for {selected_dataset}")

    # Overall metrics
    col1, col2, col3 = st.columns(3)
    with col1:
        st.metric("Models Evaluated", len(filtered_df))
    with col2:
        if not filtered_df.empty:
            best_acc = filtered_df['overall_accuracy'].max()
            best_model = filtered_df[filtered_df['overall_accuracy'] == best_acc]['model'].iloc[0]
            st.metric("Best Overall Accuracy", f"{best_acc:.1f}%", help=f"Model: {best_model}")
    with col3:
        total_samples = filtered_df['total_samples'].sum()
        st.metric("Total Samples Evaluated", f"{total_samples:,}")

    # Parse UI type metrics
    ui_metrics_df = parse_ui_type_metrics(filtered_df, selected_dataset)

    if not ui_metrics_df.empty and 'screenspot' in selected_dataset.lower():
        st.subheader("Performance by UI Type")

        # Add note about asterisks
        if any(ui_metrics_df['is_best_not_last']):
            st.info("* indicates the best checkpoint is not the last checkpoint")

        # Create charts in a grid
        if selected_dataset == 'screenspot-v2':
            # First row: Overall, Desktop, Web averages
            col1, col2, col3 = st.columns(3)
            with col1:
                chart = create_bar_chart(ui_metrics_df, 'overall', 'Overall Average (Desktop + Web) / 2')
                if chart:
                    st.altair_chart(chart, use_container_width=True)
            with col2:
                chart = create_bar_chart(ui_metrics_df, 'desktop_avg', 'Desktop Average')
                if chart:
                    st.altair_chart(chart, use_container_width=True)
            with col3:
                chart = create_bar_chart(ui_metrics_df, 'web_avg', 'Web Average')
                if chart:
                    st.altair_chart(chart, use_container_width=True)

            # Second row: Individual UI type metrics
            col1, col2, col3, col4 = st.columns(4)
            with col1:
                chart = create_bar_chart(ui_metrics_df, 'desktop_text', 'Desktop (Text)')
                if chart:
                    st.altair_chart(chart, use_container_width=True)
            with col2:
                chart = create_bar_chart(ui_metrics_df, 'desktop_icon', 'Desktop (Icon)')
                if chart:
                    st.altair_chart(chart, use_container_width=True)
            with col3:
                chart = create_bar_chart(ui_metrics_df, 'web_text', 'Web (Text)')
                if chart:
                    st.altair_chart(chart, use_container_width=True)
            with col4:
                chart = create_bar_chart(ui_metrics_df, 'web_icon', 'Web (Icon)')
                if chart:
                    st.altair_chart(chart, use_container_width=True)

            # Third row: Text vs Icon averages
            col1, col2 = st.columns(2)
            with col1:
                chart = create_bar_chart(ui_metrics_df, 'text_avg', 'Text Average (Desktop + Web)')
                if chart:
                    st.altair_chart(chart, use_container_width=True)
            with col2:
                chart = create_bar_chart(ui_metrics_df, 'icon_avg', 'Icon Average (Desktop + Web)')
                if chart:
                    st.altair_chart(chart, use_container_width=True)
        else:
            # For other screenspot datasets, show the standard layout
            col1, col2 = st.columns(2)
            with col1:
                # Overall Average
                chart = create_bar_chart(ui_metrics_df, 'overall', 'Overall Average')
                if chart:
                    st.altair_chart(chart, use_container_width=True)

                # Desktop Average
                chart = create_bar_chart(ui_metrics_df, 'desktop_avg', 'Desktop Average')
                if chart:
                    st.altair_chart(chart, use_container_width=True)

                # Text Average
                chart = create_bar_chart(ui_metrics_df, 'text_avg', 'Text Average (UI-Type)')
                if chart:
                    st.altair_chart(chart, use_container_width=True)
            with col2:
                # Web Average
                chart = create_bar_chart(ui_metrics_df, 'web_avg', 'Web Average')
                if chart:
                    st.altair_chart(chart, use_container_width=True)

                # Icon Average
                chart = create_bar_chart(ui_metrics_df, 'icon_avg', 'Icon Average (UI-Type)')
                if chart:
                    st.altair_chart(chart, use_container_width=True)

        # Checkpoint progression visualization
        with st.expander("Checkpoint Progression Analysis"):
            # Select a model with checkpoints
            models_with_checkpoints = ui_metrics_df[ui_metrics_df['all_checkpoints'].apply(lambda x: len(x) > 1)]

            if not models_with_checkpoints.empty:
                selected_checkpoint_model = st.selectbox(
                    "Select a model to view checkpoint progression:",
                    models_with_checkpoints['model'].str.replace('*', '', regex=False).unique()
                )

                # Get checkpoint data for selected model (strip the '*' marker literally, not as a regex)
                model_row = models_with_checkpoints[models_with_checkpoints['model'].str.replace('*', '', regex=False) == selected_checkpoint_model].iloc[0]
                checkpoint_data = model_row['all_checkpoints']

                # Create DataFrame from checkpoint data
                checkpoint_df = pd.DataFrame(checkpoint_data)

                # Prepare data for visualization
                checkpoint_metrics = []
                for _, cp in checkpoint_df.iterrows():
                    ui_results = cp['ui_type_results']

                    # Calculate metrics
                    desktop_text = ui_results.get('desktop_text', {}).get('correct', 0) / max(ui_results.get('desktop_text', {}).get('total', 1), 1) * 100
                    desktop_icon = ui_results.get('desktop_icon', {}).get('correct', 0) / max(ui_results.get('desktop_icon', {}).get('total', 1), 1) * 100
                    web_text = ui_results.get('web_text', {}).get('correct', 0) / max(ui_results.get('web_text', {}).get('total', 1), 1) * 100
                    web_icon = ui_results.get('web_icon', {}).get('correct', 0) / max(ui_results.get('web_icon', {}).get('total', 1), 1) * 100

                    desktop_avg = (desktop_text + desktop_icon) / 2
                    web_avg = (web_text + web_icon) / 2
                    overall = (desktop_avg + web_avg) / 2 if selected_dataset == 'screenspot-v2' else cp['overall_accuracy']

                    checkpoint_metrics.append({
                        # Treat missing step counts (None or NaN) as step 0
                        'steps': cp['checkpoint_steps'] if pd.notna(cp['checkpoint_steps']) else 0,
                        'overall': overall,
                        'desktop': desktop_avg,
                        'web': web_avg,
                        'loss': cp['training_loss'],
                        'neg_log_loss': -np.log(cp['training_loss']) if cp['training_loss'] and cp['training_loss'] > 0 else None
                    })

                metrics_df = pd.DataFrame(checkpoint_metrics).sort_values('steps')

                # Plot metrics over training steps
                col1, col2 = st.columns(2)

                with col1:
                    st.write("**Accuracy over Training Steps**")

                    # Melt data for multi-line chart
                    melted = metrics_df[['steps', 'overall', 'desktop', 'web']].melt(
                        id_vars=['steps'],
                        var_name='Metric',
                        value_name='Accuracy'
                    )

                    chart = alt.Chart(melted).mark_line(point=True).encode(
                        x=alt.X('steps:Q', title='Training Steps'),
                        y=alt.Y('Accuracy:Q', scale=alt.Scale(domain=[0, 100]), title='Accuracy (%)'),
                        color=alt.Color('Metric:N', scale=alt.Scale(
                            domain=['overall', 'desktop', 'web'],
                            range=['#4ECDC4', '#45B7D1', '#96CEB4']
                        )),
                        tooltip=['steps', 'Metric', 'Accuracy']
                    ).properties(
                        width=400,
                        height=300,
                        title='Accuracy Progression During Training'
                    )
                    st.altair_chart(chart, use_container_width=True)

                with col2:
                    st.write("**Accuracy vs. Training Loss**")

                    if metrics_df['neg_log_loss'].notna().any():
                        scatter_data = metrics_df[metrics_df['neg_log_loss'].notna()]
                        chart = alt.Chart(scatter_data).mark_circle(size=100).encode(
                            x=alt.X('neg_log_loss:Q', title='-log(Training Loss)'),
                            y=alt.Y('overall:Q', scale=alt.Scale(domain=[0, 100]), title='Overall Accuracy (%)'),
                            color=alt.Color('steps:Q', scale=alt.Scale(scheme='viridis'), title='Training Steps'),
                            tooltip=['steps', 'loss', 'overall']
                        ).properties(
                            width=400,
                            height=300,
                            title='Accuracy vs. -log(Training Loss)'
                        )
                        st.altair_chart(chart, use_container_width=True)
                    else:
                        st.info("No training loss data available for this model")

                # Show checkpoint details table
                st.write("**Checkpoint Details**")
                display_metrics = metrics_df[['steps', 'overall', 'desktop', 'web', 'loss']].copy()
                display_metrics.columns = ['Steps', 'Overall %', 'Desktop %', 'Web %', 'Training Loss']
                display_metrics[['Overall %', 'Desktop %', 'Web %']] = display_metrics[['Overall %', 'Desktop %', 'Web %']].round(2)
                display_metrics['Training Loss'] = display_metrics['Training Loss'].apply(lambda x: f"{x:.4f}" if pd.notna(x) else "N/A")
                st.dataframe(display_metrics, use_container_width=True)
            else:
                st.info("No models with multiple checkpoints available for progression analysis")

        # Detailed breakdown
        with st.expander("Detailed UI Type Breakdown"):
            # Create a heatmap-style table
            detailed_metrics = []
            for _, row in ui_metrics_df.iterrows():
                detailed_metrics.append({
                    'Model': row['model'],
                    'Desktop Text': f"{row['desktop_text']:.1f}%",
                    'Desktop Icon': f"{row['desktop_icon']:.1f}%",
                    'Web Text': f"{row['web_text']:.1f}%",
                    'Web Icon': f"{row['web_icon']:.1f}%",
                    'Overall': f"{row['overall']:.1f}%"
                })

            if detailed_metrics:
                st.dataframe(pd.DataFrame(detailed_metrics), use_container_width=True)
    else:
        # For non-ScreenSpot datasets, show a simple bar chart
        st.subheader("Model Performance")

        chart_data = filtered_df[['model', 'overall_accuracy']].copy()
        chart_data.columns = ['Model', 'Accuracy']

        chart = alt.Chart(chart_data).mark_bar().encode(
            x=alt.X('Model:N', sort='-y', axis=alt.Axis(labelAngle=-45)),
            y=alt.Y('Accuracy:Q', scale=alt.Scale(domain=[0, 100])),
            tooltip=['Model', 'Accuracy']
        ).properties(
            width=800,
            height=400
        )
        st.altair_chart(chart, use_container_width=True)

    # Model details table
    with st.expander("Model Details"):
        display_df = filtered_df[['model', 'overall_accuracy', 'total_samples', 'checkpoint_steps', 'training_loss', 'timestamp']].copy()
        display_df.columns = ['Model', 'Accuracy (%)', 'Samples', 'Checkpoint Steps', 'Training Loss', 'Timestamp']
        display_df['Accuracy (%)'] = display_df['Accuracy (%)'].apply(lambda x: f"{x:.2f}")
        display_df['Training Loss'] = display_df['Training Loss'].apply(lambda x: f"{x:.4f}" if pd.notna(x) else "N/A")
        st.dataframe(display_df, use_container_width=True)

    # Raw data viewer
    with st.expander("Sample Results"):
        if selected_model != 'All' and len(filtered_df) == 1:
            summary = filtered_df.iloc[0]['sample_results_summary']
            st.write(f"**Total evaluation samples:** {summary['total_samples']}")
            st.write("**First 5 sample results:**")

            for i, sample in enumerate(summary['first_5_samples'], 1):
                st.write(f"\n**Sample {i}:**")
                col1, col2 = st.columns([1, 2])
                with col1:
                    st.write(f"- **Correct:** {'✅' if sample.get('is_correct') else '❌'}")
                    st.write(f"- **Image:** {sample.get('img_filename', 'N/A')}")
                with col2:
                    st.write(f"- **Instruction:** {sample.get('instruction', 'N/A')}")
                    if sample.get('predicted_click'):
                        st.write(f"- **Predicted Click:** {sample['predicted_click']}")
                    if sample.get('error_msg'):
                        st.write(f"- **Error:** {sample['error_msg']}")
        else:
            st.info("Select a specific model to view sample results")


if __name__ == "__main__":
    main()