leaderboard / app.py
jasonshaoshun
fix: resolve inconsistent variable naming in causal graph (IOI and ravel)
ad60993
raw
history blame
36.7 kB
import json
import gzip
import os
import shutil
import secrets
import gradio as gr
from gradio_leaderboard import Leaderboard, ColumnFilter, SelectColumns
import pandas as pd
import numpy as np
from apscheduler.schedulers.background import BackgroundScheduler
from huggingface_hub import snapshot_download
from io import StringIO
from typing import Dict, List, Optional
from dataclasses import dataclass, field
from copy import deepcopy
from src.about import (
CITATION_BUTTON_LABEL,
CITATION_BUTTON_TEXT,
EVALUATION_QUEUE_TEXT_SUBGRAPH, EVALUATION_QUEUE_TEXT_CAUSALVARIABLE,
INTRODUCTION_TEXT,
LLM_BENCHMARKS_TEXT,
TITLE,
)
from src.display.css_html_js import custom_css
from src.display.utils import (
BENCHMARK_COLS_MIB_SUBGRAPH,
COLS,
COLS_MIB_SUBGRAPH,
COLS_MULTIMODAL,
EVAL_COLS,
EVAL_TYPES,
AutoEvalColumn,
AutoEvalColumn_mib_subgraph,
AutoEvalColumn_mib_causalgraph,
fields,
)
from src.envs import API, EVAL_REQUESTS_SUBGRAPH, EVAL_REQUESTS_CAUSALGRAPH, QUEUE_REPO_SUBGRAPH, QUEUE_REPO_CAUSALGRAPH, REPO_ID, TOKEN, RESULTS_REPO_MIB_SUBGRAPH, EVAL_RESULTS_MIB_SUBGRAPH_PATH, RESULTS_REPO_MIB_CAUSALGRAPH, EVAL_RESULTS_MIB_CAUSALGRAPH_PATH
from src.populate import get_evaluation_queue_df, get_leaderboard_df_mib_subgraph, get_leaderboard_df_mib_causalgraph
from src.submission.submit import upload_to_queue, remove_submission
from src.submission.check_validity import verify_circuit_submission, verify_causal_variable_submission, check_rate_limit, parse_huggingface_url
from src.about import TasksMib_Subgraph, TasksMib_Causalgraph
from gradio_leaderboard import SelectColumns, Leaderboard
import pandas as pd
from typing import List, Dict, Optional
from dataclasses import fields
import math
class SmartSelectColumns(SelectColumns):
    """
    SelectColumns variant that can also bucket column names by keyword.

    The constructor forwards the selection-related arguments to
    ``SelectColumns`` and stores two keyword lists; ``get_filtered_groups``
    uses those lists to group the column names it is given.
    """

    def __init__(
        self,
        benchmark_keywords: Optional[List[str]] = None,
        model_keywords: Optional[List[str]] = None,
        initial_selected: Optional[List[str]] = None,
        label: Optional[str] = None,
        show_label: bool = True,
        info: Optional[str] = None,
        allow: bool = True
    ):
        """
        Args:
            benchmark_keywords: Keywords used to build "Benchmark group" buckets.
            model_keywords: Keywords used to build "Model group" buckets.
            initial_selected: Passed to the parent as ``default_selection``.
            label, show_label, info, allow: Forwarded unchanged to the parent.
        """
        # Match exact parameters from working SelectColumns
        super().__init__(
            default_selection=initial_selected or [],
            cant_deselect=[],
            allow=allow,
            label=label,
            show_label=show_label,
            info=info
        )
        self.benchmark_keywords = benchmark_keywords or []
        self.model_keywords = model_keywords or []
        # Last result of get_filtered_groups(), kept for later use
        self._groups = {}

    def get_filtered_groups(self, columns: List[str]) -> Dict[str, List[str]]:
        """
        Group ``columns`` by the configured keywords.

        A column belongs to a keyword's group when the keyword occurs in the
        column name, compared case-insensitively. Keywords without any
        matching column produce no entry.

        Returns:
            Mapping of ``"Benchmark group for <kw>"`` / ``"Model group for <kw>"``
            to the matching column names. The same mapping is stored on
            ``self._groups``.
        """
        filtered_groups = {}
        keyword_sets = (
            ("Benchmark group for", self.benchmark_keywords),
            ("Model group for", self.model_keywords),
        )
        for prefix, keywords in keyword_sets:
            for keyword in keywords:
                # Lower-case both sides: previously only the column was
                # lower-cased, so a keyword such as "IOI" could never match.
                needle = keyword.lower()
                matching_cols = [col for col in columns if needle in col.lower()]
                if matching_cols:
                    filtered_groups[f"{prefix} {keyword}"] = matching_cols
        self._groups = filtered_groups
        return filtered_groups
import re
@dataclass
class SubstringSelectColumns(SelectColumns):
    """
    Extends SelectColumns to support filtering columns by predefined substrings.

    ``substring_groups`` maps a group name to a list of glob-style patterns
    (``*`` matches any run of characters; every other character is literal).
    When a group is selected, all columns of ``default_selection`` that match
    one of its patterns are selected.
    """
    substring_groups: Dict[str, List[str]] = field(default_factory=dict)
    selected_substrings: List[str] = field(default_factory=list)

    def __post_init__(self):
        """Build the column -> group-names lookup from ``substring_groups``."""
        # Ensure default_selection is a list
        if self.default_selection is None:
            self.default_selection = []
        # Reverse mapping: column name -> names of the groups it belongs to
        self.column_to_substrings = {}
        for substring, patterns in self.substring_groups.items():
            for pattern in patterns:
                # Glob -> regex. Escape first so characters such as "(", "+"
                # or "." in a pattern are matched literally instead of being
                # read as regex syntax (which could mis-match or raise).
                regex = re.compile(re.escape(pattern).replace(r'\*', '.*'))
                for col in self.default_selection:
                    if regex.search(col):
                        groups = self.column_to_substrings.setdefault(col, [])
                        # Several patterns of one group may hit the same column
                        if substring not in groups:
                            groups.append(substring)
        # Apply initial substring selections
        # NOTE(review): the returned list is not stored anywhere - confirm
        # whether it was meant to be applied to the component.
        if self.selected_substrings:
            self.update_selection_from_substrings()

    def update_selection_from_substrings(self) -> List[str]:
        """
        Compute the column selection implied by ``selected_substrings``.

        Returns:
            ``cant_deselect`` followed by either every column of
            ``default_selection`` (when no group is selected) or the columns
            that belong to at least one selected group.
        """
        selected_columns = self.cant_deselect.copy()
        # If no substrings selected, show all columns
        if not self.selected_substrings:
            selected_columns.extend([
                col for col in self.default_selection
                if col not in self.cant_deselect
            ])
            return selected_columns
        # Add columns that match any selected substring
        for col, substrings in self.column_to_substrings.items():
            if any(s in self.selected_substrings for s in substrings):
                if col not in selected_columns:
                    selected_columns.append(col)
        return selected_columns
def restart_space():
    """Ask the shared ``API`` client to restart the repo identified by ``REPO_ID``."""
    API.restart_space(repo_id=REPO_ID)
### Space initialisation - refresh caches
# Each (remote dataset repo, local directory) pair is handled the same way:
# delete the local directory if it exists, download a fresh snapshot, and
# call restart_space() if anything in that sequence raises. The pairs are
# processed in order and independently of each other.
for _remote_repo, _local_dir in (
    (QUEUE_REPO_SUBGRAPH, EVAL_REQUESTS_SUBGRAPH),
    (QUEUE_REPO_CAUSALGRAPH, EVAL_REQUESTS_CAUSALGRAPH),
    (RESULTS_REPO_MIB_SUBGRAPH, EVAL_RESULTS_MIB_SUBGRAPH_PATH),
    (RESULTS_REPO_MIB_CAUSALGRAPH, EVAL_RESULTS_MIB_CAUSALGRAPH_PATH),
):
    try:
        if os.path.exists(_local_dir):
            shutil.rmtree(_local_dir)
        snapshot_download(
            repo_id=_remote_repo, local_dir=_local_dir, repo_type="dataset", tqdm_class=None, etag_timeout=30, token=TOKEN
        )
    except Exception:
        restart_space()
def _sigmoid(x):
try:
return 1 / (1 + math.exp(-2 * (x-1)))
except:
return "-"
# Circuit-localization frames. Both are built from the same results directory
# and column lists; the second one additionally passes metric_type="CMD".
LEADERBOARD_DF_MIB_SUBGRAPH_FPL = get_leaderboard_df_mib_subgraph(
    EVAL_RESULTS_MIB_SUBGRAPH_PATH,
    COLS_MIB_SUBGRAPH,
    BENCHMARK_COLS_MIB_SUBGRAPH,
)
LEADERBOARD_DF_MIB_SUBGRAPH_FEQ = get_leaderboard_df_mib_subgraph(
    EVAL_RESULTS_MIB_SUBGRAPH_PATH,
    COLS_MIB_SUBGRAPH,
    BENCHMARK_COLS_MIB_SUBGRAPH,
    metric_type="CMD",
)

# Causal-graph frames: the loader returns an (aggregated, averaged) pair.
LEADERBOARD_DF_MIB_CAUSALGRAPH_AGGREGATED, LEADERBOARD_DF_MIB_CAUSALGRAPH_AVERAGED = (
    get_leaderboard_df_mib_causalgraph(EVAL_RESULTS_MIB_CAUSALGRAPH_PATH)
)

# Evaluation queues, one (finished, pending) pair per track.
finished_eval_queue_df_subgraph, pending_eval_queue_df_subgraph = get_evaluation_queue_df(
    EVAL_REQUESTS_SUBGRAPH, EVAL_COLS, "Circuit"
)
finished_eval_queue_df_causalvariable, pending_eval_queue_df_causalvariable = get_evaluation_queue_df(
    EVAL_REQUESTS_CAUSALGRAPH, EVAL_COLS, "Causal Variable"
)

# Combined queues shown in the Submit tab (subgraph rows first).
finished_eval_queue = pd.concat(
    [finished_eval_queue_df_subgraph, finished_eval_queue_df_causalvariable]
)
pending_eval_queue = pd.concat(
    [pending_eval_queue_df_subgraph, pending_eval_queue_df_causalvariable]
)
def init_leaderboard_mib_subgraph(dataframe, track):
    """
    Initialize the subgraph leaderboard with display names for better readability.

    Columns named ``"<benchmark>_<model>"`` (for every task/model pair in
    ``TasksMib_Subgraph``) are renamed to ``"<Benchmark> - <Model>"``.

    Args:
        dataframe: Leaderboard data; must be non-empty.
        track: Accepted for signature compatibility; not used here.

    Returns:
        ``(Leaderboard component, renamed DataFrame)``.

    Raises:
        ValueError: If ``dataframe`` is ``None`` or empty.
    """
    if dataframe is None or dataframe.empty:
        raise ValueError("Leaderboard DataFrame is empty or None.")

    print("\nDebugging DataFrame columns:", dataframe.columns.tolist())

    model_name_mapping = {
        "qwen2_5": "Qwen-2.5",
        "gpt2": "GPT-2",
        "gemma2": "Gemma-2",
        "llama3": "Llama-3.1"
    }
    benchmark_mapping = {
        "ioi": "IOI",
        "mcqa": "MCQA",
        "arithmetic_addition": "Arithmetic (+)",
        "arithmetic_subtraction": "Arithmetic (-)",
        "arc_easy": "ARC (Easy)",
        "arc_challenge": "ARC (Challenge)"
    }

    # Raw column name -> display name. Unknown benchmarks/models fall back to
    # their raw key instead of raising KeyError.
    display_mapping = {}
    for task in TasksMib_Subgraph:
        benchmark = task.value.benchmark
        benchmark_label = benchmark_mapping.get(benchmark, benchmark)
        for model in task.value.models:
            model_label = model_name_mapping.get(model, model)
            display_mapping[f"{benchmark}_{model}"] = f"{benchmark_label} - {model_label}"

    # Debug output only: which display columns exist per benchmark ...
    for task in TasksMib_Subgraph:
        benchmark = task.value.benchmark
        benchmark_cols = [
            display_mapping[f"{benchmark}_{model}"]
            for model in task.value.models
            if f"{benchmark}_{model}" in dataframe.columns
        ]
        if benchmark_cols:
            print(f"\nBenchmark group for {benchmark}:", benchmark_cols)

    # ... and per model (dict.fromkeys de-duplicates while keeping a stable order).
    all_models = list(dict.fromkeys(
        model for task in TasksMib_Subgraph for model in task.value.models
    ))
    for model in all_models:
        model_cols = [
            display_mapping[f"{task.value.benchmark}_{model}"]
            for task in TasksMib_Subgraph
            if model in task.value.models
            and f"{task.value.benchmark}_{model}" in dataframe.columns
        ]
        if model_cols:
            print(f"\nModel group for {model}:", model_cols)

    renamed_df = dataframe.rename(columns=display_mapping)

    return Leaderboard(
        value=renamed_df,  # Use DataFrame with display names
        datatype=[c.type for c in fields(AutoEvalColumn_mib_subgraph)],
        search_columns=["Method"],
        hide_columns=["eval_name"],
        interactive=False,
    ), renamed_df
def init_leaderboard_mib_causalgraph(dataframe, track):
    """
    Build the causal-graph leaderboard with human-readable column names.

    Columns named ``"<model>_<task col_name>_<target variable>"`` (for every
    combination listed in ``TasksMib_Causalgraph``) are renamed to
    ``"<Benchmark> - <Model> - <Target variable>"``.

    Args:
        dataframe: Leaderboard data to display.
        track: Accepted for signature compatibility; not used here.

    Returns:
        ``(Leaderboard component, renamed DataFrame)``.
    """
    model_name_mapping = {
        "Qwen2ForCausalLM": "Qwen-2.5",
        "GPT2ForCausalLM": "GPT-2",
        "GPT2LMHeadModel": "GPT-2",
        "Gemma2ForCausalLM": "Gemma-2",
        "LlamaForCausalLM": "Llama-3.1"
    }
    benchmark_mapping = {
        "ioi_task": "IOI",
        "4_answer_MCQA": "MCQA",
        "arithmetic_addition": "Arithmetic (+)",
        "arithmetic_subtraction": "Arithmetic (-)",
        "ARC_easy": "ARC (Easy)",
        "RAVEL": "RAVEL"
    }
    # The original literal listed "Language" twice; the duplicate is dropped.
    target_variables_mapping = {
        "output_token": "Output Token",
        "output_position": "Output Position",
        "answer_pointer": "Answer Pointer",
        "answer": "Answer",
        "Continent": "Continent",
        "Language": "Language",
        "Country": "Country"
    }

    # Raw column name -> display name. Keys missing from the tables above fall
    # back to the raw value instead of raising KeyError.
    display_mapping = {}
    for task in TasksMib_Causalgraph:
        col_name = task.value.col_name
        benchmark_label = benchmark_mapping.get(col_name, col_name)
        for model in task.value.models:
            model_label = model_name_mapping.get(model, model)
            for target_variables in task.value.target_variables:
                target_label = target_variables_mapping.get(target_variables, target_variables)
                field_name = f"{model}_{col_name}_{target_variables}"
                display_mapping[field_name] = f"{benchmark_label} - {model_label} - {target_label}"

    renamed_df = dataframe.rename(columns=display_mapping)

    # Create only necessary columns
    return Leaderboard(
        value=renamed_df,
        datatype=[c.type for c in fields(AutoEvalColumn_mib_causalgraph)],
        search_columns=["Method"],
        hide_columns=["eval_name"],
        bool_checkboxgroup_label="Hide models",
        interactive=False,
    ), renamed_df
def init_leaderboard(dataframe, track):
    """
    Build a Leaderboard component restricted to one track.

    Args:
        dataframe: Full leaderboard data with a ``"Track"`` column.
        track: Only rows whose ``"Track"`` equals this value are shown.

    Raises:
        ValueError: If ``dataframe`` is ``None`` or empty.
    """
    if dataframe is None or dataframe.empty:
        raise ValueError("Leaderboard DataFrame is empty or None.")

    # filter for correct track
    track_rows = dataframe.loc[dataframe["Track"] == track]

    # NOTE(review): this module imports `fields` from src.display.utils and
    # later again from dataclasses; the later import is the one in effect -
    # confirm it yields objects exposing the attributes read below.
    column_specs = fields(AutoEvalColumn)
    column_types = [spec.type for spec in column_specs]
    shown_by_default = [spec.name for spec in column_specs if spec.displayed_by_default]
    always_shown = [spec.name for spec in column_specs if spec.never_hidden]
    hidden = [spec.name for spec in column_specs if spec.hidden]

    column_selector = SelectColumns(
        default_selection=shown_by_default,
        cant_deselect=always_shown,
        label="Select Columns to Display:",
    )
    return Leaderboard(
        value=track_rows,
        datatype=column_types,
        select_columns=column_selector,
        search_columns=[AutoEvalColumn.model.name],
        hide_columns=hidden,
        bool_checkboxgroup_label="Hide models",
        interactive=False,
    )
def process_json(temp_file):
    """
    Load JSON from an uploaded file, transparently handling ``.gz`` files.

    Args:
        temp_file: Object with a ``.name`` path attribute, or ``None``.

    Returns:
        The parsed JSON value, or ``{}`` when ``temp_file`` is ``None``.

    Raises:
        gr.Error: If the file cannot be read or parsed; the original
            exception is attached as the cause.
    """
    if temp_file is None:
        return {}
    # Handle file upload
    try:
        file_path = temp_file.name
        # gzip.open and open share the (path, mode, encoding) call shape in text mode
        opener = gzip.open if file_path.endswith('.gz') else open
        # JSON text is UTF-8; do not depend on the platform default encoding
        with opener(file_path, 'rt', encoding='utf-8') as f:
            data = json.load(f)
    except Exception as e:
        raise gr.Error(f"Error processing file: {str(e)}") from e
    # NOTE(review): this component is created but neither returned nor placed
    # in a layout here - confirm it has a visible effect.
    gr.Markdown("Upload successful!")
    return data
def get_hf_username(hf_repo):
    """Return the second-to-last ``/``-separated segment of *hf_repo*, ignoring trailing slashes."""
    return hf_repo.rstrip("/").split("/")[-2]
# Define the preset substrings for filtering
# These strings are matched (case-insensitively) against leaderboard column
# names by filter_columns_by_substrings().
# All task substrings followed by all model substrings.
PRESET_SUBSTRINGS = ["IOI", "MCQA", "Arithmetic", "ARC", "GPT-2", "Qwen-2.5", "Gemma-2", "Llama-3.1"]
# Task choices offered in the "Circuit Localization" tabs.
TASK_SUBSTRINGS = ["IOI", "MCQA", "Arithmetic", "ARC"]
# Task choices offered in the "Causal Variable Localization" tabs.
TASK_CAUSAL_SUBSTRINGS = ["IOI", "MCQA", "ARC (Easy)", "RAVEL"]
# Model choices offered in every leaderboard tab.
MODEL_SUBSTRINGS = ["GPT-2", "Qwen-2.5", "Gemma-2", "Llama-3.1"]
def filter_columns_by_substrings(dataframe: pd.DataFrame, selected_task_substrings: List[str],
                                 selected_model_substrings: List[str]) -> pd.DataFrame:
    """
    Filter columns based on the selected substrings.

    A column is kept when it is ``"Method"``, or when its name contains
    (case-insensitively) at least one selected task substring AND at least
    one selected model substring. An empty selection on one side places no
    constraint on that side.

    Returns:
        ``dataframe`` itself when both selections are empty; otherwise a new
        frame holding only the kept columns, in their original order.
    """
    if not selected_task_substrings and not selected_model_substrings:
        return dataframe  # No filtering if no substrings are selected

    def _matches(column_lower: str, substrings) -> bool:
        # An empty selection does not restrict anything.
        if not substrings:
            return True
        return any(sub.lower() in column_lower for sub in substrings)

    filtered_columns = [
        col for col in dataframe.columns
        if col == "Method"
        or (
            _matches(col.lower(), selected_task_substrings)
            and _matches(col.lower(), selected_model_substrings)
        )
    ]
    return dataframe[filtered_columns]
def update_leaderboard(dataframe: pd.DataFrame, selected_task_substrings: List[str],
                       selected_model_substrings: List[str], ascending: bool = False):
    """
    Update the leaderboard based on the selected substrings.

    The columns are filtered with ``filter_columns_by_substrings``. When both
    the task selection and the model selection are either empty or contain at
    least two entries, two summary columns are added:

    * ``"Average"`` - row mean of the filtered columns, with ``"-"`` treated
      as missing; the mean is missing if any value in the row is missing.
    * ``"Score"`` - the same mean, computed after ``_sigmoid`` has been
      applied to every float/int cell.

    Rows are then sorted by ``"Average"`` (direction given by ``ascending``,
    missing values last) and missing summary values are shown as ``"-"``.

    The input ``dataframe`` is never modified.
    """
    # Work on a private copy. With no filters selected the filter returns the
    # caller's frame itself, and writing "Average"/"Score" into it would make
    # those columns part of the stored original (and of the next average).
    filtered_dataframe = filter_columns_by_substrings(
        dataframe, selected_task_substrings, selected_model_substrings
    ).copy()

    def _none_or_several(selection):
        # Averaging over a single task or a single model is not shown.
        return len(selection) >= 2 or len(selection) == 0

    show_average = _none_or_several(selected_task_substrings) and _none_or_several(selected_model_substrings)

    def _transform_floats(df):
        df_transformed = df.copy()
        # Apply transformation row by row
        for i, row in df_transformed.iterrows():
            # Apply sigmoid only to numeric values in the row
            df_transformed.loc[i] = row.apply(lambda x: _sigmoid(x) if isinstance(x, (float, int)) else x)
        return df_transformed

    if show_average:
        # Replace "-" with NaN for calculation, then use skipna=False to get NaN when any value is missing
        # NOTE(review): the "Method" column is still part of the frame here;
        # how mean() treats non-numeric columns depends on the installed
        # pandas version - confirm.
        numeric_data = filtered_dataframe.replace("-", np.nan)
        means = numeric_data.mean(axis=1, skipna=False)

        # Apply the same transformation for computing scores
        s_filtered_dataframe = _transform_floats(filtered_dataframe)
        s_numeric_data = s_filtered_dataframe.replace("-", np.nan)
        s_means = s_numeric_data.mean(axis=1, skipna=False)

        # Set Average and Score columns
        # Keep numeric values as NaN for proper sorting, convert to "-" only for display if needed
        filtered_dataframe.loc[:, "Average"] = means.round(2)
        filtered_dataframe.loc[:, "Score"] = s_means.round(2)

        # Sort by Average with NaN values at the end
        filtered_dataframe = filtered_dataframe.sort_values(by=["Average"], ascending=ascending, na_position='last')

        # After sorting, convert NaN back to "-" for display
        filtered_dataframe.loc[:, "Average"] = filtered_dataframe["Average"].fillna("-")
        filtered_dataframe.loc[:, "Score"] = filtered_dataframe["Score"].fillna("-")

    return filtered_dataframe
def process_url(url):
    """Return a confirmation message that echoes *url* back to the caller."""
    # Add your URL processing logic here
    return "You entered the URL: {}".format(url)
# Root Gradio app. Components created inside ``with demo:`` belong to it.
# NOTE(review): the nesting below is reconstructed from a source whose
# indentation was lost - confirm it matches the intended layout.
demo = gr.Blocks(css=custom_css)
with demo:
    gr.HTML(TITLE)
    gr.Markdown(INTRODUCTION_TEXT, elem_classes="markdown-text")
    with gr.Tabs(elem_classes="tab-buttons") as tabs:
        with gr.TabItem("Circuit Localization", elem_id="subgraph", id=0):
            with gr.Tabs() as subgraph_tabs:
                # "CPR" tab: shows LEADERBOARD_DF_MIB_SUBGRAPH_FPL, sorted descending.
                with gr.TabItem("CPR", id=0):
                    # Add description for filters
                    gr.Markdown("""
### Filtering Options
Use the dropdown menus below to filter results by specific tasks or models.
You can combine filters to see specific task-model combinations.
""")
                    # CheckboxGroup for selecting substrings
                    task_substring_checkbox = gr.CheckboxGroup(
                        choices=TASK_SUBSTRINGS,
                        label="View tasks:",
                        value=TASK_SUBSTRINGS, # Default to all substrings selected
                    )
                    model_substring_checkbox = gr.CheckboxGroup(
                        choices = MODEL_SUBSTRINGS,
                        label = "View models:",
                        value = MODEL_SUBSTRINGS
                    )
                    leaderboard, data = init_leaderboard_mib_subgraph(LEADERBOARD_DF_MIB_SUBGRAPH_FPL, "Subgraph")
                    # The unfiltered (renamed) frame is kept in state so every
                    # filter change starts again from the full set of columns.
                    original_leaderboard = gr.State(value=data)
                    # Sort direction passed to update_leaderboard for this tab.
                    ascending = gr.State(value=False)
                    # Update the leaderboard when the user selects/deselects substrings
                    task_substring_checkbox.change(
                        fn=update_leaderboard,
                        inputs=[original_leaderboard, task_substring_checkbox, model_substring_checkbox, ascending],
                        outputs=leaderboard
                    )
                    model_substring_checkbox.change(
                        fn=update_leaderboard,
                        inputs=[original_leaderboard, task_substring_checkbox, model_substring_checkbox, ascending],
                        outputs=leaderboard
                    )
                    print(f"Leaderboard is {leaderboard}")
                # "CMD" tab: same layout, but shows LEADERBOARD_DF_MIB_SUBGRAPH_FEQ
                # and sorts ascending.
                with gr.TabItem("CMD", id=1):
                    # Add description for filters
                    gr.Markdown("""
### Filtering Options
Use the dropdown menus below to filter results by specific tasks or models.
You can combine filters to see specific task-model combinations.
""")
                    # CheckboxGroup for selecting substrings
                    task_substring_checkbox = gr.CheckboxGroup(
                        choices=TASK_SUBSTRINGS,
                        label="View tasks:",
                        value=TASK_SUBSTRINGS, # Default to all substrings selected
                    )
                    model_substring_checkbox = gr.CheckboxGroup(
                        choices = MODEL_SUBSTRINGS,
                        label = "View models:",
                        value = MODEL_SUBSTRINGS
                    )
                    leaderboard, data = init_leaderboard_mib_subgraph(LEADERBOARD_DF_MIB_SUBGRAPH_FEQ, "Subgraph")
                    original_leaderboard = gr.State(value=data)
                    ascending = gr.State(value=True)
                    # Update the leaderboard when the user selects/deselects substrings
                    task_substring_checkbox.change(
                        fn=update_leaderboard,
                        inputs=[original_leaderboard, task_substring_checkbox, model_substring_checkbox, ascending],
                        outputs=leaderboard
                    )
                    model_substring_checkbox.change(
                        fn=update_leaderboard,
                        inputs=[original_leaderboard, task_substring_checkbox, model_substring_checkbox, ascending],
                        outputs=leaderboard
                    )
                    print(f"Leaderboard is {leaderboard}")
# Causal Variable Localization tab: two sub-tabs, one per causal-graph frame.
# NOTE(review): nesting reconstructed from a source whose indentation was
# lost - confirm this tab sits inside the top-level gr.Tabs of ``demo``.
with gr.TabItem("Causal Variable Localization", elem_id="causalgraph", id=1):
    with gr.Tabs() as causalgraph_tabs:
        # Shows LEADERBOARD_DF_MIB_CAUSALGRAPH_AGGREGATED.
        with gr.TabItem("Highest View", id=0):
            gr.Markdown("""
### Filtering Options
Use the dropdown menus below to filter results by specific tasks or models.
You can combine filters to see specific task-model combinations.
""")
            task_substring_checkbox = gr.CheckboxGroup(
                choices=TASK_CAUSAL_SUBSTRINGS,
                label="View tasks:",
                value=TASK_CAUSAL_SUBSTRINGS, # Default to all substrings selected
            )
            model_substring_checkbox = gr.CheckboxGroup(
                choices = MODEL_SUBSTRINGS,
                label = "View models:",
                value = MODEL_SUBSTRINGS
            )
            leaderboard_aggregated, data = init_leaderboard_mib_causalgraph(
                LEADERBOARD_DF_MIB_CAUSALGRAPH_AGGREGATED,
                "Causal Graph"
            )
            # Unfiltered frame and sort direction handed to update_leaderboard.
            original_leaderboard = gr.State(value=data)
            ascending = gr.State(value=False)
            # Either checkbox group changing re-filters the table.
            task_substring_checkbox.change(
                fn=update_leaderboard,
                inputs=[original_leaderboard, task_substring_checkbox, model_substring_checkbox, ascending],
                outputs=leaderboard_aggregated
            )
            model_substring_checkbox.change(
                fn=update_leaderboard,
                inputs=[original_leaderboard, task_substring_checkbox, model_substring_checkbox, ascending],
                outputs=leaderboard_aggregated
            )
        # Shows LEADERBOARD_DF_MIB_CAUSALGRAPH_AVERAGED; same controls as above.
        with gr.TabItem("Averaged View", id=1):
            task_substring_checkbox = gr.CheckboxGroup(
                choices=TASK_CAUSAL_SUBSTRINGS,
                label="View tasks:",
                value=TASK_CAUSAL_SUBSTRINGS, # Default to all substrings selected
            )
            model_substring_checkbox = gr.CheckboxGroup(
                choices = MODEL_SUBSTRINGS,
                label = "View models:",
                value = MODEL_SUBSTRINGS
            )
            leaderboard_averaged, data = init_leaderboard_mib_causalgraph(
                LEADERBOARD_DF_MIB_CAUSALGRAPH_AVERAGED,
                "Causal Graph"
            )
            original_leaderboard = gr.State(value=data)
            ascending = gr.State(value=False)
            task_substring_checkbox.change(
                fn=update_leaderboard,
                inputs=[original_leaderboard, task_substring_checkbox, model_substring_checkbox, ascending],
                outputs=leaderboard_averaged
            )
            model_substring_checkbox.change(
                fn=update_leaderboard,
                inputs=[original_leaderboard, task_substring_checkbox, model_substring_checkbox, ascending],
                outputs=leaderboard_averaged
            )
# Submit tab: track selector, one hidden form per track, and the fields
# shared by both tracks.
# NOTE(review): nesting reconstructed from a source whose indentation was
# lost - confirm which components share a Row/Column.
with gr.TabItem("Submit", elem_id="llm-benchmark-tab-table", id=2):
    # Track selection
    track = gr.Radio(
        choices=[
            "Circuit Localization Track",
            "Causal Variable Localization Track"
        ],
        label="Select Competition Track",
        elem_id="track_selector"
    )
    # Circuit-track form; created hidden, visibility is driven by toggle_ui.
    with gr.Column(visible=False, elem_id="bordered-column") as circuit_ui:
        with gr.Row():
            gr.Markdown(EVALUATION_QUEUE_TEXT_SUBGRAPH, elem_classes="markdown-text")
        with gr.Row():
            hf_repo_circ = gr.Textbox(
                label="HuggingFace Repository URL",
                placeholder="https://huggingface.co/username/repo/path",
                info="Must be a valid HuggingFace URL pointing to folders containing either 1 importance score file per task/model, or " \
                "9 circuit files per task/model (.json or .pt)."
            )
            level = gr.Radio(
                choices=[
                    "Edge",
                    "Node (submodule)",
                    "Node (neuron)"
                ],
                label="Level of granularity",
                info="Is your circuit defined by its inclusion/exclusion of certain edges (e.g., MLP1 to H10L12), of certain submodules (e.g., MLP1), or of neurons " \
                "within those submodules (e.g., MLP1 neuron 295)?"
            )
    # Causal-variable-track form; created hidden, visibility is driven by toggle_ui.
    with gr.Column(visible=False, elem_id="bordered-column") as causal_ui:
        gr.Markdown(EVALUATION_QUEUE_TEXT_CAUSALVARIABLE, elem_classes="markdown-text")
        with gr.Row():
            hf_repo_cg = gr.Textbox(
                label="HuggingFace Repository URL",
                placeholder="https://huggingface.co/username/repo/path",
                info="Must be a valid HuggingFace URL pointing to a file containing the trained featurizer (.pt). " )
    # Common fields
    with gr.Group():
        gr.Markdown("### Submission Information")
        method_name = gr.Textbox(label="Method Name")
        contact_email = gr.Textbox(label="Contact Email")
# Dynamic UI logic
def toggle_ui(track):
    """
    Show the form that belongs to the selected track and hide the other one.

    Args:
        track: Current value of the track radio (may be ``None`` when nothing
            is selected, in which case both forms stay hidden).

    Returns:
        Mapping from each track-specific column to its visibility update.
    """
    circuit = track == "Circuit Localization Track"
    causal = track == "Causal Variable Localization Track"
    # circuit_ui / causal_ui are gr.Column components, so update them with
    # gr.Column, as the other handlers in this file do for their columns.
    return {
        circuit_ui: gr.Column(visible=circuit),
        causal_ui: gr.Column(visible=causal)
    }
track.change(toggle_ui, track, [circuit_ui, causal_ui])
# Submission handling
status = gr.Textbox(label="Submission Status", visible=False)
def handle_submission(track, hf_repo_circ, hf_repo_cg, level, method_name, contact_email):
    """
    Validate a submission and either report errors, ask for confirmation of
    warnings, or upload it to the queue.

    Returns:
        Four values matching the outputs
        ``[status, warning_display, pending_submission, warning_modal]``:

        * errors   -> error text, no warnings, no pending data, modal hidden
        * warnings -> "Warnings:" text, warning list, the submission tuple
          (kept for the "Proceed Anyway" button), modal shown
        * neither  -> whatever ``upload_to_queue`` returns
    """
    # No track chosen yet: the substring checks below need a string.
    if not track:
        return [
            gr.Textbox("❌ Please select a competition track", visible=True),
            None, None,
            gr.Column(visible=False),
        ]

    errors = []
    warnings = []
    breaking_error = False  # set when track-specific verification must be skipped
    is_circuit = "Circuit" in track
    # Empty text fields are treated as "" so the checks below cannot fail on None.
    hf_repo = (hf_repo_circ if is_circuit else hf_repo_cg) or ""
    email = contact_email or ""

    # Validate common fields
    if not (method_name or "").strip():
        errors.append("Method name is required")
    if "@" not in email or "." not in email:
        errors.append("Valid email address is required")
    if is_circuit and not level:
        errors.append("Level of granularity is required")

    if not hf_repo.startswith(("https://huggingface.co/", "http://huggingface.co/")):
        errors.append("Invalid HuggingFace URL - must start with https://huggingface.co/")
        breaking_error = True
    else:
        repo_id, _subfolder, _revision = parse_huggingface_url(hf_repo)
        if repo_id is None:
            errors.append("Could not read username or repo name from HF URL")
            breaking_error = True
        else:
            user_name, _repo_name = repo_id.split("/")
            under_rate_limit, time_left = check_rate_limit(track, user_name, contact_email)
            if not under_rate_limit:
                errors.append(f"Rate limit exceeded (max 2 submissions per week). Please try again in {time_left}. " \
                              "(If you're trying again after a failed validation, either remove the previous entry below or try again in about 30 minutes.")
                breaking_error = True

    # Track-specific validation
    if not breaking_error:
        if is_circuit:
            submission_errors, submission_warnings = verify_circuit_submission(hf_repo, level)
        else:
            submission_errors, submission_warnings = verify_causal_variable_submission(hf_repo)
        errors.extend(submission_errors)
        warnings.extend(submission_warnings)

    _id = secrets.token_urlsafe(12)

    if errors:
        return [
            gr.Textbox("\n".join(f"❌ {e}" for e in errors), visible=True),
            None, None,
            gr.Column(visible=False),
        ]
    elif warnings:
        return [
            gr.Textbox("Warnings:", visible=True),
            # Bullet character repaired (it was mis-encoded in the source).
            gr.Markdown("\n\n".join(f"• {w}" for w in warnings)),
            (track, hf_repo_circ, hf_repo_cg, level, method_name, contact_email, _id),
            gr.Column(visible=True)
        ]
    else:
        return upload_to_queue(track, hf_repo_circ, hf_repo_cg, level, method_name, contact_email, _id)
# New warning confirmation dialog
# Hidden panel; handle_submission makes it visible when a submission has
# warnings but no errors.
warning_modal = gr.Column(visible=False, variant="panel")
with warning_modal:
    gr.Markdown("### ⚠️ Submission Warnings")
    warning_display = gr.Markdown()
    proceed_btn = gr.Button("Proceed Anyway", variant="secondary")
    cancel_btn = gr.Button("Cancel Submission", variant="primary")
# Store submission data between callbacks
pending_submission = gr.State()
submit_btn = gr.Button("Submit Entry", variant="primary")
# All three buttons write to the same four outputs, in the same order.
submit_btn.click(
    handle_submission,
    inputs=[track, hf_repo_circ, hf_repo_cg, level, method_name, contact_email],
    outputs=[status, warning_display, pending_submission, warning_modal]
)
# "Proceed Anyway": unpack the stored submission tuple into upload_to_queue.
proceed_btn.click(
    lambda x: upload_to_queue(*x),
    inputs=pending_submission,
    outputs=[status, warning_display, pending_submission, warning_modal]
)
# "Cancel Submission": show a message, clear the stored data, hide the panel.
cancel_btn.click(
    lambda: [gr.Textbox("Submission canceled.", visible=True), None, None, gr.Column(visible=False)],
    outputs=[status, warning_display, pending_submission, warning_modal]
)
# Collapsed tables listing finished and pending evaluations; the row counts
# in the titles are computed once, when the app is built.
with gr.Column():
    with gr.Accordion(
        # Emoji repaired (it was mis-encoded in the source).
        f"✅ Finished Evaluations ({len(finished_eval_queue)})",
        open=False,
    ):
        with gr.Row():
            finished_eval_table = gr.components.Dataframe(
                value=finished_eval_queue,
                headers=EVAL_COLS,
                datatype=EVAL_TYPES,
                row_count=5,
            )
    with gr.Accordion(
        f"⏳ Pending Evaluation Queue ({len(pending_eval_queue)})",
        open=False,
    ):
        with gr.Row():
            pending_eval_table = gr.components.Dataframe(
                value=pending_eval_queue,
                headers=EVAL_COLS,
                datatype=EVAL_TYPES,
                row_count=5,
            )
# Form for withdrawing a queued submission: the selected track, the method
# name and the submission ID are passed to remove_submission, whose result is
# written to the (initially hidden) status box.
with gr.Group():
    gr.Markdown("### Remove Submission from Queue")
    with gr.Row():
        name_r = gr.Textbox(label="Method Name")
        _id_r = gr.Textbox(label = "Submission ID")
    status_r = gr.Textbox(label="Removal Status", visible=False)
    remove_button = gr.Button("Remove Entry")
    remove_button.click(
        remove_submission,
        inputs=[track, name_r, _id_r],
        outputs=[status_r]
    )
# Add info about rate limits
gr.Markdown("""
### Submission Policy
- Maximum 2 valid submissions per HuggingFace account per week
- Invalid submissions don't count toward your limit
- Rate limit tracked on a rolling basis: a submission no longer counts toward the limit as soon as 7 days have passed since the submission time
- The queues can take up to an hour to update; don't fret if your submission doesn't show up immediately!
""")
# Collapsed citation box with a copy button.
with gr.Row():
    # Emoji repaired (it was mis-encoded in the source).
    with gr.Accordion("📙 Citation", open=False):
        citation_button = gr.Textbox(
            value=CITATION_BUTTON_TEXT,
            label=CITATION_BUTTON_LABEL,
            lines=10,
            elem_id="citation-button",
            show_copy_button=True,
        )
# Call restart_space() every 1800 seconds (30 minutes) from a background
# scheduler - presumably so the snapshot downloads at the top of this module
# run again and the displayed data is refreshed; confirm.
scheduler = BackgroundScheduler()
scheduler.add_job(restart_space, "interval", seconds=1800)
scheduler.start()
# Enable request queuing (default concurrency limit 40) and start the app.
demo.queue(default_concurrency_limit=40).launch(share=True, ssr_mode=False)