Spaces:
Running
Running
import json | |
import gzip | |
import gradio as gr | |
from gradio_leaderboard import Leaderboard, ColumnFilter, SelectColumns | |
import pandas as pd | |
from apscheduler.schedulers.background import BackgroundScheduler | |
from huggingface_hub import snapshot_download | |
from io import StringIO | |
from src.about import ( | |
CITATION_BUTTON_LABEL, | |
CITATION_BUTTON_TEXT, | |
EVALUATION_QUEUE_TEXT, | |
INTRODUCTION_TEXT, | |
LLM_BENCHMARKS_TEXT, | |
TITLE, | |
) | |
from src.display.css_html_js import custom_css | |
from src.display.utils import ( | |
BENCHMARK_COLS, | |
BENCHMARK_COLS_MULTIMODAL, | |
BENCHMARK_COLS_MIB, | |
COLS, | |
COLS_MIB, | |
COLS_MULTIMODAL, | |
EVAL_COLS, | |
EVAL_TYPES, | |
AutoEvalColumn, | |
AutoEvalColumn_mib, | |
fields, | |
) | |
from src.envs import API, EVAL_REQUESTS_PATH, QUEUE_REPO, REPO_ID, TOKEN, RESULTS_REPO_MIB_SUBGRAPH, EVAL_RESULTS_MIB_SUBGRAPH_PATH, RESULTS_REPO_MIB_CAUSALGRAPH, EVAL_RESULTS_MIB_CAUSALGRAPH_PATH | |
from src.populate import get_evaluation_queue_df, get_leaderboard_df, get_leaderboard_df_mib | |
from src.submission.submit import add_new_eval | |
def restart_space(): | |
API.restart_space(repo_id=REPO_ID) | |
### Space initialisation | |
try: | |
print(EVAL_REQUESTS_PATH) | |
snapshot_download( | |
repo_id=QUEUE_REPO, local_dir=EVAL_REQUESTS_PATH, repo_type="dataset", tqdm_class=None, etag_timeout=30, token=TOKEN | |
) | |
except Exception: | |
restart_space() | |
# print("EVAL_RESULTS_PATH") | |
# try: | |
# print(EVAL_RESULTS_PATH) | |
# snapshot_download( | |
# repo_id=RESULTS_REPO, local_dir=EVAL_RESULTS_PATH, repo_type="dataset", tqdm_class=None, etag_timeout=30, token=TOKEN | |
# ) | |
# except Exception: | |
# restart_space() | |
try: | |
print(RESULTS_REPO_MIB_SUBGRAPH) | |
snapshot_download( | |
repo_id=RESULTS_REPO_MIB_SUBGRAPH, local_dir=EVAL_RESULTS_MIB_SUBGRAPH_PATH, repo_type="dataset", tqdm_class=None, etag_timeout=30, token=TOKEN | |
) | |
except Exception: | |
restart_space() | |
try: | |
print(RESULTS_REPO_MIB_CAUSALGRAPH) | |
snapshot_download( | |
repo_id=RESULTS_REPO_MIB_CAUSALGRAPH, local_dir=EVAL_RESULTS_MIB_CAUSALGRAPH_PATH, repo_type="dataset", tqdm_class=None, etag_timeout=30, token=TOKEN | |
) | |
except Exception: | |
restart_space() | |
LEADERBOARD_DF_MIB_SUBGRAPH = get_leaderboard_df_mib(EVAL_RESULTS_MIB_SUBGRAPH_PATH, EVAL_REQUESTS_PATH, COLS_MIB, BENCHMARK_COLS_MIB) | |
LEADERBOARD_DF_MIB_CAUSALGRAPH = get_leaderboard_df_mib(EVAL_RESULTS_MIB_CAUSALGRAPH_PATH, EVAL_REQUESTS_PATH, COLS_MIB, BENCHMARK_COLS_MIB) | |
# LEADERBOARD_DF = get_leaderboard_df(EVAL_RESULTS_PATH, EVAL_REQUESTS_PATH, COLS, BENCHMARK_COLS) | |
# LEADERBOARD_DF_MULTIMODAL = get_leaderboard_df(EVAL_RESULTS_PATH, EVAL_REQUESTS_PATH, COLS_MULTIMODAL, BENCHMARK_COLS_MULTIMODAL) | |
( | |
finished_eval_queue_df, | |
running_eval_queue_df, | |
pending_eval_queue_df, | |
) = get_evaluation_queue_df(EVAL_REQUESTS_PATH, EVAL_COLS) | |
def init_leaderboard_mib(dataframe, track): | |
print(f"init_leaderboard_mib: dataframe head before loc is {dataframe.head()}\n") | |
if dataframe is None or dataframe.empty: | |
raise ValueError("Leaderboard DataFrame is empty or None.") | |
# filter for correct track | |
# dataframe = dataframe.loc[dataframe["Track"] == track] | |
print(f"init_leaderboard_mib: dataframe head after loc is {dataframe.head()}\n") | |
return Leaderboard( | |
value=dataframe, | |
datatype=[c.type for c in fields(AutoEvalColumn_mib)], | |
select_columns=SelectColumns( | |
default_selection=[c.name for c in fields(AutoEvalColumn_mib) if c.displayed_by_default], | |
cant_deselect=[c.name for c in fields(AutoEvalColumn_mib) if c.never_hidden], | |
label="Select Columns to Display:", | |
), | |
search_columns=["Method"], # Changed from AutoEvalColumn_mib.model.name to "Method" | |
hide_columns=[c.name for c in fields(AutoEvalColumn_mib) if c.hidden], | |
bool_checkboxgroup_label="Hide models", | |
interactive=False, | |
) | |
def calculate_best_layer_scores(task_data: Dict[str, Any]) -> Dict[str, float]: | |
""" | |
Calculate the best scores across layers for output token and location | |
Args: | |
task_data: Dictionary containing task scores for different layers | |
Returns: | |
Dictionary with best scores and corresponding layer | |
""" | |
output_token_scores = [layer_data['output_token'] for layer_data in task_data.values()] | |
output_location_scores = [layer_data['output_location'] for layer_data in task_data.values()] | |
best_output_token = max(output_token_scores) | |
best_output_location = max(output_location_scores) | |
# Find the layer with the best combined performance | |
layer_scores = [(layer, layer_data['output_token'] + layer_data['output_location']) | |
for layer, layer_data in task_data.items()] | |
best_layer = max(layer_scores, key=lambda x: x[1])[0] | |
return { | |
'output_token': best_output_token, | |
'output_location': best_output_location, | |
'best_layer': int(best_layer) | |
} | |
def process_single_method(json_data: Dict[str, Any]) -> List[Dict[str, Any]]: | |
""" | |
Process results for a single method into summary rows | |
Args: | |
json_data: Dictionary containing results for one method | |
Returns: | |
List of summary rows for the method | |
""" | |
summary_rows = [] | |
method_name = json_data['method_name'] | |
for model_result in json_data['results']: | |
model_id = model_result['model_id'] | |
task_data = model_result['task_scores']['MCQA'] | |
best_scores = calculate_best_layer_scores(task_data) | |
summary_row = { | |
'Method': method_name, | |
'Model': model_id, | |
'Best Output Token Score': best_scores['output_token'], | |
'Best Output Location Score': best_scores['output_location'], | |
'Best Layer': best_scores['best_layer'] | |
} | |
summary_rows.append(summary_row) | |
return summary_rows | |
def init_leaderboard_mib_causal(json_data_list: List[Dict[str, Any]], track: str) -> 'Leaderboard': | |
""" | |
Creates a leaderboard summary for causal intervention results from multiple methods | |
Args: | |
json_data_list: List of dictionaries containing results for different methods | |
track: Track identifier (currently unused but maintained for compatibility) | |
Returns: | |
Leaderboard object containing processed and formatted results | |
""" | |
# Process all methods | |
all_summary_data = [] | |
for method_data in json_data_list: | |
method_summary = process_single_method(method_data) | |
all_summary_data.extend(method_summary) | |
# Convert to DataFrame | |
results_df = pd.DataFrame(all_summary_data) | |
# Sort by best score (using output token score as primary metric) | |
results_df = results_df.sort_values('Best Output Token Score', ascending=False) | |
# Round numeric columns to 3 decimal places | |
numeric_cols = ['Best Output Token Score', 'Best Output Location Score'] | |
results_df[numeric_cols] = results_df[numeric_cols].round(3) | |
return Leaderboard( | |
value=results_df, | |
datatype=['text', 'text', 'number', 'number', 'number'], | |
select_columns=SelectColumns( | |
default_selection=['Method', 'Model', 'Best Output Token Score', 'Best Output Location Score', 'Best Layer'], | |
cant_deselect=['Method', 'Model'], | |
label="Select Metrics to Display:", | |
), | |
search_columns=['Method', 'Model'], | |
interactive=False, | |
) | |
def init_leaderboard(dataframe, track): | |
if dataframe is None or dataframe.empty: | |
raise ValueError("Leaderboard DataFrame is empty or None.") | |
# filter for correct track | |
dataframe = dataframe.loc[dataframe["Track"] == track] | |
# print(f"\n\n\n dataframe is {dataframe}\n\n\n") | |
return Leaderboard( | |
value=dataframe, | |
datatype=[c.type for c in fields(AutoEvalColumn)], | |
select_columns=SelectColumns( | |
default_selection=[c.name for c in fields(AutoEvalColumn) if c.displayed_by_default], | |
cant_deselect=[c.name for c in fields(AutoEvalColumn) if c.never_hidden], | |
label="Select Columns to Display:", | |
), | |
search_columns=[AutoEvalColumn.model.name], | |
hide_columns=[c.name for c in fields(AutoEvalColumn) if c.hidden], | |
bool_checkboxgroup_label="Hide models", | |
interactive=False, | |
) | |
def process_json(temp_file): | |
if temp_file is None: | |
return {} | |
# Handle file upload | |
try: | |
file_path = temp_file.name | |
if file_path.endswith('.gz'): | |
with gzip.open(file_path, 'rt') as f: | |
data = json.load(f) | |
else: | |
with open(file_path, 'r') as f: | |
data = json.load(f) | |
except Exception as e: | |
raise gr.Error(f"Error processing file: {str(e)}") | |
gr.Markdown("Upload successful!") | |
return data | |
demo = gr.Blocks(css=custom_css) | |
with demo: | |
gr.HTML(TITLE) | |
gr.Markdown(INTRODUCTION_TEXT, elem_classes="markdown-text") | |
with gr.Tabs(elem_classes="tab-buttons") as tabs: | |
# with gr.TabItem("Strict", elem_id="strict-benchmark-tab-table", id=0): | |
# leaderboard = init_leaderboard(LEADERBOARD_DF, "strict") | |
# with gr.TabItem("Strict-small", elem_id="strict-small-benchmark-tab-table", id=1): | |
# leaderboard = init_leaderboard(LEADERBOARD_DF, "strict-small") | |
# with gr.TabItem("Multimodal", elem_id="multimodal-benchmark-tab-table", id=2): | |
# leaderboard = init_leaderboard(LEADERBOARD_DF_MULTIMODAL, "multimodal") | |
# with gr.TabItem("π About", elem_id="llm-benchmark-tab-table", id=4): | |
# gr.Markdown(LLM_BENCHMARKS_TEXT, elem_classes="markdown-text") | |
# with gr.TabItem("πΆ Submit", elem_id="llm-benchmark-tab-table", id=5): | |
# with gr.Column(): | |
# with gr.Row(): | |
# gr.Markdown(EVALUATION_QUEUE_TEXT, elem_classes="markdown-text") | |
with gr.TabItem("Subgraph", elem_id="subgraph", id=0): | |
leaderboard = init_leaderboard_mib(LEADERBOARD_DF_MIB_SUBGRAPH, "Subgraph") | |
# leaderboard = init_leaderboard_mib(LEADERBOARD_DF, "mib") | |
with gr.TabItem("Causal Graph", elem_id="causalgraph", id=1): | |
leaderboard = init_leaderboard_mib_causal(LEADERBOARD_DF_MIB_CAUSALGRAPH, "Causal Graph") | |
# with gr.Row(): | |
# with gr.Accordion("π Citation", open=False): | |
# citation_button = gr.Textbox( | |
# value=CITATION_BUTTON_TEXT, | |
# label=CITATION_BUTTON_LABEL, | |
# lines=20, | |
# elem_id="citation-button", | |
# show_copy_button=True, | |
# ) | |
scheduler = BackgroundScheduler() | |
scheduler.add_job(restart_space, "interval", seconds=1800) | |
scheduler.start() | |
demo.launch(share=True, ssr_mode=False) | |