# leaderboard/app.py
import json
import gzip
import os
import shutil
import secrets
import gradio as gr
from gradio_leaderboard import Leaderboard, ColumnFilter, SelectColumns
import pandas as pd
import numpy as np
from apscheduler.schedulers.background import BackgroundScheduler
from huggingface_hub import snapshot_download
from io import StringIO
from typing import Dict, List, Optional
from dataclasses import dataclass, field
from copy import deepcopy
from src.about import (
CITATION_BUTTON_LABEL,
CITATION_BUTTON_TEXT,
EVALUATION_QUEUE_TEXT_SUBGRAPH, EVALUATION_QUEUE_TEXT_CAUSALVARIABLE,
INTRODUCTION_TEXT,
LLM_BENCHMARKS_TEXT,
TITLE,
)
from src.display.css_html_js import custom_css
from src.display.utils import (
BENCHMARK_COLS_MIB_SUBGRAPH,
COLS,
COLS_MIB_SUBGRAPH,
COLS_MULTIMODAL,
EVAL_COLS,
EVAL_TYPES,
AutoEvalColumn,
AutoEvalColumn_mib_subgraph,
AutoEvalColumn_mib_causalgraph,
fields,
)
from src.envs import API, EVAL_REQUESTS_SUBGRAPH, EVAL_REQUESTS_CAUSALGRAPH, QUEUE_REPO_SUBGRAPH, QUEUE_REPO_CAUSALGRAPH, REPO_ID, TOKEN, RESULTS_REPO_MIB_SUBGRAPH, EVAL_RESULTS_MIB_SUBGRAPH_PATH, RESULTS_REPO_MIB_CAUSALGRAPH, EVAL_RESULTS_MIB_CAUSALGRAPH_PATH
from src.populate import get_evaluation_queue_df, get_leaderboard_df_mib_subgraph, get_leaderboard_df_mib_causalgraph
from src.submission.submit import upload_to_queue, remove_submission
from src.submission.check_validity import verify_circuit_submission, verify_causal_variable_submission, check_rate_limit, parse_huggingface_url
from src.about import TasksMib_Subgraph, TasksMib_Causalgraph
from dataclasses import fields  # NOTE: shadows the `fields` helper imported from src.display.utils above
import math
class SmartSelectColumns(SelectColumns):
"""
Enhanced SelectColumns component matching exact original parameters.
"""
def __init__(
self,
benchmark_keywords: Optional[List[str]] = None,
model_keywords: Optional[List[str]] = None,
initial_selected: Optional[List[str]] = None,
label: Optional[str] = None,
show_label: bool = True,
info: Optional[str] = None,
allow: bool = True
):
# Match exact parameters from working SelectColumns
super().__init__(
default_selection=initial_selected or [],
cant_deselect=[],
allow=allow,
label=label,
show_label=show_label,
info=info
)
self.benchmark_keywords = benchmark_keywords or []
self.model_keywords = model_keywords or []
# Store groups for later use
self._groups = {}
def get_filtered_groups(self, columns: List[str]) -> Dict[str, List[str]]:
"""Get column groups based on keywords."""
filtered_groups = {}
# Add benchmark groups
for benchmark in self.benchmark_keywords:
matching_cols = [
col for col in columns
if benchmark in col.lower()
]
if matching_cols:
filtered_groups[f"Benchmark group for {benchmark}"] = matching_cols
# Add model groups
for model in self.model_keywords:
matching_cols = [
col for col in columns
if model in col.lower()
]
if matching_cols:
filtered_groups[f"Model group for {model}"] = matching_cols
self._groups = filtered_groups
return filtered_groups
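# Illustrative sketch of how SmartSelectColumns could be used (it is not wired into
# the UI below; the keyword lists and the `df` DataFrame are assumptions):
#
#     smart_cols = SmartSelectColumns(
#         benchmark_keywords=["ioi", "mcqa"],
#         model_keywords=["gpt2", "llama"],
#         label="Select Columns to Display:",
#     )
#     groups = smart_cols.get_filtered_groups(df.columns.tolist())
#     # -> {"Benchmark group for ioi": [...], "Model group for gpt2": [...], ...}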
import re
@dataclass
class SubstringSelectColumns(SelectColumns):
"""
Extends SelectColumns to support filtering columns by predefined substrings.
When a substring is selected, all columns containing that substring will be selected.
"""
substring_groups: Dict[str, List[str]] = field(default_factory=dict)
selected_substrings: List[str] = field(default_factory=list)
def __post_init__(self):
# Ensure default_selection is a list
if self.default_selection is None:
self.default_selection = []
# Build reverse mapping of column to substrings
self.column_to_substrings = {}
for substring, patterns in self.substring_groups.items():
for pattern in patterns:
# Convert glob-style patterns to regex
regex = re.compile(pattern.replace('*', '.*'))
# Find matching columns in default_selection
for col in self.default_selection:
if regex.search(col):
if col not in self.column_to_substrings:
self.column_to_substrings[col] = []
self.column_to_substrings[col].append(substring)
# Apply initial substring selections
if self.selected_substrings:
self.update_selection_from_substrings()
def update_selection_from_substrings(self) -> List[str]:
"""
Updates the column selection based on selected substrings.
Returns the new list of selected columns.
"""
selected_columns = self.cant_deselect.copy()
# If no substrings selected, show all columns
if not self.selected_substrings:
selected_columns.extend([
col for col in self.default_selection
if col not in self.cant_deselect
])
return selected_columns
# Add columns that match any selected substring
for col, substrings in self.column_to_substrings.items():
if any(s in self.selected_substrings for s in substrings):
if col not in selected_columns:
selected_columns.append(col)
return selected_columns
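# Illustrative sketch of SubstringSelectColumns (also not wired into the UI below;
# the column names and glob patterns are assumptions):
#
#     cols = SubstringSelectColumns(
#         default_selection=["IOI - GPT-2", "IOI - Llama-3.1", "MCQA - Llama-3.1"],
#         substring_groups={"IOI": ["IOI*"], "Llama": ["*Llama*"]},
#         selected_substrings=["Llama"],
#     )
#     cols.update_selection_from_substrings()
#     # -> ["IOI - Llama-3.1", "MCQA - Llama-3.1"]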
def restart_space():
API.restart_space(repo_id=REPO_ID)
### Space initialisation - refresh caches
def refresh_cache(repo_id: str, local_dir: str):
    """Re-download a dataset repo into a clean local directory; restart the Space on failure."""
    try:
        if os.path.exists(local_dir):
            shutil.rmtree(local_dir)
        snapshot_download(
            repo_id=repo_id, local_dir=local_dir, repo_type="dataset",
            tqdm_class=None, etag_timeout=30, token=TOKEN,
        )
    except Exception:
        restart_space()

for queue_or_results_repo, local_cache_dir in (
    (QUEUE_REPO_SUBGRAPH, EVAL_REQUESTS_SUBGRAPH),
    (QUEUE_REPO_CAUSALGRAPH, EVAL_REQUESTS_CAUSALGRAPH),
    (RESULTS_REPO_MIB_SUBGRAPH, EVAL_RESULTS_MIB_SUBGRAPH_PATH),
    (RESULTS_REPO_MIB_CAUSALGRAPH, EVAL_RESULTS_MIB_CAUSALGRAPH_PATH),
):
    refresh_cache(queue_or_results_repo, local_cache_dir)
def _sigmoid(x):
    """Squash a raw score into (0, 1); non-numeric or overflowing inputs map to '-'."""
    try:
        return 1 / (1 + math.exp(-2 * (x - 1)))
    except (TypeError, OverflowError):
        return "-"
LEADERBOARD_DF_MIB_SUBGRAPH_FPL = get_leaderboard_df_mib_subgraph(EVAL_RESULTS_MIB_SUBGRAPH_PATH, COLS_MIB_SUBGRAPH, BENCHMARK_COLS_MIB_SUBGRAPH)
LEADERBOARD_DF_MIB_SUBGRAPH_FEQ = get_leaderboard_df_mib_subgraph(EVAL_RESULTS_MIB_SUBGRAPH_PATH, COLS_MIB_SUBGRAPH, BENCHMARK_COLS_MIB_SUBGRAPH,
metric_type="CMD")
# Causal variable localization leaderboards: aggregated and averaged views
LEADERBOARD_DF_MIB_CAUSALGRAPH_AGGREGATED, LEADERBOARD_DF_MIB_CAUSALGRAPH_AVERAGED = get_leaderboard_df_mib_causalgraph(
EVAL_RESULTS_MIB_CAUSALGRAPH_PATH
)
(
finished_eval_queue_df_subgraph,
pending_eval_queue_df_subgraph,
) = get_evaluation_queue_df(EVAL_REQUESTS_SUBGRAPH, EVAL_COLS, "Circuit")
(
finished_eval_queue_df_causalvariable,
pending_eval_queue_df_causalvariable,
) = get_evaluation_queue_df(EVAL_REQUESTS_CAUSALGRAPH, EVAL_COLS, "Causal Variable")
finished_eval_queue = pd.concat((finished_eval_queue_df_subgraph, finished_eval_queue_df_causalvariable))
pending_eval_queue = pd.concat((pending_eval_queue_df_subgraph, pending_eval_queue_df_causalvariable))
def init_leaderboard_mib_subgraph(dataframe, track):
"""Initialize the subgraph leaderboard with display names for better readability."""
if dataframe is None or dataframe.empty:
raise ValueError("Leaderboard DataFrame is empty or None.")
print("\nDebugging DataFrame columns:", dataframe.columns.tolist())
model_name_mapping = {
"qwen2_5": "Qwen-2.5",
"gpt2": "GPT-2",
"gemma2": "Gemma-2",
"llama3": "Llama-3.1"
}
benchmark_mapping = {
"ioi": "IOI",
"mcqa": "MCQA",
"arithmetic_addition": "Arithmetic (+)",
"arithmetic_subtraction": "Arithmetic (-)",
"arc_easy": "ARC (Easy)",
"arc_challenge": "ARC (Challenge)"
}
display_mapping = {}
for task in TasksMib_Subgraph:
for model in task.value.models:
field_name = f"{task.value.benchmark}_{model}"
display_name = f"{benchmark_mapping[task.value.benchmark]} - {model_name_mapping[model]}"
display_mapping[field_name] = display_name
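    # e.g. display_mapping["ioi_gpt2"] -> "IOI - GPT-2"
    # (illustrative; the actual keys depend on TasksMib_Subgraph)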
# Now when creating benchmark groups, we'll use display names
benchmark_groups = []
for task in TasksMib_Subgraph:
benchmark = task.value.benchmark
benchmark_cols = [
display_mapping[f"{benchmark}_{model}"] # Use display name from our mapping
for model in task.value.models
if f"{benchmark}_{model}" in dataframe.columns
]
if benchmark_cols:
benchmark_groups.append(benchmark_cols)
print(f"\nBenchmark group for {benchmark}:", benchmark_cols)
# Similarly for model groups
model_groups = []
all_models = list(set(model for task in TasksMib_Subgraph for model in task.value.models))
for model in all_models:
model_cols = [
display_mapping[f"{task.value.benchmark}_{model}"] # Use display name
for task in TasksMib_Subgraph
if model in task.value.models
and f"{task.value.benchmark}_{model}" in dataframe.columns
]
if model_cols:
model_groups.append(model_cols)
print(f"\nModel group for {model}:", model_cols)
    # The benchmark/model groups above are only used for debug logging; the
    # leaderboard itself displays every column of the renamed DataFrame.
    renamed_df = dataframe.rename(columns=display_mapping)
return Leaderboard(
value=renamed_df, # Use DataFrame with display names
datatype=[c.type for c in fields(AutoEvalColumn_mib_subgraph)],
search_columns=["Method"],
hide_columns=["eval_name"],
interactive=False,
), renamed_df
def init_leaderboard_mib_causalgraph(dataframe, track):
model_name_mapping = {
"Qwen2ForCausalLM": "Qwen-2.5",
"GPT2ForCausalLM": "GPT-2",
"GPT2LMHeadModel": "GPT-2",
"Gemma2ForCausalLM": "Gemma-2",
"LlamaForCausalLM": "Llama-3.1"
}
benchmark_mapping = {
"ioi_task": "IOI",
"4_answer_MCQA": "MCQA",
"arithmetic_addition": "Arithmetic (+)",
"arithmetic_subtraction": "Arithmetic (-)",
"ARC_easy": "ARC (Easy)",
"RAVEL": "RAVEL"
}
    target_variables_mapping = {
        "output_token": "Output Token",
        "output_position": "Output Position",
        "answer_pointer": "Answer Pointer",
        "answer": "Answer",
        "Continent": "Continent",
        "Language": "Language",
        "Country": "Country"
    }
display_mapping = {}
for task in TasksMib_Causalgraph:
for model in task.value.models:
for target_variables in task.value.target_variables:
field_name = f"{model}_{task.value.col_name}_{target_variables}"
display_name = f"{benchmark_mapping[task.value.col_name]} - {model_name_mapping[model]} - {target_variables_mapping[target_variables]}"
display_mapping[field_name] = display_name
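    # e.g. display_mapping["GPT2LMHeadModel_ioi_task_output_token"] -> "IOI - GPT-2 - Output Token"
    # (illustrative; the actual keys depend on TasksMib_Causalgraph)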
renamed_df = dataframe.rename(columns=display_mapping)
# Create only necessary columns
return Leaderboard(
value=renamed_df,
datatype=[c.type for c in fields(AutoEvalColumn_mib_causalgraph)],
search_columns=["Method"],
hide_columns=["eval_name"],
bool_checkboxgroup_label="Hide models",
interactive=False,
), renamed_df
def init_leaderboard(dataframe, track):
if dataframe is None or dataframe.empty:
raise ValueError("Leaderboard DataFrame is empty or None.")
# filter for correct track
dataframe = dataframe.loc[dataframe["Track"] == track]
return Leaderboard(
value=dataframe,
datatype=[c.type for c in fields(AutoEvalColumn)],
select_columns=SelectColumns(
default_selection=[c.name for c in fields(AutoEvalColumn) if c.displayed_by_default],
cant_deselect=[c.name for c in fields(AutoEvalColumn) if c.never_hidden],
label="Select Columns to Display:",
),
search_columns=[AutoEvalColumn.model.name],
hide_columns=[c.name for c in fields(AutoEvalColumn) if c.hidden],
bool_checkboxgroup_label="Hide models",
interactive=False,
)
def process_json(temp_file):
if temp_file is None:
return {}
# Handle file upload
try:
file_path = temp_file.name
if file_path.endswith('.gz'):
with gzip.open(file_path, 'rt') as f:
data = json.load(f)
else:
with open(file_path, 'r') as f:
data = json.load(f)
    except Exception as e:
        raise gr.Error(f"Error processing file: {str(e)}")
    # A bare gr.Markdown(...) created inside a callback is never rendered; show a toast instead
    gr.Info("Upload successful!")
    return data
def get_hf_username(hf_repo):
hf_repo = hf_repo.rstrip("/")
parts = hf_repo.split("/")
username = parts[-2]
return username
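# e.g. get_hf_username("https://huggingface.co/username/repo") -> "username"
# (assumes the URL ends at the repo level, with no trailing sub-path)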
# Define the preset substrings for filtering
PRESET_SUBSTRINGS = ["IOI", "MCQA", "Arithmetic", "ARC", "GPT-2", "Qwen-2.5", "Gemma-2", "Llama-3.1"]
TASK_SUBSTRINGS = ["IOI", "MCQA", "Arithmetic", "ARC"]
TASK_CAUSAL_SUBSTRINGS = ["IOI", "MCQA", "ARC (Easy)", "RAVEL"]
MODEL_SUBSTRINGS = ["GPT-2", "Qwen-2.5", "Gemma-2", "Llama-3.1"]
def filter_columns_by_substrings(dataframe: pd.DataFrame, selected_task_substrings: List[str],
selected_model_substrings: List[str]) -> pd.DataFrame:
"""
Filter columns based on the selected substrings.
"""
    if not selected_task_substrings and not selected_model_substrings:
        return dataframe  # no filtering if no substrings are selected
if not selected_task_substrings:
# Filter columns that contain any of the selected model substrings
filtered_columns = [
col for col in dataframe.columns
if any(sub.lower() in col.lower() for sub in selected_model_substrings)
or col == "Method"
]
return dataframe[filtered_columns]
elif not selected_model_substrings:
# Filter columns that contain any of the selected task substrings
filtered_columns = [
col for col in dataframe.columns
if any(sub.lower() in col.lower() for sub in selected_task_substrings)
or col == "Method"
]
return dataframe[filtered_columns]
# Filter columns by task first. Use AND logic to combine with model filtering
filtered_columns = [
col for col in dataframe.columns
if any(sub.lower() in col.lower() for sub in selected_task_substrings)
or col == "Method"
]
filtered_columns = [
col for col in dataframe[filtered_columns].columns
if any(sub.lower() in col.lower() for sub in selected_model_substrings)
or col == "Method"
]
return dataframe[filtered_columns]
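# Illustrative behaviour (the column names are assumptions):
#
#     df columns: ["Method", "IOI - GPT-2", "IOI - Llama-3.1", "MCQA - Llama-3.1"]
#     filter_columns_by_substrings(df, ["IOI"], ["Llama-3.1"])
#     # -> columns ["Method", "IOI - Llama-3.1"]  (task and model filters combine with AND)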
def update_leaderboard(dataframe: pd.DataFrame, selected_task_substrings: List[str],
selected_model_substrings: List[str], ascending: bool = False):
"""
Update the leaderboard based on the selected substrings.
"""
    filtered_dataframe = filter_columns_by_substrings(dataframe, selected_task_substrings, selected_model_substrings)
    # Show the aggregate Average/Score columns unless exactly one task or exactly
    # one model is selected (averaging a single column adds no information).
    show_average = len(selected_task_substrings) != 1 and len(selected_model_substrings) != 1
def _transform_floats(df):
df_transformed = df.copy()
# Apply transformation row by row
for i, row in df_transformed.iterrows():
# Apply sigmoid only to numeric values in the row
df_transformed.loc[i] = row.apply(lambda x: _sigmoid(x) if isinstance(x, (float, int)) else x)
return df_transformed
if show_average:
# Replace "-" with NaN for calculation, then use skipna=False to get NaN when any value is missing
numeric_data = filtered_dataframe.replace("-", np.nan)
means = numeric_data.mean(axis=1, skipna=False)
# Apply the same transformation for computing scores
s_filtered_dataframe = _transform_floats(filtered_dataframe)
s_numeric_data = s_filtered_dataframe.replace("-", np.nan)
s_means = s_numeric_data.mean(axis=1, skipna=False)
# Set Average and Score columns
# Keep numeric values as NaN for proper sorting, convert to "-" only for display if needed
filtered_dataframe.loc[:, "Average"] = means.round(2)
filtered_dataframe.loc[:, "Score"] = s_means.round(2)
# Sort by Average with NaN values at the end
filtered_dataframe = filtered_dataframe.sort_values(by=["Average"], ascending=ascending, na_position='last')
# After sorting, convert NaN back to "-" for display
filtered_dataframe.loc[:, "Average"] = filtered_dataframe["Average"].fillna("-")
filtered_dataframe.loc[:, "Score"] = filtered_dataframe["Score"].fillna("-")
return filtered_dataframe
def process_url(url):
# Add your URL processing logic here
return f"You entered the URL: {url}"
demo = gr.Blocks(css=custom_css)
with demo:
gr.HTML(TITLE)
gr.Markdown(INTRODUCTION_TEXT, elem_classes="markdown-text")
with gr.Tabs(elem_classes="tab-buttons") as tabs:
with gr.TabItem("Circuit Localization", elem_id="subgraph", id=0):
with gr.Tabs() as subgraph_tabs:
with gr.TabItem("CPR", id=0):
# Add description for filters
gr.Markdown("""
### Filtering Options
                        Use the checkboxes below to filter results by specific tasks or models.
You can combine filters to see specific task-model combinations.
""")
# CheckboxGroup for selecting substrings
task_substring_checkbox = gr.CheckboxGroup(
choices=TASK_SUBSTRINGS,
label="View tasks:",
value=TASK_SUBSTRINGS, # Default to all substrings selected
)
model_substring_checkbox = gr.CheckboxGroup(
choices = MODEL_SUBSTRINGS,
label = "View models:",
value = MODEL_SUBSTRINGS
)
leaderboard, data = init_leaderboard_mib_subgraph(LEADERBOARD_DF_MIB_SUBGRAPH_FPL, "Subgraph")
original_leaderboard = gr.State(value=data)
ascending = gr.State(value=False)
# Update the leaderboard when the user selects/deselects substrings
task_substring_checkbox.change(
fn=update_leaderboard,
inputs=[original_leaderboard, task_substring_checkbox, model_substring_checkbox, ascending],
outputs=leaderboard
)
model_substring_checkbox.change(
fn=update_leaderboard,
inputs=[original_leaderboard, task_substring_checkbox, model_substring_checkbox, ascending],
outputs=leaderboard
)
print(f"Leaderboard is {leaderboard}")
with gr.TabItem("CMD", id=1):
# Add description for filters
gr.Markdown("""
### Filtering Options
                        Use the checkboxes below to filter results by specific tasks or models.
You can combine filters to see specific task-model combinations.
""")
# CheckboxGroup for selecting substrings
task_substring_checkbox = gr.CheckboxGroup(
choices=TASK_SUBSTRINGS,
label="View tasks:",
value=TASK_SUBSTRINGS, # Default to all substrings selected
)
model_substring_checkbox = gr.CheckboxGroup(
choices = MODEL_SUBSTRINGS,
label = "View models:",
value = MODEL_SUBSTRINGS
)
leaderboard, data = init_leaderboard_mib_subgraph(LEADERBOARD_DF_MIB_SUBGRAPH_FEQ, "Subgraph")
original_leaderboard = gr.State(value=data)
ascending = gr.State(value=True)
# Update the leaderboard when the user selects/deselects substrings
task_substring_checkbox.change(
fn=update_leaderboard,
inputs=[original_leaderboard, task_substring_checkbox, model_substring_checkbox, ascending],
outputs=leaderboard
)
model_substring_checkbox.change(
fn=update_leaderboard,
inputs=[original_leaderboard, task_substring_checkbox, model_substring_checkbox, ascending],
outputs=leaderboard
)
print(f"Leaderboard is {leaderboard}")
# Then modify the Causal Graph tab section
with gr.TabItem("Causal Variable Localization", elem_id="causalgraph", id=1):
with gr.Tabs() as causalgraph_tabs:
with gr.TabItem("Highest View", id=0):
gr.Markdown("""
### Filtering Options
                        Use the checkboxes below to filter results by specific tasks or models.
You can combine filters to see specific task-model combinations.
""")
task_substring_checkbox = gr.CheckboxGroup(
choices=TASK_CAUSAL_SUBSTRINGS,
label="View tasks:",
value=TASK_CAUSAL_SUBSTRINGS, # Default to all substrings selected
)
model_substring_checkbox = gr.CheckboxGroup(
choices = MODEL_SUBSTRINGS,
label = "View models:",
value = MODEL_SUBSTRINGS
)
leaderboard_aggregated, data = init_leaderboard_mib_causalgraph(
LEADERBOARD_DF_MIB_CAUSALGRAPH_AGGREGATED,
"Causal Graph"
)
original_leaderboard = gr.State(value=data)
ascending = gr.State(value=False)
task_substring_checkbox.change(
fn=update_leaderboard,
inputs=[original_leaderboard, task_substring_checkbox, model_substring_checkbox, ascending],
outputs=leaderboard_aggregated
)
model_substring_checkbox.change(
fn=update_leaderboard,
inputs=[original_leaderboard, task_substring_checkbox, model_substring_checkbox, ascending],
outputs=leaderboard_aggregated
)
with gr.TabItem("Averaged View", id=1):
task_substring_checkbox = gr.CheckboxGroup(
choices=TASK_CAUSAL_SUBSTRINGS,
label="View tasks:",
value=TASK_CAUSAL_SUBSTRINGS, # Default to all substrings selected
)
model_substring_checkbox = gr.CheckboxGroup(
choices = MODEL_SUBSTRINGS,
label = "View models:",
value = MODEL_SUBSTRINGS
)
leaderboard_averaged, data = init_leaderboard_mib_causalgraph(
LEADERBOARD_DF_MIB_CAUSALGRAPH_AVERAGED,
"Causal Graph"
)
original_leaderboard = gr.State(value=data)
ascending = gr.State(value=False)
task_substring_checkbox.change(
fn=update_leaderboard,
inputs=[original_leaderboard, task_substring_checkbox, model_substring_checkbox, ascending],
outputs=leaderboard_averaged
)
model_substring_checkbox.change(
fn=update_leaderboard,
inputs=[original_leaderboard, task_substring_checkbox, model_substring_checkbox, ascending],
outputs=leaderboard_averaged
)
with gr.TabItem("Submit", elem_id="llm-benchmark-tab-table", id=2):
# Track selection
track = gr.Radio(
choices=[
"Circuit Localization Track",
"Causal Variable Localization Track"
],
label="Select Competition Track",
elem_id="track_selector"
)
with gr.Column(visible=False, elem_id="bordered-column") as circuit_ui:
with gr.Row():
gr.Markdown(EVALUATION_QUEUE_TEXT_SUBGRAPH, elem_classes="markdown-text")
with gr.Row():
hf_repo_circ = gr.Textbox(
label="HuggingFace Repository URL",
placeholder="https://huggingface.co/username/repo/path",
info="Must be a valid HuggingFace URL pointing to folders containing either 1 importance score file per task/model, or " \
"9 circuit files per task/model (.json or .pt)."
)
level = gr.Radio(
choices=[
"Edge",
"Node (submodule)",
"Node (neuron)"
],
label="Level of granularity",
info="Is your circuit defined by its inclusion/exclusion of certain edges (e.g., MLP1 to H10L12), of certain submodules (e.g., MLP1), or of neurons " \
"within those submodules (e.g., MLP1 neuron 295)?"
)
with gr.Column(visible=False, elem_id="bordered-column") as causal_ui:
gr.Markdown(EVALUATION_QUEUE_TEXT_CAUSALVARIABLE, elem_classes="markdown-text")
with gr.Row():
hf_repo_cg = gr.Textbox(
label="HuggingFace Repository URL",
placeholder="https://huggingface.co/username/repo/path",
info="Must be a valid HuggingFace URL pointing to a file containing the trained featurizer (.pt). " )
# Common fields
with gr.Group():
gr.Markdown("### Submission Information")
method_name = gr.Textbox(label="Method Name")
contact_email = gr.Textbox(label="Contact Email")
# Dynamic UI logic
            def toggle_ui(track):
                circuit = track == "Circuit Localization Track"
                causal = not circuit
                # circuit_ui and causal_ui are gr.Column containers, so return Column updates
                return {
                    circuit_ui: gr.Column(visible=circuit),
                    causal_ui: gr.Column(visible=causal)
                }
track.change(toggle_ui, track, [circuit_ui, causal_ui])
# Submission handling
status = gr.Textbox(label="Submission Status", visible=False)
def handle_submission(track, hf_repo_circ, hf_repo_cg, level, method_name, contact_email):
errors = []
warnings = []
breaking_error = False
hf_repo = hf_repo_circ if "Circuit" in track else hf_repo_cg
# Validate common fields
if not method_name.strip():
errors.append("Method name is required")
if "@" not in contact_email or "." not in contact_email:
errors.append("Valid email address is required")
if "Circuit" in track and not level:
errors.append("Level of granularity is required")
if not hf_repo.startswith("https://huggingface.co/") and not hf_repo.startswith("http://huggingface.co/"):
errors.append(f"Invalid HuggingFace URL - must start with https://huggingface.co/")
breaking_error = True
else:
repo_id, subfolder, revision = parse_huggingface_url(hf_repo)
if repo_id is None:
errors.append("Could not read username or repo name from HF URL")
breaking_error = True
else:
user_name, repo_name = repo_id.split("/")
under_rate_limit, time_left = check_rate_limit(track, user_name, contact_email)
if not under_rate_limit:
errors.append(f"Rate limit exceeded (max 2 submissions per week). Please try again in {time_left}. " \
"(If you're trying again after a failed validation, either remove the previous entry below or try again in about 30 minutes.")
breaking_error = True
# Track-specific validation
if "Circuit" in track and not breaking_error:
submission_errors, submission_warnings = verify_circuit_submission(hf_repo, level)
elif not breaking_error:
submission_errors, submission_warnings = verify_causal_variable_submission(hf_repo)
if not breaking_error:
errors.extend(submission_errors)
warnings.extend(submission_warnings)
_id = secrets.token_urlsafe(12)
if errors:
return [
gr.Textbox("\n".join(f"❌ {e}" for e in errors), visible=True),
None, None,
gr.Column(visible=False),
]
elif warnings:
return [
gr.Textbox("Warnings:", visible=True),
                        gr.Markdown("\n\n".join(f"• {w}" for w in warnings)),
(track, hf_repo_circ, hf_repo_cg, level, method_name, contact_email, _id),
gr.Column(visible=True)
]
else:
return upload_to_queue(track, hf_repo_circ, hf_repo_cg, level, method_name, contact_email, _id)
# New warning confirmation dialog
warning_modal = gr.Column(visible=False, variant="panel")
with warning_modal:
gr.Markdown("### ⚠️ Submission Warnings")
warning_display = gr.Markdown()
proceed_btn = gr.Button("Proceed Anyway", variant="secondary")
cancel_btn = gr.Button("Cancel Submission", variant="primary")
# Store submission data between callbacks
pending_submission = gr.State()
submit_btn = gr.Button("Submit Entry", variant="primary")
submit_btn.click(
handle_submission,
inputs=[track, hf_repo_circ, hf_repo_cg, level, method_name, contact_email],
outputs=[status, warning_display, pending_submission, warning_modal]
)
proceed_btn.click(
lambda x: upload_to_queue(*x),
inputs=pending_submission,
outputs=[status, warning_display, pending_submission, warning_modal]
)
cancel_btn.click(
lambda: [gr.Textbox("Submission canceled.", visible=True), None, None, gr.Column(visible=False)],
outputs=[status, warning_display, pending_submission, warning_modal]
)
with gr.Column():
with gr.Accordion(
f"βœ… Finished Evaluations ({len(finished_eval_queue)})",
open=False,
):
with gr.Row():
finished_eval_table = gr.components.Dataframe(
value=finished_eval_queue,
headers=EVAL_COLS,
datatype=EVAL_TYPES,
row_count=5,
)
with gr.Accordion(
f"⏳ Pending Evaluation Queue ({len(pending_eval_queue)})",
open=False,
):
with gr.Row():
pending_eval_table = gr.components.Dataframe(
value=pending_eval_queue,
headers=EVAL_COLS,
datatype=EVAL_TYPES,
row_count=5,
)
with gr.Group():
gr.Markdown("### Remove Submission from Queue")
with gr.Row():
name_r = gr.Textbox(label="Method Name")
_id_r = gr.Textbox(label = "Submission ID")
status_r = gr.Textbox(label="Removal Status", visible=False)
remove_button = gr.Button("Remove Entry")
remove_button.click(
remove_submission,
inputs=[track, name_r, _id_r],
outputs=[status_r]
)
# Add info about rate limits
gr.Markdown("""
### Submission Policy
- Maximum 2 valid submissions per HuggingFace account per week
- Invalid submissions don't count toward your limit
- Rate limit tracked on a rolling basis: a submission no longer counts toward the limit as soon as 7 days have passed since the submission time
- The queues can take up to an hour to update; don't fret if your submission doesn't show up immediately!
""")
with gr.Row():
        with gr.Accordion("📙 Citation", open=False):
citation_button = gr.Textbox(
value=CITATION_BUTTON_TEXT,
label=CITATION_BUTTON_LABEL,
lines=10,
elem_id="citation-button",
show_copy_button=True,
)
scheduler = BackgroundScheduler()
scheduler.add_job(restart_space, "interval", seconds=1800)
scheduler.start()
demo.queue(default_concurrency_limit=40).launch(share=True, ssr_mode=False)