Spaces:
Running
Running
""" | |
Data service provider | |
""" | |
import json | |
from typing import List | |
import pandas as pd | |
from utils.cache_decorator import cache_df_with_custom_key, cache_dict_with_custom_key | |
from utils.http_utils import get | |
COLUMNS = ['model_name', | |
'embd_dtype', 'embd_dim', 'num_params', 'max_tokens', 'similarity', | |
'query_instruct', 'corpus_instruct', | |
] | |
COLUMNS_TYPES = ["markdown", | |
'str', 'str', 'number', 'number', 'str', | |
'str', 'str', | |
] | |
BRANCH = 'main' | |
GIT_URL = f"https://raw.githubusercontent.com/embedding-benchmark/rteb/refs/heads/{BRANCH}/results/" | |
DATASET_URL = f"{GIT_URL}datasets.json" | |
MODEL_URL = f"{GIT_URL}models.json" | |
RESULT_URL = f"{GIT_URL}results.json" | |
class DataEngine: | |
def __init__(self): | |
self.df = self.init_dataframe() | |
def models(self): | |
""" | |
Get models data | |
""" | |
res = get(MODEL_URL) | |
if res.status_code == 200: | |
return res.json() | |
return {} | |
def datasets(self): | |
""" | |
Get tasks data | |
""" | |
res = get(DATASET_URL) | |
if res.status_code == 200: | |
return res.json() | |
return {} | |
def results(self): | |
""" | |
Get results data | |
""" | |
res = get(RESULT_URL) | |
if res.status_code == 200: | |
return res.json() | |
return {} | |
def init_dataframe(self): | |
""" | |
Initialize DataFrame | |
""" | |
d = {"hello": [123], "world": [456]} | |
return pd.DataFrame(d) | |
def jsons_to_df(self): | |
results_list = self.results | |
df_results_list = [] | |
for result_dict in results_list: | |
dataset_name = result_dict["dataset_name"] | |
df_result_row = pd.DataFrame(result_dict["results"]) | |
df_result_row["dataset_name"] = dataset_name | |
df_results_list.append(df_result_row) | |
df_result = pd.concat(df_results_list) | |
df_result = df_result[["model_name", "dataset_name", "ndcg_at_10", "embd_dim", "embd_dtype"]] | |
df_result["ndcg_at_10"] = (df_result["ndcg_at_10"] * 100).round(2) | |
df_datasets_list = [] | |
for item in self.datasets: | |
dataset_names = item["datasets"] | |
df_dataset_row = pd.DataFrame( | |
{ | |
"group_name": [item["name"] for _ in range(len(dataset_names))], | |
"dataset_name": dataset_names, | |
"leaderboard": [item["leaderboard"] for _ in range(len(dataset_names))] | |
} | |
) | |
df_datasets_list.append(df_dataset_row) | |
df_dataset = pd.concat(df_datasets_list).drop_duplicates() | |
models_list = self.models | |
df_model = pd.DataFrame(models_list) | |
# Replace None values in num_params with "Unknown" | |
if 'num_params' in df_model.columns: | |
df_model['num_params'] = df_model['num_params'].fillna("Unknown") | |
# Replace blank/None values in vendor with "Open source" | |
if 'vendor' in df_model.columns: | |
df_model['vendor'] = df_model['vendor'].fillna("Open source") | |
df_model['vendor'] = df_model['vendor'].replace('', "Open source") | |
# Also handle whitespace-only strings | |
df_model['vendor'] = df_model['vendor'].apply(lambda x: "Open source" if isinstance(x, str) and x.strip() == '' else x) | |
# Create mapping for model names/aliases | |
if 'alias' in df_model.columns: | |
# Create a lookup table for alias to model_name mapping | |
alias_mapping = df_model[df_model['alias'].notna()].set_index('alias')['model_name'].to_dict() | |
# Add rows for aliases to enable joining | |
alias_rows = [] | |
for _, row in df_model[df_model['alias'].notna()].iterrows(): | |
alias_row = row.copy() | |
alias_row['model_name'] = row['alias'] | |
alias_rows.append(alias_row) | |
if alias_rows: | |
df_model_extended = pd.concat([df_model, pd.DataFrame(alias_rows)], ignore_index=True) | |
else: | |
df_model_extended = df_model | |
else: | |
df_model_extended = df_model | |
df = pd.merge(df_result, df_dataset, on=["dataset_name"], how="inner") | |
# set dataset default value to 0 | |
df = df.pivot(index=["model_name", "embd_dim", "embd_dtype", "group_name"], columns="dataset_name", | |
values=["ndcg_at_10"]).fillna(0).stack(level=1).reset_index() | |
df = pd.merge(df, df_dataset, on=["group_name","dataset_name"], how="inner") | |
# dataset_num_map = {} | |
# grouped_dataset_count = df.groupby(["group_name"]).agg({ | |
# "dataset_name": "nunique" | |
# }).reset_index() | |
# | |
# for _, row in grouped_dataset_count.iterrows(): | |
# dataset_num_map[row["group_name"]] = row["dataset_name"] | |
grouped_model = df.groupby(["model_name", "group_name", "embd_dim", "embd_dtype"]).agg({ | |
"ndcg_at_10": "mean", | |
}).reset_index() | |
pivot = grouped_model.pivot(index=["model_name", "embd_dim", "embd_dtype"], columns="group_name", | |
values=["ndcg_at_10"]).round(2).fillna(0) | |
# Rename columns | |
pivot.columns = list( | |
map(lambda x: f"{x[1].capitalize()} Average" if x[1] != 'text' else f"Average", pivot.columns)) | |
pivot_dataset = df_result.pivot(index=["model_name", "embd_dim", "embd_dtype"], columns="dataset_name", values="ndcg_at_10").fillna(0) | |
df = pd.merge(df_model_extended, pivot, on=["model_name", "embd_dim", "embd_dtype"]) | |
df = pd.merge(df, pivot_dataset, on=["model_name", "embd_dim", "embd_dtype"]) | |
if df.empty: | |
return pd.DataFrame(columns=COLUMNS + ["reference"]) | |
return df | |
def filter_df(self, group_name: str): | |
""" | |
filter_by_providers | |
""" | |
df = self.jsons_to_df() | |
return df[df["group_name"] == group_name][COLUMNS][:] | |