Spaces:
Runtime error
Runtime error
import json | |
import os | |
from collections import defaultdict | |
from functools import lru_cache | |
import argilla as rg | |
import pandas as pd | |
from dotenv import load_dotenv | |
load_dotenv() | |
# Input CSV locations and the file names used for the generated leaderboards.
DATA_DIR = "data"
PARTICIPANTS_CSV = os.path.join(DATA_DIR, "participants.csv")
EQUIPOS_CSV = os.path.join(DATA_DIR, "equipos.csv")
LEADERBOARD_PERSONAL_CSV = "leaderboard_personal.csv"
LEADERBOARD_EQUIPOS_CSV = "leaderboard_equipos.csv"

# Short internal key -> exact column header used in the participants CSV.
COLUMN_MAP = dict(
    gmail="Dirección de correo electrónico",
    discord="¿Cuál es tu nombre en Discord?",
    hf_username="¿Cuál es tu nombre en el Hub de Hugging Face?",
    contact_email="Email de contacto",
)
# Build the module-wide Argilla client from the ARGILLA_API_URL and
# ARGILLA_API_KEY environment variables (empty string when unset). If the
# constructor raises, the error is printed and `client` stays None.
client = None
try:
    client = rg.Argilla(
        api_url=os.environ.get("ARGILLA_API_URL", ""),
        api_key=os.environ.get("ARGILLA_API_KEY", ""),
    )
except Exception as exc:
    print(f"Error initializing Argilla client: {exc}")
# Country display name -> {"iso": three-letter code, "emoji": flag}.
# Insertion order follows the table below.
countries = {
    name: {"iso": iso, "emoji": flag}
    for name, iso, flag in (
        ("Argentina", "ARG", "🇦🇷"),
        ("Bolivia", "BOL", "🇧🇴"),
        ("Chile", "CHL", "🇨🇱"),
        ("Colombia", "COL", "🇨🇴"),
        ("Costa Rica", "CRI", "🇨🇷"),
        ("Cuba", "CUB", "🇨🇺"),
        ("Ecuador", "ECU", "🇪🇨"),
        ("El Salvador", "SLV", "🇸🇻"),
        ("España", "ESP", "🇪🇸"),
        ("Guatemala", "GTM", "🇬🇹"),
        ("Honduras", "HND", "🇭🇳"),
        ("México", "MEX", "🇲🇽"),
        ("Nicaragua", "NIC", "🇳🇮"),
        ("Panamá", "PAN", "🇵🇦"),
        ("Paraguay", "PRY", "🇵🇾"),
        ("Perú", "PER", "🇵🇪"),
        ("Puerto Rico", "PRI", "🇵🇷"),
        ("República Dominicana", "DOM", "🇩🇴"),
        ("Uruguay", "URY", "🇺🇾"),
        ("Venezuela", "VEN", "🇻🇪"),
    )
}
def _cell_text(value):
    """Return a CSV cell as text; missing (NaN/None) cells become ""."""
    return "" if pd.isna(value) else str(value)


@lru_cache(maxsize=1)
def _load_user_mapping():
    """Parse the participants CSV into the two lookup dicts.

    Only a successful parse is cached: ``lru_cache`` does not store a call
    that raised, so a later call retries the read.
    """
    df = pd.read_csv(PARTICIPANTS_CSV)
    email_to_discord = {}
    hf_to_discord = {}
    for _, row in df.iterrows():
        discord = _cell_text(row.get(COLUMN_MAP["discord"], ""))
        # Rows without a usable Discord name cannot be mapped to anything.
        if not discord or discord == "NA":
            continue
        discord_lower = discord.lower()
        # Same order as before: gmail, then contact e-mail, then HF username.
        for field, target in (
            ("gmail", email_to_discord),
            ("contact_email", email_to_discord),
            ("hf_username", hf_to_discord),
        ):
            value = _cell_text(row.get(COLUMN_MAP[field], ""))
            if value.strip():
                target[value.lower()] = discord_lower
    return email_to_discord, hf_to_discord


def get_user_mapping():
    """Get cached mapping of emails and hf_usernames to discord usernames.

    Returns:
        A tuple ``(email_to_discord, hf_to_discord)`` of dicts. Keys are the
        lower-cased e-mail address / Hugging Face username, values are the
        lower-cased Discord name. Both dicts are empty when the participants
        CSV is missing or cannot be read.

    The CSV is parsed once and the result is reused by later calls, so the
    returned dicts are shared and should be treated as read-only. Cells are
    coerced to text, so a numeric-looking name no longer aborts the whole load.
    """
    if not os.path.exists(PARTICIPANTS_CSV):
        return {}, {}
    try:
        return _load_user_mapping()
    except Exception as e:
        print(f"Error loading {PARTICIPANTS_CSV}: {e}")
        return {}, {}
def get_discord_username(identifier):
    """Resolve an e-mail address or Hugging Face username to a Discord name.

    Identifiers containing "@" are looked up as e-mails and fall back to the
    part before the first "@". Anything else is looked up as a Hugging Face
    username and falls back to the identifier itself, unchanged.
    """
    email_lookup, hf_lookup = get_user_mapping()
    if "@" not in identifier:
        return hf_lookup.get(identifier.lower(), identifier)
    local_part = identifier.partition("@")[0]
    return email_lookup.get(identifier.lower(), local_part)
def get_participant_info():
    """Load per-participant contact details from the participants CSV.

    Returns:
        A dict keyed by lower-cased Discord name. Each value holds the raw
        CSV cells under "gmail", "discord_username", "hf_username" and
        "email". The dict is empty when the file is missing or any error
        occurs while reading it.
    """
    if not os.path.exists(PARTICIPANTS_CSV):
        return {}

    try:
        table = pd.read_csv(PARTICIPANTS_CSV)
        by_discord = {}
        for _, entry in table.iterrows():
            name = entry.get(COLUMN_MAP["discord"], "")
            # Skip rows with no Discord name or the literal placeholder "NA".
            if pd.isna(name) or name == "NA":
                continue
            by_discord[name.lower()] = dict(
                gmail=entry.get(COLUMN_MAP["gmail"], ""),
                discord_username=name,
                hf_username=entry.get(COLUMN_MAP["hf_username"], ""),
                email=entry.get(COLUMN_MAP["contact_email"], ""),
            )
        return by_discord
    except Exception as exc:
        print(f"Error loading participant info: {exc}")
        return {}
def _argilla_username(user_id):
    """Return the Argilla username for *user_id*, or a "User-xxxxxxxx" fallback."""
    # str() first: the id may not be a plain string (e.g. a UUID object), and
    # slicing such an object would raise and abort the whole dataset.
    fallback = f"User-{str(user_id)[:8]}"
    try:
        user = client.users(id=user_id)
    except Exception as e:
        print(f"Error fetching Argilla user {user_id}: {e}")
        return fallback
    # Covers a lookup that returns nothing instead of raising.
    return getattr(user, "username", None) or fallback


def get_blend_es_data():
    """Get blend-es data from Argilla.

    For every country in ``countries`` the dataset named
    "<emoji> <country> - <iso> - Responder" is read, and the "answer_1"
    responses are counted per user.

    Returns:
        A list of ``{"source": "blend-es", "username": ..., "count": ...}``
        dicts, one per (dataset, user) pair. The list is empty when no
        Argilla client is available. A dataset that fails is reported on
        stdout and skipped; the remaining datasets are still processed.
    """
    if not client:
        return []

    data = []
    # Shared across datasets so each user is resolved only once.
    usernames = {}
    for country, info in countries.items():
        dataset_name = f"{info['emoji']} {country} - {info['iso']} - Responder"
        try:
            dataset = client.datasets(dataset_name)
            user_counts = defaultdict(int)
            for record in dataset.records(with_responses=True):
                if "answer_1" not in record.responses:
                    continue
                for answer in record.responses["answer_1"]:
                    if answer.user_id:
                        user_counts[answer.user_id] += 1

            # Rows are appended only after the whole dataset was read, so a
            # failure part-way through contributes nothing for this country.
            for user_id, count in user_counts.items():
                if user_id not in usernames:
                    usernames[user_id] = _argilla_username(user_id)
                username = get_discord_username(usernames[user_id])
                data.append(
                    {"source": "blend-es", "username": username, "count": count}
                )
        except Exception as e:
            print(f"Error processing {dataset_name}: {e}")
    return data
def get_include_data():
    """Read the INCLUDE contributions from ``include.csv`` in the data directory.

    Returns:
        A list of ``{"source": "include", "username": ..., "count": ...}``
        dicts with one entry per lower-cased username, summing that user's
        "Total preguntas hackathon" values. The list is empty when the file
        or either required column is missing, or when reading fails.
    """
    csv_path = os.path.join(DATA_DIR, "include.csv")
    if not os.path.exists(csv_path):
        return []

    username_col = "Nombre en Discord / username"
    questions_col = "Total preguntas hackathon"
    try:
        sheet = pd.read_csv(csv_path)
        if not {username_col, questions_col}.issubset(sheet.columns):
            return []

        totals = {}
        for raw_name, raw_questions in zip(sheet[username_col], sheet[questions_col]):
            # NOTE(review): the first character is dropped unconditionally,
            # presumably a leading "@" - confirm against the sheet contents.
            name = "" if pd.isna(raw_name) else raw_name[1:]
            questions = 0 if pd.isna(raw_questions) else raw_questions
            if not (name and questions):
                continue
            key = name.lower()
            totals[key] = totals.get(key, 0) + int(questions)

        rows = []
        for name, total in totals.items():
            rows.append({"source": "include", "username": name, "count": total})
        return rows
    except Exception as exc:
        print(f"Error loading include data: {exc}")
        return []
def get_estereotipos_data():
    """Read the estereotipos contributions from ``stereotypes.csv``.

    The "count" values are summed per lower-cased "token_id", and each
    token_id is then passed through ``get_discord_username``.

    Returns:
        A list of ``{"source": "estereotipos", "username": ..., "count": ...}``
        dicts. The list is empty when the file or a required column is
        missing, or when reading fails.
    """
    csv_path = os.path.join(DATA_DIR, "stereotypes.csv")
    if not os.path.exists(csv_path):
        return []

    try:
        sheet = pd.read_csv(csv_path)
        if not {"token_id", "count"}.issubset(sheet.columns):
            return []

        totals = {}
        for token, amount in zip(sheet["token_id"], sheet["count"]):
            # Rows missing either cell are ignored.
            if pd.isna(token) or pd.isna(amount):
                continue
            key = token.lower()
            totals[key] = totals.get(key, 0) + int(amount)

        rows = []
        for token, total in totals.items():
            rows.append(
                {
                    "source": "estereotipos",
                    "username": get_discord_username(token),
                    "count": total,
                }
            )
        return rows
    except Exception as exc:
        print(f"Error loading estereotipos data: {exc}")
        return []
def get_arena_data():
    """Read the arena contributions from ``arena_data_cruzada.csv``.

    Each row with a non-blank "username" counts as one contribution for that
    lower-cased value, which is then passed through ``get_discord_username``.

    Returns:
        A list of ``{"source": "arena", "username": ..., "count": ...}``
        dicts. The list is empty when the file or the "username" column is
        missing, or when reading fails.
    """
    csv_path = os.path.join(DATA_DIR, "arena_data_cruzada.csv")
    if not os.path.exists(csv_path):
        return []

    try:
        sheet = pd.read_csv(csv_path)
        if "username" not in sheet.columns:
            print("Error: 'username' column not found in arena_data_cruzada.csv")
            return []

        tally = {}
        for raw_name in sheet["username"]:
            if pd.isna(raw_name) or not raw_name.strip():
                continue
            key = raw_name.lower()
            tally[key] = tally.get(key, 0) + 1

        rows = []
        for identifier, hits in tally.items():
            rows.append(
                {
                    "source": "arena",
                    "username": get_discord_username(identifier),
                    "count": hits,
                }
            )
        return rows
    except Exception as exc:
        print(f"Error loading arena data: {exc}")
        return []
def create_challenge_leaderboards(display_df):
    """Write one CSV and one markdown-table TXT leaderboard per challenge.

    Args:
        display_df: DataFrame with a "Username" column and any of the
            challenge columns "Arena", "Blend-ES", "Estereotipos", "INCLUDE".
            Challenges whose column is absent are skipped.

    Files are written to the "leaderboards" directory, which is created if
    needed. The CSV lists every participant. The TXT table leaves out rows
    whose score equals 0 and shows medal emojis for the first three ranks.
    A short summary is printed for each challenge.
    """
    import os

    out_dir = "leaderboards"
    os.makedirs(out_dir, exist_ok=True)
    medals = {1: "🥇", 2: "🥈", 3: "🥉"}

    for challenge in ("Arena", "Blend-ES", "Estereotipos", "INCLUDE"):
        if challenge not in display_df.columns:
            continue

        # Highest score first; ties are ordered alphabetically by username.
        ranking = display_df[["Username", challenge]].copy()
        ranking = ranking.sort_values(
            by=[challenge, "Username"], ascending=[False, True]
        )

        stem = "leaderboard_" + challenge.replace(" ", "_").replace("-", "_").lower()
        csv_filename = os.path.join(out_dir, stem + ".csv")
        txt_filename = os.path.join(out_dir, stem + ".txt")

        ranking.to_csv(csv_filename, index=False, encoding="utf-8")
        print(f"Created {csv_filename} with {len(ranking)} participants")

        with open(txt_filename, "w", encoding="utf-8") as out:
            out.write(f"# {challenge} Leaderboard\n\n")
            out.write("| Puesto | Discord ID | Puntuación |\n")
            out.write("|------|----------|-------|\n")
            position = 0
            for _, entry in ranking.iterrows():
                score = entry[challenge]
                if score == 0:
                    # Zero scores do not take up a rank.
                    continue
                position += 1
                label = medals.get(position, str(position))
                out.write(f"| {label} | {entry['Username']} | {score} |\n")
        print(
            f"Created {txt_filename} with markdown table format (excluding 0 scores)"
        )

        print(f"Top 5 {challenge} scores:")
        for place, (_, entry) in enumerate(ranking.head(5).iterrows(), start=1):
            print(f" {place}. {entry['Username']}: {entry[challenge]}")
        print()
def calculate_personal_scores():
    """Consolidate all data sources and create leaderboard.

    Contributions from the blend-es, include, estereotipos and arena loaders
    are summed per lower-cased username and enriched with the participant
    details from ``get_participant_info`` when the username is known there.

    Side effects:
        * Writes the full table (including contact columns) to
          ``DATA_DIR/leaderboard_personal.csv``.
        * Writes the public table (username and scores only) to
          ``leaderboard_personal.csv`` in the current directory.
        * Calls ``create_challenge_leaderboards`` with the public table.
        Both CSV writes are skipped when there is no data.

    Returns:
        The public DataFrame, sorted by "Arena" in descending order (empty
        when no contributions were found).
    """
    all_data = (
        get_blend_es_data()
        + get_include_data()
        + get_estereotipos_data()
        + get_arena_data()
    )
    participant_info = get_participant_info()

    # Value of item["source"] -> name of the counter it increments.
    source_fields = {
        "blend-es": "blend_es",
        "include": "include",
        "estereotipos": "estereotipos",
        "arena": "arena",
    }

    user_contributions = defaultdict(
        lambda: {
            "username": "",
            "gmail": "",
            "discord_username": "",
            "hf_username": "",
            "email": "",
            "blend_es": 0,
            "include": 0,
            "estereotipos": 0,
            "arena": 0,
        }
    )

    for item in all_data:
        username = item["username"]
        user_key = username.lower()
        entry = user_contributions[user_key]

        # The first spelling seen for a user is the one that is displayed.
        if not entry["username"]:
            entry["username"] = username

        info = participant_info.get(user_key)
        if info is not None:
            entry.update(
                {
                    "gmail": info["gmail"],
                    "discord_username": info["discord_username"],
                    "hf_username": info["hf_username"],
                    "email": info["email"],
                }
            )

        # Unknown sources are ignored, as before.
        field = source_fields.get(item["source"])
        if field is not None:
            entry[field] += item["count"]

    full_rows = []
    display_rows = []
    for data in user_contributions.values():
        scores = {
            "Arena": data["arena"],
            "Blend-ES": data["blend_es"],
            "Estereotipos": data["estereotipos"],
            "INCLUDE": data["include"],
        }
        # Full row: contact details plus scores.
        full_rows.append(
            {
                "Username": data["username"],
                "Gmail": data["gmail"],
                "Discord_Username": data["discord_username"],
                "HF_Username": data["hf_username"],
                "Email": data["email"],
                **scores,
            }
        )
        # Public row: username and scores only.
        display_rows.append({"Username": data["username"], **scores})

    full_df = pd.DataFrame(full_rows)
    if not full_df.empty:
        # Argilla data can exist without any local CSV inputs, in which case
        # the data directory may not have been created yet.
        os.makedirs(DATA_DIR, exist_ok=True)
        full_df.sort_values("Arena", ascending=False, inplace=True)
        full_df.to_csv(
            os.path.join(DATA_DIR, LEADERBOARD_PERSONAL_CSV),
            index=False,
            encoding="utf-8",
        )

    display_df = pd.DataFrame(display_rows)
    if not display_df.empty:
        display_df.sort_values("Arena", ascending=False, inplace=True)
        # Written to the working directory, not DATA_DIR.
        display_df.to_csv(LEADERBOARD_PERSONAL_CSV, index=False, encoding="utf-8")

    print("\nCreating individual challenge leaderboards...")
    create_challenge_leaderboards(display_df)
    return display_df
if __name__ == "__main__": | |
calculate_personal_scores() | |