"""leaderboard-hackaton-2025 / calculate_personal_scores.py

Consolidate per-challenge contributions (Arena, Blend-ES, Estereotipos, INCLUDE)
into personal leaderboard CSVs and per-challenge leaderboard files.
"""
import json
import os
from collections import defaultdict
from functools import lru_cache
import argilla as rg
import pandas as pd
from dotenv import load_dotenv
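# Load ARGILLA_API_URL and ARGILLA_API_KEY from a local .env file, if present.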
load_dotenv()
# Constants
DATA_DIR = "data"
PARTICIPANTS_CSV = os.path.join(DATA_DIR, "participants.csv")
EQUIPOS_CSV = os.path.join(DATA_DIR, "equipos.csv")
LEADERBOARD_PERSONAL_CSV = "leaderboard_personal.csv"
LEADERBOARD_EQUIPOS_CSV = "leaderboard_equipos.csv"
# Column mappings for participants info
COLUMN_MAP = {
"gmail": "Dirección de correo electrónico",
"discord": "¿Cuál es tu nombre en Discord?",
"hf_username": "¿Cuál es tu nombre en el Hub de Hugging Face?",
"contact_email": "Email de contacto",
}
# Initialize Argilla client
try:
client = rg.Argilla(
api_url=os.getenv("ARGILLA_API_URL", ""),
api_key=os.getenv("ARGILLA_API_KEY", ""),
)
except Exception as e:
print(f"Error initializing Argilla client: {e}")
client = None
# Countries covered by Blend-ES, with ISO 3166-1 alpha-3 codes and flag emojis
# (used to build the per-country Argilla dataset names)
countries = {
"Argentina": {"iso": "ARG", "emoji": "🇦🇷"},
"Bolivia": {"iso": "BOL", "emoji": "🇧🇴"},
"Chile": {"iso": "CHL", "emoji": "🇨🇱"},
"Colombia": {"iso": "COL", "emoji": "🇨🇴"},
"Costa Rica": {"iso": "CRI", "emoji": "🇨🇷"},
"Cuba": {"iso": "CUB", "emoji": "🇨🇺"},
"Ecuador": {"iso": "ECU", "emoji": "🇪🇨"},
"El Salvador": {"iso": "SLV", "emoji": "🇸🇻"},
"España": {"iso": "ESP", "emoji": "🇪🇸"},
"Guatemala": {"iso": "GTM", "emoji": "🇬🇹"},
"Honduras": {"iso": "HND", "emoji": "🇭🇳"},
"México": {"iso": "MEX", "emoji": "🇲🇽"},
"Nicaragua": {"iso": "NIC", "emoji": "🇳🇮"},
"Panamá": {"iso": "PAN", "emoji": "🇵🇦"},
"Paraguay": {"iso": "PRY", "emoji": "🇵🇾"},
"Perú": {"iso": "PER", "emoji": "🇵🇪"},
"Puerto Rico": {"iso": "PRI", "emoji": "🇵🇷"},
"República Dominicana": {"iso": "DOM", "emoji": "🇩🇴"},
"Uruguay": {"iso": "URY", "emoji": "🇺🇾"},
"Venezuela": {"iso": "VEN", "emoji": "🇻🇪"},
}
@lru_cache(maxsize=1)
def get_user_mapping():
"""Get cached mapping of emails and hf_usernames to discord usernames."""
if not os.path.exists(PARTICIPANTS_CSV):
return {}, {}
try:
df = pd.read_csv(PARTICIPANTS_CSV)
email_to_discord = {}
hf_to_discord = {}
for _, row in df.iterrows():
discord = row.get(COLUMN_MAP["discord"], "")
if pd.notna(discord) and discord != "NA":
discord_lower = discord.lower()
# Map gmail to discord
gmail = row.get(COLUMN_MAP["gmail"], "")
if pd.notna(gmail) and gmail.strip():
email_to_discord[gmail.lower()] = discord_lower
# Map contact_email to discord
contact_email = row.get(COLUMN_MAP["contact_email"], "")
if pd.notna(contact_email) and contact_email.strip():
email_to_discord[contact_email.lower()] = discord_lower
# Map hf_username to discord
hf_username = row.get(COLUMN_MAP["hf_username"], "")
if pd.notna(hf_username) and hf_username.strip():
hf_to_discord[hf_username.lower()] = discord_lower
return email_to_discord, hf_to_discord
except Exception as e:
print(f"Error loading {PARTICIPANTS_CSV}: {e}")
return {}, {}
def get_discord_username(identifier):
"""Get discord username from email or hf_username."""
email_to_discord, hf_to_discord = get_user_mapping()
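    # Emails fall back to their local part; other identifiers fall back to themselves.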
if "@" in identifier:
return email_to_discord.get(identifier.lower(), identifier.split("@")[0])
return hf_to_discord.get(identifier.lower(), identifier)
def get_participant_info():
"""Get participant information from CSV."""
if not os.path.exists(PARTICIPANTS_CSV):
return {}
try:
df = pd.read_csv(PARTICIPANTS_CSV)
participant_info = {}
for _, row in df.iterrows():
discord_username = row.get(COLUMN_MAP["discord"], "")
if pd.notna(discord_username) and discord_username != "NA":
participant_info[discord_username.lower()] = {
"gmail": row.get(COLUMN_MAP["gmail"], ""),
"discord_username": discord_username,
"hf_username": row.get(COLUMN_MAP["hf_username"], ""),
"email": row.get(COLUMN_MAP["contact_email"], ""),
}
return participant_info
except Exception as e:
print(f"Error loading participant info: {e}")
return {}
def get_blend_es_data():
"""Get blend-es data from Argilla."""
if not client:
return []
data = []
for country, info in countries.items():
dataset_name = f"{info['emoji']} {country} - {info['iso']} - Responder"
try:
dataset = client.datasets(dataset_name)
records = list(dataset.records(with_responses=True))
user_counts = defaultdict(int)
user_mapping = {}
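            # Count one contribution per "answer_1" response and cache each
            # annotator's Argilla username to avoid repeated user lookups.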
for record in records:
if "answer_1" in record.responses:
for answer in record.responses["answer_1"]:
if answer.user_id:
user_id = answer.user_id
user_counts[user_id] += 1
if user_id not in user_mapping:
try:
user = client.users(id=user_id)
user_mapping[user_id] = user.username
                                except Exception:  # Fall back to a short id when the user lookup fails
user_mapping[user_id] = f"User-{user_id[:8]}"
for user_id, count in user_counts.items():
hf_username = user_mapping.get(user_id, f"User-{user_id[:8]}")
username = get_discord_username(hf_username)
data.append(
{"source": "blend-es", "username": username, "count": count}
)
except Exception as e:
print(f"Error processing {dataset_name}: {e}")
return data
def get_include_data():
"""Get include data from CSV."""
csv_path = os.path.join(DATA_DIR, "include.csv")
if not os.path.exists(csv_path):
return []
try:
df = pd.read_csv(csv_path)
username_col = "Nombre en Discord / username"
questions_col = "Total preguntas hackathon"
if username_col not in df.columns or questions_col not in df.columns:
return []
user_counts = defaultdict(int)
for _, row in df.iterrows():
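            # Assumption: the Discord column values carry a one-character prefix
            # (e.g. "@"), which the [1:] slice below strips before lowercasing.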
username = row[username_col][1:] if pd.notna(row[username_col]) else ""
questions = row[questions_col] if pd.notna(row[questions_col]) else 0
if username and questions:
user_counts[username.lower()] += int(questions)
return [
{"source": "include", "username": username, "count": count}
for username, count in user_counts.items()
]
except Exception as e:
print(f"Error loading include data: {e}")
return []
def get_estereotipos_data():
"""Get estereotipos data from CSV."""
csv_path = os.path.join(DATA_DIR, "stereotypes.csv")
if not os.path.exists(csv_path):
return []
try:
df = pd.read_csv(csv_path)
if "token_id" not in df.columns or "count" not in df.columns:
return []
user_counts = defaultdict(int)
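        # "token_id" appears to hold the contributor's email, so it is resolved
        # to a Discord username below via get_discord_username().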
for _, row in df.iterrows():
mail = row.get("token_id", "")
count = row.get("count", 0)
if pd.notna(mail) and pd.notna(count):
user_counts[mail.lower()] += int(count)
return [
{
"source": "estereotipos",
"username": get_discord_username(mail),
"count": count,
}
for mail, count in user_counts.items()
]
except Exception as e:
print(f"Error loading estereotipos data: {e}")
return []
def get_arena_data():
"""Get arena data from CSV."""
csv_path = os.path.join(DATA_DIR, "arena_data_cruzada.csv")
if not os.path.exists(csv_path):
return []
try:
df = pd.read_csv(csv_path)
# Check if username column exists
if "username" not in df.columns:
print("Error: 'username' column not found in arena_data_cruzada.csv")
return []
user_counts = defaultdict(int)
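        # Each row in the arena CSV counts as one contribution for its username.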
for _, row in df.iterrows():
username = row.get("username", "")
if pd.notna(username) and username.strip():
user_counts[username.lower()] += 1
        return [
            {"source": "arena", "username": get_discord_username(username), "count": count}
            for username, count in user_counts.items()
        ]
except Exception as e:
print(f"Error loading arena data: {e}")
return []
def create_challenge_leaderboards(display_df):
"""Create individual CSV files for each challenge."""
    # Create the leaderboards directory if it doesn't exist
    leaderboards_dir = "leaderboards"
    os.makedirs(leaderboards_dir, exist_ok=True)
for challenge in ["Arena", "Blend-ES", "Estereotipos", "INCLUDE"]:
if challenge in display_df.columns:
# Create challenge-specific dataframe with only username and challenge score
challenge_df = display_df[["Username", challenge]].copy()
# Sort by score (descending) and then by username (ascending) for ties
challenge_df = challenge_df.sort_values(
[challenge, "Username"], ascending=[False, True]
)
# Generate filenames in leaderboards directory
clean_challenge = challenge.replace(" ", "_").replace("-", "_")
csv_filename = os.path.join(
leaderboards_dir, f"leaderboard_{clean_challenge.lower()}.csv"
)
txt_filename = os.path.join(
leaderboards_dir, f"leaderboard_{clean_challenge.lower()}.txt"
)
# Save to CSV (include all participants)
challenge_df.to_csv(csv_filename, index=False, encoding="utf-8")
print(f"Created {csv_filename} with {len(challenge_df)} participants")
# Save to TXT as markdown table (exclude users with 0 scores)
with open(txt_filename, "w", encoding="utf-8") as f:
f.write(f"# {challenge} Leaderboard\n\n")
f.write("| Puesto | Discord ID | Puntuación |\n")
f.write("|------|----------|-------|\n")
rank = 1
for _, row in challenge_df.iterrows():
username = row["Username"]
score = row[challenge]
# Skip users with 0 scores
if score == 0:
continue
# Use medal emojis for top 3 ranks
if rank == 1:
rank_display = "🥇"
elif rank == 2:
rank_display = "🥈"
elif rank == 3:
rank_display = "🥉"
else:
rank_display = str(rank)
f.write(f"| {rank_display} | {username} | {score} |\n")
rank += 1
print(
f"Created {txt_filename} with markdown table format (excluding 0 scores)"
)
# Show top 5 scores
print(f"Top 5 {challenge} scores:")
for i, (_, row) in enumerate(challenge_df.head().iterrows(), 1):
print(f" {i}. {row['Username']}: {row[challenge]}")
print()
def calculate_personal_scores():
"""Consolidate all data sources and create leaderboard."""
# Collect all data
all_data = (
get_blend_es_data()
+ get_include_data()
+ get_estereotipos_data()
+ get_arena_data()
)
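    # Each item has the shape {"source": ..., "username": ..., "count": ...}.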
# Get participant info
participant_info = get_participant_info()
# Aggregate user contributions
user_contributions = defaultdict(
lambda: {
"username": "",
"gmail": "",
"discord_username": "",
"hf_username": "",
"email": "",
"blend_es": 0,
"include": 0,
"estereotipos": 0,
"arena": 0,
}
)
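    # Merge contributions from every source under the lowercased Discord username.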
for item in all_data:
source = item["source"]
username = item["username"]
count = item["count"]
user_key = username.lower()
if not user_contributions[user_key]["username"]:
user_contributions[user_key]["username"] = username
        if user_key in participant_info:
            info = participant_info[user_key]
user_contributions[user_key].update(
{
"gmail": info["gmail"],
"discord_username": info["discord_username"],
"hf_username": info["hf_username"],
"email": info["email"],
}
)
if source == "blend-es":
user_contributions[user_key]["blend_es"] += count
elif source == "include":
user_contributions[user_key]["include"] += count
elif source == "estereotipos":
user_contributions[user_key]["estereotipos"] += count
elif source == "arena":
user_contributions[user_key]["arena"] += count
# Create dataframes
full_rows = []
display_rows = []
for data in user_contributions.values():
# Full data for CSV
full_rows.append(
{
"Username": data["username"],
"Gmail": data["gmail"],
"Discord_Username": data["discord_username"],
"HF_Username": data["hf_username"],
"Email": data["email"],
"Arena": data["arena"],
"Blend-ES": data["blend_es"],
"Estereotipos": data["estereotipos"],
"INCLUDE": data["include"],
}
)
# Display data for UI (public)
display_rows.append(
{
"Username": data["username"],
"Arena": data["arena"],
"Blend-ES": data["blend_es"],
"Estereotipos": data["estereotipos"],
"INCLUDE": data["include"],
}
)
# Save full data to CSV
full_df = pd.DataFrame(full_rows)
if not full_df.empty:
full_df.sort_values("Arena", ascending=False, inplace=True)
full_df.to_csv(
os.path.join(DATA_DIR, LEADERBOARD_PERSONAL_CSV),
index=False,
encoding="utf-8",
)
# Return display dataframe for UI
display_df = pd.DataFrame(display_rows)
if not display_df.empty:
display_df.sort_values("Arena", ascending=False, inplace=True)
        # Public copy (without contact details) at the repo root; the full version lives under data/.
        display_df.to_csv(LEADERBOARD_PERSONAL_CSV, index=False, encoding="utf-8")
# Create individual challenge leaderboards
print("\nCreating individual challenge leaderboards...")
create_challenge_leaderboards(display_df)
return display_df
if __name__ == "__main__":
calculate_personal_scores()