import json
import os
import numpy as np
import pandas as pd
import gradio as gr
from urllib.parse import urlparse
from collections import defaultdict
from datetime import timedelta
from typing import Literal
from huggingface_hub import HfFileSystem, hf_hub_url, get_hf_file_metadata
from huggingface_hub.hf_api import ModelInfo
from transformers import AutoConfig
from transformers.models.auto.tokenization_auto import AutoTokenizer
from src.display.utils import TEXT_TASKS, VISION_TASKS, NUM_EXPECTED_EXAMPLES
from src.envs import EVAL_REQUESTS_SUBGRAPH, EVAL_REQUESTS_CAUSALGRAPH
def is_model_on_hub(model_name: str, revision: str, token: str = None, trust_remote_code=False, test_tokenizer=False) -> tuple[bool, str, object]:
    """Checks whether model_name is on the Hub, and whether it (and its tokenizer) can be loaded with AutoClasses."""
    try:
        config = AutoConfig.from_pretrained(model_name, revision=revision, trust_remote_code=trust_remote_code, token=token)
        if test_tokenizer:
            try:
                AutoTokenizer.from_pretrained(model_name, revision=revision, trust_remote_code=trust_remote_code, token=token)
            except ValueError as e:
                return (
                    False,
                    f"uses a tokenizer which is not in a transformers release: {e}",
                    None,
                )
            except Exception:
                return (False, "'s tokenizer cannot be loaded. Is your tokenizer class in a stable transformers release, and correctly configured?", None)
        return True, None, config
    except ValueError:
        return (
            False,
            "needs to be launched with `trust_remote_code=True`. For safety reasons, we do not allow these models to be automatically submitted to the leaderboard.",
            None,
        )
    except Exception:
        return False, "was not found on the hub!", None
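# Usage sketch (illustrative; "gpt2" stands in for any Hub model id, and the
# returned message fragments are designed to be appended after the model name):
#   ok, error_fragment, config = is_model_on_hub("gpt2", revision="main", test_tokenizer=True)
#   if not ok:
#       print(f"gpt2 {error_fragment}")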
def get_model_size(model_info: ModelInfo, precision: str):
    """Gets the model size in billions of parameters from safetensors metadata; returns 0 when unknown."""
    try:
        model_size = round(model_info.safetensors["total"] / 1e9, 3)
    except (AttributeError, TypeError):
        return 0  # Unknown model sizes are indicated as 0, see NUMERIC_INTERVALS in app.py
    size_factor = 8 if (precision == "GPTQ" or "gptq" in model_info.modelId.lower()) else 1
    model_size = size_factor * model_size
    return model_size
def get_model_arch(model_info: ModelInfo):
    """Gets the model architecture from the configuration."""
    return model_info.config.get("architectures", "Unknown")
def already_submitted_models(requested_models_dir: str) -> tuple[set[str], dict[str, list[str]]]:
    """Gathers the set of already-submitted models (to avoid duplicates) and each organisation's submission dates."""
    depth = 1
    file_names = []
    users_to_submission_dates = defaultdict(list)
    for root, _, files in os.walk(requested_models_dir):
        current_depth = root.count(os.sep) - requested_models_dir.count(os.sep)
        if current_depth == depth:
            for file in files:
                if not file.endswith(".json"):
                    continue
                with open(os.path.join(root, file), "r") as f:
                    info = json.load(f)
                file_names.append(f"{info['model']}_{info['revision']}_{info['track']}")
                # Select organisation
                if info["model"].count("/") == 0 or "submitted_time" not in info:
                    continue
                organisation, _ = info["model"].split("/")
                users_to_submission_dates[organisation].append(info["submitted_time"])
    return set(file_names), users_to_submission_dates
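# Return-value sketch (entries are hypothetical; revision and track come from each request JSON):
#   file_names == {"org/model_<revision>_<track>", ...}
#   users_to_submission_dates == {"org": ["2025-01-01T00:00:00Z", ...]}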
def is_valid_predictions(predictions: dict) -> tuple[bool, str]:
    out_msg = ""
    for task in TEXT_TASKS:
        if task not in predictions:
            out_msg = f"Error: {task} not present"
            break
        for subtask in TEXT_TASKS[task]:
            if subtask not in predictions[task]:
                out_msg = f"Error: {subtask} not present under {task}"
                break
        if out_msg != "":
            break
    if "vqa" in predictions or "winoground" in predictions or "devbench" in predictions:
        for task in VISION_TASKS:
            if task not in predictions:
                out_msg = f"Error: {task} not present"
                break
            for subtask in VISION_TASKS[task]:
                if subtask not in predictions[task]:
                    out_msg = f"Error: {subtask} not present under {task}"
                    break
            if out_msg != "":
                break
    # Make sure all examples have predictions, and that predictions are the correct type
    for task in predictions:
        for subtask in predictions[task]:
            if task == "devbench":
                a = np.array(predictions[task][subtask]["predictions"])
                if subtask == "sem-things":
                    required_shape = (1854, 1854)
                elif subtask == "gram-trog":
                    required_shape = (76, 4, 1)
                elif subtask == "lex-viz_vocab":
                    required_shape = (119, 4, 1)
                else:
                    out_msg = f"Error: Unknown devbench subtask `{subtask}`."
                    break
                if a.shape[0] != required_shape[0] or a.shape[1] != required_shape[1]:
                    out_msg = f"Error: Wrong shape for results for `{subtask}` in `{task}`."
                    break
                if not str(a.dtype).startswith("float"):
                    out_msg = f"Error: Results for `{subtask}` ({task}) should be floats but aren't."
                    break
                continue
            num_expected_examples = NUM_EXPECTED_EXAMPLES[task][subtask]
            if len(predictions[task][subtask]["predictions"]) != num_expected_examples:
                out_msg = f"Error: {subtask} has the wrong number of examples."
                break
            if task == "glue":
                if type(predictions[task][subtask]["predictions"][0]["pred"]) is not int:
                    out_msg = f"Error: results for `{subtask}` (`{task}`) should be integers but aren't."
                    break
            else:
                if type(predictions[task][subtask]["predictions"][0]["pred"]) is not str:
                    out_msg = f"Error: results for `{subtask}` (`{task}`) should be strings but aren't."
                    break
        if out_msg != "":
            break
    if out_msg != "":
        return False, out_msg
    return True, "Upload successful."
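# Expected input shape, sketched with hypothetical subtask names and ids:
#   predictions = {
#       "glue": {"<subtask>": {"predictions": [{"id": "0", "pred": 1}, ...]}},            # ints
#       "<other text task>": {"<subtask>": {"predictions": [{"id": "0", "pred": "A"}, ...]}},  # strings
#       "devbench": {"sem-things": {"predictions": [[...], ...]}},                        # float array, shape-checked
#   }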
def _format_time(earliest_time):
    """Formats the time remaining until one week after earliest_time, as 'HH:MM:SS' prefixed with days if needed."""
    time_left = (earliest_time.tz_convert("UTC") + timedelta(weeks=1)) - pd.Timestamp.utcnow()
    hours = time_left.seconds // 3600
    minutes, seconds = divmod(time_left.seconds % 3600, 60)
    time_left_formatted = f"{hours:02}:{minutes:02}:{seconds:02}"
    if time_left.days > 0:
        time_left_formatted = f"{time_left.days} days, {time_left_formatted}"
    return time_left_formatted
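# Example (hypothetical timestamp): for a submission made exactly 2 days ago the
# weekly window has about 5 days left, so this returns roughly "4 days, 23:59:59":
#   earliest = pd.Timestamp.utcnow() - timedelta(days=2)
#   _format_time(earliest)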
def get_evaluation_queue_df(save_path: str, cols: list) -> pd.DataFrame:
    """Creates the dataframe of evaluation queue requests."""
    entries = [entry for entry in os.listdir(save_path) if not entry.startswith(".")]
    all_evals = []
    for entry in entries:
        if ".json" in entry:
            file_path = os.path.join(save_path, entry)
            with open(file_path) as fp:
                data = json.load(fp)
            # if "still_on_hub" in data and data["still_on_hub"]:
            #     data[EvalQueueColumn.model.name] = make_clickable_model(data["hf_repo"], data["model"])
            #     data[EvalQueueColumn.revision.name] = data.get("revision", "main")
            # else:
            #     data[EvalQueueColumn.model.name] = data["model"]
            #     data[EvalQueueColumn.revision.name] = "N/A"
            all_evals.append(data)
        elif ".md" not in entry:
            # this is a folder
            folder_path = os.path.join(save_path, entry)
            sub_entries = [e for e in os.listdir(folder_path) if os.path.isfile(os.path.join(folder_path, e)) and not e.startswith(".")]
            for sub_entry in sub_entries:
                file_path = os.path.join(folder_path, sub_entry)
                with open(file_path) as fp:
                    data = json.load(fp)
                all_evals.append(data)
    return pd.DataFrame(all_evals)
def check_rate_limit(track, user_name, contact_email):
    """Returns (True, None) if the user may submit, else (False, time_left) with a formatted countdown."""
    if "Circuit" in track:
        save_path = EVAL_REQUESTS_SUBGRAPH
    else:
        save_path = EVAL_REQUESTS_CAUSALGRAPH
    evaluation_queue = get_evaluation_queue_df(save_path, ["user_name", "contact_email"])
    if evaluation_queue.empty:
        return True, None
    one_week_ago = pd.Timestamp.utcnow() - timedelta(weeks=1)
    # .copy() avoids pandas' SettingWithCopyWarning when adding the parsed column below
    user_name_occurrences = evaluation_queue[evaluation_queue["user_name"] == user_name].copy()
    user_name_occurrences["submit_time"] = pd.to_datetime(user_name_occurrences["submit_time"], utc=True)
    user_name_occurrences = user_name_occurrences[user_name_occurrences["submit_time"] >= one_week_ago]
    email_occurrences = evaluation_queue[evaluation_queue["contact_email"] == contact_email.lower()].copy()
    email_occurrences["submit_time"] = pd.to_datetime(email_occurrences["submit_time"], utc=True)
    email_occurrences = email_occurrences[email_occurrences["submit_time"] >= one_week_ago]
    if user_name_occurrences.shape[0] >= 2:
        earliest_time = user_name_occurrences["submit_time"].min()
        return False, _format_time(earliest_time)
    if email_occurrences.shape[0] >= 2:
        earliest_time = email_occurrences["submit_time"].min()
        return False, _format_time(earliest_time)
    return True, None
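# Usage sketch (hypothetical user and email; any track name containing "Circuit"
# checks the subgraph queue, everything else the causal-graph queue):
#   ok, time_left = check_rate_limit("Circuit Localization", "some-user", "user@example.com")
#   if not ok:
#       print(f"Rate limit reached; submissions reopen in {time_left}")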
def parse_huggingface_url(url: str):
    """
    Extracts repo_id and subfolder path from a Hugging Face URL.
    Returns (repo_id, folder_path); folder_path is None when the URL points at the repo root.
    """
    # Handle cases where the input is already a repo_id (no URL)
    if not url.startswith(("http://", "https://")):
        return url, None
    parsed = urlparse(url)
    path_parts = parsed.path.strip("/").split("/")
    # Extract repo_id (username/repo_name)
    if len(path_parts) < 2:
        raise ValueError("Invalid Hugging Face URL: could not extract repo_id.")
    repo_id = f"{path_parts[0]}/{path_parts[1]}"
    # Extract folder path (if in /tree/ or /blob/)
    if "tree" in path_parts or "blob" in path_parts:
        try:
            branch_idx = path_parts.index("tree") if "tree" in path_parts else path_parts.index("blob")
            folder_path = "/".join(path_parts[branch_idx + 2:]) or None  # Skip "tree/<branch>" or "blob/<branch>"
        except (ValueError, IndexError):
            folder_path = None
    else:
        folder_path = None
    return repo_id, folder_path
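# Examples (hypothetical repo):
#   parse_huggingface_url("org/repo")
#       -> ("org/repo", None)
#   parse_huggingface_url("https://huggingface.co/org/repo/tree/main/circuits/ioi_gpt2")
#       -> ("org/repo", "circuits/ioi_gpt2")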
def validate_directory(fs: HfFileSystem, repo_id: str, dirname: str, curr_tm: str, circuit_level: Literal["edge", "node", "neuron"] = "edge"):
    """Validates a single task/model directory, returning (errors, warnings)."""
    errors = []
    warnings = []
    curr_tm_display = curr_tm.replace("_", "/")
    files = fs.ls(dirname)
    # Detect whether this is a multi-circuit or an importances submission
    is_multiple_circuits = False
    files = [f["name"] for f in files if f["name"].endswith(".json") or f["name"].endswith(".pt")]
    if len(files) == 1:
        is_multiple_circuits = False
    elif len(files) > 1:
        is_multiple_circuits = True
        if len(files) < 9:
            errors.append(f"Folder for {curr_tm_display} contains multiple circuits, but not enough. If you intended to submit importances, include only one circuit in the folder. Otherwise, please add the rest of the circuits.")
    else:
        warnings.append(f"Directory present for {curr_tm_display} but is empty")
    offset = 0
    for idx, file in enumerate(files):
        file_suffix = file.split(repo_id + "/")[1]
        file_url = hf_hub_url(repo_id=repo_id, filename=file_suffix)
        file_info = get_hf_file_metadata(file_url)
        file_size_mb = file_info.size / (1024 * 1024)
        if file_size_mb > 150:
            warnings.append(f"Will skip file >150MB: {file}")
            offset -= 1
            continue
        if is_multiple_circuits and idx + offset >= 9:
            break
    return errors, warnings
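# Usage sketch (hypothetical repo and directory). Interpretation of the checks
# above: exactly one .json/.pt file means an importances submission, 9 or more
# means a multi-circuit submission, and files over 150MB are skipped:
#   fs = HfFileSystem()
#   errs, warns = validate_directory(fs, "org/repo", "org/repo/ioi_gpt2", "ioi_gpt2")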
def verify_circuit_submission(hf_repo, level, progress=gr.Progress()):
    VALID_COMBINATIONS = [
        "ioi_gpt2", "ioi_qwen2.5", "ioi_gemma2", "ioi_llama3", "ioi_interpbench",
        "mcqa_qwen2.5", "mcqa_gemma2", "mcqa_llama3",
        "arithmetic-addition_llama3", "arithmetic-subtraction_llama3",
        "arc-easy_gemma2", "arc-easy_llama3",
        "arc-challenge_llama3",
    ]
    TASKS = ["ioi", "mcqa", "arithmetic-addition", "arithmetic-subtraction", "arc-easy", "arc-challenge"]
    MODELS = ["gpt2", "qwen2.5", "gemma2", "llama3", "interpbench"]
    errors = []
    warnings = []
    directories_present = {tm: False for tm in VALID_COMBINATIONS}
    directories_valid = {tm: False for tm in VALID_COMBINATIONS}
    fs = HfFileSystem()
    if "huggingface.co/" not in hf_repo:
        errors.append("Could not parse the Hugging Face URL; expected a link like https://huggingface.co/<org>/<repo>.")
        return errors, warnings
    folder_path = hf_repo.split("huggingface.co/")[1]
    repo_id = "/".join(folder_path.split("/")[:2])
    try:
        files = fs.listdir(folder_path)
    except Exception as e:
        errors.append(f"Could not open Hugging Face URL: {e}")
        return errors, warnings
    file_counts = 0
    for dirname in progress.tqdm(files, desc="Validating directories in repo"):
        file_counts += 1
        if file_counts >= 30:
            warnings.append("Folder contains many files/directories; stopped at 30.")
            break
        circuit_dir = dirname["name"]
        dirname_proc = circuit_dir.lower().split("/")[-1]
        if not fs.isdir(circuit_dir):
            continue
        curr_task = None
        curr_model = None
        # Look for task names in the directory name
        for task in TASKS:
            if dirname_proc.startswith(task) or f"_{task}" in dirname_proc:
                curr_task = task
        # Look for model names in the directory name
        for model in MODELS:
            if dirname_proc.startswith(model) or f"_{model}" in dirname_proc:
                curr_model = model
        if curr_task is None or curr_model is None:
            continue
        curr_tm = f"{curr_task}_{curr_model}"
        if curr_tm not in VALID_COMBINATIONS:
            continue
        directories_present[curr_tm] = True
        # Parse the circuits directory
        print(f"validating {circuit_dir}")
        vd_errors, vd_warnings = validate_directory(fs, repo_id, circuit_dir, curr_tm, level)
        errors.extend(vd_errors)
        warnings.extend(vd_warnings)
        if len(vd_errors) == 0:
            directories_valid[curr_tm] = True
    task_set, model_set = set(), set()
    for tm in directories_present:
        if not directories_present[tm]:
            continue
        if not directories_valid[tm]:
            warnings.append(f"Directory found for {tm.replace('_', '/')}, but circuits not valid or present")
            continue
        task, model = tm.split("_")
        task_set.add(task)
        model_set.add(model)
    if len(task_set) < 2:
        errors.append("At least 2 tasks are required")
    if len(model_set) < 2:
        errors.append("At least 2 models are required")
    no_tm_display = [tm.replace("_", "/") for tm in directories_valid if not directories_valid[tm]]
    if len(no_tm_display) > 0:
        warnings.append(f"No valid circuits or importance scores found for the following tasks/models: {', '.join(no_tm_display)}")
    return errors, warnings
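# Expected repo layout, sketched (hypothetical folder names; each directory must
# name one task and one model forming a pair in VALID_COMBINATIONS):
#   https://huggingface.co/org/repo/tree/main/
#       ioi_gpt2/        # >= 9 circuit files, or exactly 1 importances file
#       ioi_llama3/
#       mcqa_llama3/
# At least 2 distinct tasks and 2 distinct models must validate for the
# submission to pass.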
def verify_causal_variable_submission(hf_repo, layer, position, code_upload):
    """Placeholder: causal-variable submissions are not validated here yet."""
    return