DOoM-lb / src /leaderboard /build_leaderboard.py
Anonumous's picture
Refactor build_leadearboard_df function to integrate data conversion from Small Shlepa format to DeathMath; enhance error handling and logging for model data processing
b0cab2d
import json
import logging
import os
import time
import pandas as pd
from huggingface_hub import snapshot_download
from src.envs import DATA_PATH, H4_TOKEN, RESULTS_REPO, METAINFO_REPO
# Configure logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
def time_diff_wrapper(func):
def wrapper(*args, **kwargs):
start_time = time.time()
result = func(*args, **kwargs)
end_time = time.time()
diff = end_time - start_time
logging.info("Time taken for %s: %s seconds", func.__name__, diff)
return result
return wrapper
def chmod_recursive(path, mode):
os.chmod(path, mode)
for root, dirs, files in os.walk(path):
for dir in dirs:
os.chmod(os.path.join(root, dir), mode)
for file in files:
os.chmod(os.path.join(root, file), mode)
@time_diff_wrapper
def download_dataset(repo_id, local_dir, repo_type="dataset", max_attempts=3, backoff_factor=1.5):
"""Download dataset with exponential backoff retries."""
os.makedirs(local_dir, exist_ok=True)
os.makedirs('./tmp', exist_ok=True)
attempt = 0
while attempt < max_attempts:
try:
logging.info("Downloading %s to %s", repo_id, local_dir)
snapshot_download(
repo_id=repo_id,
local_dir=local_dir,
cache_dir='./tmp',
repo_type=repo_type,
tqdm_class=None,
token=H4_TOKEN,
etag_timeout=30,
max_workers=8,
force_download=True,
local_dir_use_symlinks=False
)
logging.info("Download successful")
return
except Exception as e:
wait_time = backoff_factor**attempt
logging.error("Error downloading %s: %s, retrying in %ss", repo_id, e, wait_time)
time.sleep(wait_time)
attempt += 1
logging.error("Failed to download %s after %s attempts", repo_id, max_attempts)
def download_openbench():
"""
Скачивает необходимые данные для лидерборда из репозиториев HuggingFace
"""
# Скачиваем метаданные лидерборда
try:
download_dataset(METAINFO_REPO, DATA_PATH)
logging.info("Successfully downloaded leaderboard metainfo data")
except Exception as e:
logging.error(f"Failed to download leaderboard metainfo: {e}")
# Скачиваем результаты моделей
try:
download_dataset(RESULTS_REPO, "m_data")
logging.info("Successfully downloaded model evaluation results")
except Exception as e:
logging.error(f"Failed to download model evaluation results: {e}")
def build_leadearboard_df():
"""
Функция для сбора данных лидерборда из всех доступных источников.
Гарантирует, что в лидерборде будет только одна запись для каждой модели (с наивысшим score).
"""
results = []
best_model_results = {} # Словарь для отслеживания лучших результатов моделей
# 1. Пытаемся загрузить данные из метаинформации лидерборда
try:
leaderboard_path = os.path.join(DATA_PATH, "leaderboard.json")
if os.path.exists(leaderboard_path):
with open(leaderboard_path, "r", encoding="utf-8") as eval_file:
saved_data = json.load(eval_file)
if saved_data:
logging.info(f"Loaded {len(saved_data)} models from saved leaderboard data")
# Обрабатываем каждую модель, сохраняя только лучший результат
for item in saved_data:
try:
# Получаем имя модели, проверяя разные возможные ключи
model_name = item.get("model_name", item.get("model", ""))
if not model_name:
continue
# Стандартизируем данные
model_data = {
"model": model_name,
"score": float(item.get("score", 0.0)),
"math_score": float(item.get("math_score", 0.0)),
"physics_score": float(item.get("physics_score", 0.0)),
"total_tokens": int(item.get("total_tokens", 0)),
"evaluation_time": float(item.get("evaluation_time", 0.0)),
"system_prompt": item.get("system_prompt", "Вы - полезный помощник по математике и физике. Ответьте на русском языке.")
}
# Определяем, является ли это лучшим результатом для данной модели
model_base_name = model_name.split("/")[-1].split("_v")[0]
if model_base_name in best_model_results:
if model_data["score"] > best_model_results[model_base_name]["score"]:
best_model_results[model_base_name] = model_data
else:
best_model_results[model_base_name] = model_data
except KeyError as e:
# Логируем ошибку, но продолжаем обработку других моделей
logging.error(f"Failed to process model data: {e}")
except Exception as e:
logging.error(f"Failed to load saved leaderboard data: {e}")
# 2. Загружаем модели из директории внешних моделей
try:
external_dir = "./m_data/model_data/external/"
if os.path.exists(external_dir):
for file in os.listdir(external_dir):
if file.endswith(".json"):
try:
with open(os.path.join(external_dir, file), "r", encoding="utf-8") as f:
data = json.load(f)
# Конвертируем данные из любого формата в формат DeathMath
converted_data = convert_old_format_to_deatmath(data)
# Проверяем наличие необходимых полей после конвертации
model_name = converted_data.get("model_name", converted_data.get("model", ""))
if not model_name:
logging.error(f"Failed to parse {file}: 'model_name' not found after conversion")
continue
# Стандартизируем данные
model_data = {
"model": model_name,
"score": float(converted_data.get("score", 0.0)),
"math_score": float(converted_data.get("math_score", 0.0)),
"physics_score": float(converted_data.get("physics_score", 0.0)),
"total_tokens": int(converted_data.get("total_tokens", 0)),
"evaluation_time": float(converted_data.get("evaluation_time", 0.0)),
"system_prompt": converted_data.get("system_prompt",
"Вы - полезный помощник по математике и физике. Ответьте на русском языке.")
}
# Определяем, является ли это лучшим результатом для данной модели
model_base_name = model_name.split("/")[-1].split("_v")[0]
if model_base_name in best_model_results:
if model_data["score"] > best_model_results[model_base_name]["score"]:
best_model_results[model_base_name] = model_data
else:
best_model_results[model_base_name] = model_data
except Exception as e:
logging.error(f"Failed to parse {file}: {str(e)}")
continue
except Exception as e:
logging.error(f"Failed to process external model data: {e}")
# 3. Собираем все лучшие результаты
results = list(best_model_results.values())
# 4. Добавляем базовые модели по умолчанию, если список пуст
if not results:
# Добавляем несколько моделей-заглушек для отображения интерфейса
results = [
{
"model": "example/model-1",
"score": 0.7,
"math_score": 0.8,
"physics_score": 0.6,
"total_tokens": 1000000,
"evaluation_time": 3600.0,
"system_prompt": "Вы - полезный помощник по математике и физике. Ответьте на русском языке."
},
{
"model": "example/model-2",
"score": 0.6,
"math_score": 0.7,
"physics_score": 0.5,
"total_tokens": 800000,
"evaluation_time": 3000.0,
"system_prompt": "Вы - полезный помощник по математике и физике. Ответьте на русском языке."
}
]
logging.warning("No model data found, using example models")
# Создаем DataFrame и сортируем по общему баллу
df = pd.DataFrame(results)
df.sort_values(by='score', ascending=False, inplace=True)
# Округляем числовые столбцы для красивого отображения
numeric_cols = df.select_dtypes(include=['number']).columns
if not numeric_cols.empty:
df[numeric_cols] = df[numeric_cols].round(3)
return df
def convert_old_format_to_deatmath(data):
"""
Конвертирует данные из старого формата Small Shlepa в формат DeathMath
Args:
data (dict): Данные модели в старом формате
Returns:
dict: Конвертированные данные в формате DeathMath
"""
# Проверяем, возможно это файл уже в формате DeathMath
if "score" in data:
return data
# Проверяем формат Small Shlepa с полями: musicmc, moviesmc, booksmc, lawmc, mmluproru
small_shlepa_fields = ["musicmc", "moviesmc", "booksmc", "lawmc", "mmluproru", "model"]
is_shlepa_format = any(field in data for field in small_shlepa_fields)
if is_shlepa_format:
logging.info(f"Конвертация модели из формата Small Shlepa в формат DeathMath: {data.get('model', 'Unknown')}")
# Конвертируем данные с примерным соответствием:
# math_score = среднее(musicmc, booksmc, mmluproru)
# physics_score = lawmc или moviesmc
math_score = 0.0
math_components = 0
if "musicmc" in data and data["musicmc"] is not None:
math_score += float(data["musicmc"])
math_components += 1
if "booksmc" in data and data["booksmc"] is not None:
math_score += float(data["booksmc"])
math_components += 1
if "mmluproru" in data and data["mmluproru"] is not None:
math_score += float(data["mmluproru"])
math_components += 1
if math_components > 0:
math_score /= math_components
# Для physics_score используем значение lawmc или moviesmc (что доступно)
physics_score = 0.0
if "lawmc" in data and data["lawmc"] is not None:
physics_score = float(data["lawmc"])
elif "moviesmc" in data and data["moviesmc"] is not None:
physics_score = float(data["moviesmc"])
# Общий скор - среднее арифметическое
avg_score = (math_score + physics_score) / 2 if math_score or physics_score else 0.0
converted_data = {
"model_name": data.get("model", "Unknown"),
"score": avg_score,
"math_score": math_score,
"physics_score": physics_score,
"total_tokens": int(data.get("total_tokens", 0)),
"evaluation_time": float(data.get("evaluation_time", 0.0)),
"system_prompt": data.get("system_prompt",
"Вы - полезный помощник по математике и физике. Ответьте на русском языке.")
}
return converted_data
# Если формат неизвестен, возвращаем стандартный шаблон
logging.warning(f"Неизвестный формат данных модели, использую шаблон")
return {
"model_name": data.get("model_name", data.get("model", "Unknown")),
"score": 0.0,
"math_score": 0.0,
"physics_score": 0.0,
"total_tokens": 0,
"evaluation_time": 0.0,
"system_prompt": "Вы - полезный помощник по математике и физике. Ответьте на русском языке."
}