import json
import os
from collections import defaultdict
from functools import lru_cache

import argilla as rg
import pandas as pd
from dotenv import load_dotenv

load_dotenv()

# Constants
DATA_DIR = "data"
PARTICIPANTS_CSV = os.path.join(DATA_DIR, "participants.csv")
EQUIPOS_CSV = os.path.join(DATA_DIR, "equipos.csv")
LEADERBOARD_PERSONAL_CSV = "leaderboard_personal.csv"
LEADERBOARD_EQUIPOS_CSV = "leaderboard_equipos.csv"

# Column mappings for participants info
COLUMN_MAP = {
    "gmail": "Dirección de correo electrónico",
    "discord": "¿Cuál es tu nombre en Discord?",
    "hf_username": "¿Cuál es tu nombre en el Hub de Hugging Face?",
    "contact_email": "Email de contacto",
}

# Maps the "source" tag of a contribution item to the per-user counter it feeds.
_SOURCE_FIELDS = {
    "blend-es": "blend_es",
    "include": "include",
    "estereotipos": "estereotipos",
    "arena": "arena",
}

# Rank markers used for the first three rows of the markdown leaderboards.
_MEDALS = {1: "🥇", 2: "🥈", 3: "🥉"}

# Initialize Argilla client
try:
    client = rg.Argilla(
        api_url=os.getenv("ARGILLA_API_URL", ""),
        api_key=os.getenv("ARGILLA_API_KEY", ""),
    )
except Exception as e:
    print(f"Error initializing Argilla client: {e}")
    client = None

# Countries data
countries = {
    "Argentina": {"iso": "ARG", "emoji": "🇦🇷"},
    "Bolivia": {"iso": "BOL", "emoji": "🇧🇴"},
    "Chile": {"iso": "CHL", "emoji": "🇨🇱"},
    "Colombia": {"iso": "COL", "emoji": "🇨🇴"},
    "Costa Rica": {"iso": "CRI", "emoji": "🇨🇷"},
    "Cuba": {"iso": "CUB", "emoji": "🇨🇺"},
    "Ecuador": {"iso": "ECU", "emoji": "🇪🇨"},
    "El Salvador": {"iso": "SLV", "emoji": "🇸🇻"},
    "España": {"iso": "ESP", "emoji": "🇪🇸"},
    "Guatemala": {"iso": "GTM", "emoji": "🇬🇹"},
    "Honduras": {"iso": "HND", "emoji": "🇭🇳"},
    "México": {"iso": "MEX", "emoji": "🇲🇽"},
    "Nicaragua": {"iso": "NIC", "emoji": "🇳🇮"},
    "Panamá": {"iso": "PAN", "emoji": "🇵🇦"},
    "Paraguay": {"iso": "PRY", "emoji": "🇵🇾"},
    "Perú": {"iso": "PER", "emoji": "🇵🇪"},
    "Puerto Rico": {"iso": "PRI", "emoji": "🇵🇷"},
    "República Dominicana": {"iso": "DOM", "emoji": "🇩🇴"},
    "Uruguay": {"iso": "URY", "emoji": "🇺🇾"},
    "Venezuela": {"iso": "VEN", "emoji": "🇻🇪"},
}


def _clean(value):
    """Return *value* as a whitespace-stripped string, or "" when missing.

    Missing means ``None`` or a pandas NA/NaN scalar. Non-string scalars are
    converted with ``str`` so a single oddly-typed CSV cell cannot abort the
    processing of an entire file.
    """
    if isinstance(value, str):
        return value.strip()
    if value is None or pd.isna(value):
        return ""
    return str(value).strip()


def _normalize(value):
    """Return the stripped, lower-cased form of *value* used as a lookup key."""
    return _clean(value).lower()


@lru_cache(maxsize=1)
def get_user_mapping():
    """Get cached mapping of emails and hf_usernames to discord usernames.

    Returns:
        A tuple ``(email_to_discord, hf_to_discord)`` of dicts. Keys and values
        are stripped and lower-cased. Both dicts are empty when
        ``PARTICIPANTS_CSV`` does not exist or cannot be read.

    The result is cached for the lifetime of the process (including the empty
    result), and the same dict objects are returned on every call, so callers
    must not mutate them.
    """
    if not os.path.exists(PARTICIPANTS_CSV):
        return {}, {}

    try:
        df = pd.read_csv(PARTICIPANTS_CSV)
        email_to_discord = {}
        hf_to_discord = {}

        for _, row in df.iterrows():
            discord = _clean(row.get(COLUMN_MAP["discord"], ""))
            if not discord or discord == "NA":
                continue
            discord_lower = discord.lower()

            # Both the Gmail address and the contact address resolve to the
            # same Discord name.
            for column in ("gmail", "contact_email"):
                email = _normalize(row.get(COLUMN_MAP[column], ""))
                if email:
                    email_to_discord[email] = discord_lower

            hf_username = _normalize(row.get(COLUMN_MAP["hf_username"], ""))
            if hf_username:
                hf_to_discord[hf_username] = discord_lower

        return email_to_discord, hf_to_discord
    except Exception as e:
        print(f"Error loading {PARTICIPANTS_CSV}: {e}")
        return {}, {}


def get_discord_username(identifier):
    """Get discord username from email or hf_username.

    Args:
        identifier: An email address (anything containing "@") or a Hugging
            Face username.

    Returns:
        The mapped Discord username when the identifier is known. Otherwise the
        local part of the email (text before the first "@"), or the stripped
        identifier itself for non-email input.
    """
    email_to_discord, hf_to_discord = get_user_mapping()
    identifier = identifier.strip()
    key = identifier.lower()

    if "@" in identifier:
        return email_to_discord.get(key, identifier.split("@")[0])
    return hf_to_discord.get(key, identifier)


def get_participant_info():
    """Get participant information from CSV.

    Returns:
        A dict keyed by the stripped, lower-cased Discord username. Each value
        holds the raw ``gmail``, ``discord_username``, ``hf_username`` and
        ``email`` cells of that participant's row. Empty when
        ``PARTICIPANTS_CSV`` is missing or unreadable.
    """
    if not os.path.exists(PARTICIPANTS_CSV):
        return {}

    try:
        df = pd.read_csv(PARTICIPANTS_CSV)
        participant_info = {}

        for _, row in df.iterrows():
            discord_username = row.get(COLUMN_MAP["discord"], "")
            discord = _clean(discord_username)
            if not discord or discord == "NA":
                continue
            participant_info[discord.lower()] = {
                "gmail": row.get(COLUMN_MAP["gmail"], ""),
                "discord_username": discord_username,
                "hf_username": row.get(COLUMN_MAP["hf_username"], ""),
                "email": row.get(COLUMN_MAP["contact_email"], ""),
            }

        return participant_info
    except Exception as e:
        print(f"Error loading participant info: {e}")
        return {}


def _resolve_argilla_username(user_id):
    """Return the Argilla username for *user_id*, or a ``User-xxxxxxxx`` label.

    The label is built from ``str(user_id)`` so it works whether the id is a
    string or another type such as a UUID.
    """
    try:
        user = client.users(id=user_id)
    except Exception as e:
        print(f"Error fetching Argilla user {user_id}: {e}")
        user = None

    username = getattr(user, "username", None)
    return username or f"User-{str(user_id)[:8]}"


def get_blend_es_data():
    """Get blend-es data from Argilla.

    For every country in ``countries`` the dataset named
    ``"<emoji> <country> - <iso> - Responder"`` is read and the number of
    ``answer_1`` responses per user is counted.

    Returns:
        A list of ``{"source": "blend-es", "username": ..., "count": ...}``
        dicts, one per (dataset, user) pair. Empty when no Argilla client is
        available. A dataset that fails to load is reported and skipped.
    """
    if not client:
        return []

    data = []
    for country, info in countries.items():
        dataset_name = f"{info['emoji']} {country} - {info['iso']} - Responder"

        try:
            dataset = client.datasets(dataset_name)

            user_counts = defaultdict(int)
            for record in dataset.records(with_responses=True):
                if "answer_1" not in record.responses:
                    continue
                for answer in record.responses["answer_1"]:
                    if answer.user_id:
                        user_counts[answer.user_id] += 1

            for user_id, count in user_counts.items():
                hf_username = _resolve_argilla_username(user_id)
                data.append(
                    {
                        "source": "blend-es",
                        "username": get_discord_username(hf_username),
                        "count": count,
                    }
                )
        except Exception as e:
            print(f"Error processing {dataset_name}: {e}")

    return data


def get_include_data():
    """Get include data from CSV.

    Reads ``include.csv`` from ``DATA_DIR`` and sums the
    "Total preguntas hackathon" column per username.

    Returns:
        A list of ``{"source": "include", "username": ..., "count": ...}``
        dicts. Empty when the file or either expected column is missing, or
        when the file cannot be parsed.
    """
    csv_path = os.path.join(DATA_DIR, "include.csv")
    if not os.path.exists(csv_path):
        return []

    try:
        df = pd.read_csv(csv_path)
        username_col = "Nombre en Discord / username"
        questions_col = "Total preguntas hackathon"

        if username_col not in df.columns or questions_col not in df.columns:
            return []

        user_counts = defaultdict(int)
        for _, row in df.iterrows():
            # NOTE(review): the first character is always dropped, presumably
            # a leading "@" - confirm every row in include.csv carries one.
            username = _clean(row[username_col])[1:].strip().lower()
            questions = row[questions_col] if pd.notna(row[questions_col]) else 0
            if username and questions:
                user_counts[username] += int(questions)

        return [
            {"source": "include", "username": username, "count": count}
            for username, count in user_counts.items()
        ]
    except Exception as e:
        print(f"Error loading include data: {e}")
        return []


def get_estereotipos_data():
    """Get estereotipos data from CSV.

    Reads ``stereotypes.csv`` from ``DATA_DIR`` and sums ``count`` per
    ``token_id``. Each ``token_id`` is then resolved through
    ``get_discord_username``.

    Returns:
        A list of ``{"source": "estereotipos", "username": ..., "count": ...}``
        dicts. Empty when the file or either expected column is missing, or
        when the file cannot be parsed.
    """
    csv_path = os.path.join(DATA_DIR, "stereotypes.csv")
    if not os.path.exists(csv_path):
        return []

    try:
        df = pd.read_csv(csv_path)

        if "token_id" not in df.columns or "count" not in df.columns:
            return []

        user_counts = defaultdict(int)
        for _, row in df.iterrows():
            mail = _normalize(row.get("token_id", ""))
            count = row.get("count", 0)
            if mail and pd.notna(count):
                user_counts[mail] += int(count)

        return [
            {
                "source": "estereotipos",
                "username": get_discord_username(mail),
                "count": count,
            }
            for mail, count in user_counts.items()
        ]
    except Exception as e:
        print(f"Error loading estereotipos data: {e}")
        return []


def get_arena_data():
    """Get arena data from CSV.

    Reads ``arena_data_cruzada.csv`` from ``DATA_DIR`` and counts one
    contribution per row for the value in its ``username`` column. Each value
    is then resolved through ``get_discord_username``.

    Returns:
        A list of ``{"source": "arena", "username": ..., "count": ...}`` dicts.
        Empty when the file or the ``username`` column is missing, or when the
        file cannot be parsed.
    """
    csv_path = os.path.join(DATA_DIR, "arena_data_cruzada.csv")
    if not os.path.exists(csv_path):
        return []

    try:
        df = pd.read_csv(csv_path)

        # Check if username column exists
        if "username" not in df.columns:
            print("Error: 'username' column not found in arena_data_cruzada.csv")
            return []

        user_counts = defaultdict(int)
        for _, row in df.iterrows():
            identifier = _normalize(row.get("username", ""))
            if identifier:
                user_counts[identifier] += 1

        return [
            {
                "source": "arena",
                "username": get_discord_username(identifier),
                "count": count,
            }
            for identifier, count in user_counts.items()
        ]
    except Exception as e:
        print(f"Error loading arena data: {e}")
        return []


def create_challenge_leaderboards(display_df):
    """Create individual CSV files for each challenge.

    For each of the "Arena", "Blend-ES", "Estereotipos" and "INCLUDE" columns
    present in *display_df*, writes two files into the ``leaderboards``
    directory (created if needed):

    * ``leaderboard_<challenge>.csv`` with every participant, and
    * ``leaderboard_<challenge>.txt`` with a markdown table that omits
      participants whose score is 0.

    Args:
        display_df: DataFrame with a "Username" column and one score column
            per challenge. Challenges whose column is absent are skipped.
    """
    leaderboards_dir = "leaderboards"
    os.makedirs(leaderboards_dir, exist_ok=True)

    for challenge in ["Arena", "Blend-ES", "Estereotipos", "INCLUDE"]:
        if challenge not in display_df.columns:
            continue

        # Sort by score (descending) and then by username (ascending) for ties
        challenge_df = display_df[["Username", challenge]].sort_values(
            [challenge, "Username"], ascending=[False, True]
        )

        # Generate filenames in leaderboards directory
        clean_challenge = challenge.replace(" ", "_").replace("-", "_").lower()
        csv_filename = os.path.join(
            leaderboards_dir, f"leaderboard_{clean_challenge}.csv"
        )
        txt_filename = os.path.join(
            leaderboards_dir, f"leaderboard_{clean_challenge}.txt"
        )

        # Save to CSV (include all participants)
        challenge_df.to_csv(csv_filename, index=False, encoding="utf-8")
        print(f"Created {csv_filename} with {len(challenge_df)} participants")

        # Build the markdown table (exclude users with 0 scores)
        lines = [
            f"# {challenge} Leaderboard\n\n",
            "| Puesto | Discord ID | Puntuación |\n",
            "|------|----------|-------|\n",
        ]
        rank = 1
        for _, row in challenge_df.iterrows():
            score = row[challenge]
            if score == 0:
                continue
            # Medal emojis for the top 3 ranks, plain numbers afterwards
            rank_display = _MEDALS.get(rank, str(rank))
            lines.append(f"| {rank_display} | {row['Username']} | {score} |\n")
            rank += 1

        with open(txt_filename, "w", encoding="utf-8") as f:
            f.writelines(lines)

        print(
            f"Created {txt_filename} with markdown table format (excluding 0 scores)"
        )

        # Show top 5 scores
        print(f"Top 5 {challenge} scores:")
        for i, (_, row) in enumerate(challenge_df.head().iterrows(), 1):
            print(f" {i}. {row['Username']}: {row[challenge]}")
        print()


def _empty_contribution():
    """Return a fresh per-user record with empty identity fields and zero counts."""
    return {
        "username": "",
        "gmail": "",
        "discord_username": "",
        "hf_username": "",
        "email": "",
        "blend_es": 0,
        "include": 0,
        "estereotipos": 0,
        "arena": 0,
    }


def calculate_personal_scores():
    """Consolidate all data sources and create leaderboard.

    Collects contributions from Blend-ES, INCLUDE, Estereotipos and Arena,
    aggregates them per lower-cased username and writes:

    * the full table (including contact details) to
      ``DATA_DIR/LEADERBOARD_PERSONAL_CSV``,
    * the public table (username and scores only) to
      ``LEADERBOARD_PERSONAL_CSV`` in the current working directory, and
    * the per-challenge leaderboards via ``create_challenge_leaderboards``.

    Both tables are sorted by the "Arena" score in descending order.

    Returns:
        The public DataFrame. It is empty, with no columns, when no
        contributions were found; in that case no personal CSV is written.
    """
    # Collect all data
    all_data = (
        get_blend_es_data()
        + get_include_data()
        + get_estereotipos_data()
        + get_arena_data()
    )

    # Get participant info
    participant_info = get_participant_info()

    # Aggregate user contributions
    user_contributions = defaultdict(_empty_contribution)

    for item in all_data:
        username = item["username"]
        user_key = _normalize(username)
        entry = user_contributions[user_key]

        if not entry["username"]:
            entry["username"] = username

        info = participant_info.get(user_key)
        if info is not None:
            entry.update(
                {
                    "gmail": info["gmail"],
                    "discord_username": info["discord_username"],
                    "hf_username": info["hf_username"],
                    "email": info["email"],
                }
            )

        # Unknown sources still create the user entry but add no score.
        field = _SOURCE_FIELDS.get(item["source"])
        if field is not None:
            entry[field] += item["count"]

    # Full data for CSV (contains contact details)
    full_rows = [
        {
            "Username": data["username"],
            "Gmail": data["gmail"],
            "Discord_Username": data["discord_username"],
            "HF_Username": data["hf_username"],
            "Email": data["email"],
            "Arena": data["arena"],
            "Blend-ES": data["blend_es"],
            "Estereotipos": data["estereotipos"],
            "INCLUDE": data["include"],
        }
        for data in user_contributions.values()
    ]

    # Display data for UI (public)
    display_rows = [
        {
            "Username": data["username"],
            "Arena": data["arena"],
            "Blend-ES": data["blend_es"],
            "Estereotipos": data["estereotipos"],
            "INCLUDE": data["include"],
        }
        for data in user_contributions.values()
    ]

    # Save full data to CSV
    full_df = pd.DataFrame(full_rows)
    if not full_df.empty:
        full_df.sort_values("Arena", ascending=False, inplace=True)
        # DATA_DIR may not exist yet when every contribution came from Argilla.
        os.makedirs(DATA_DIR, exist_ok=True)
        full_df.to_csv(
            os.path.join(DATA_DIR, LEADERBOARD_PERSONAL_CSV),
            index=False,
            encoding="utf-8",
        )

    # Return display dataframe for UI
    display_df = pd.DataFrame(display_rows)
    if not display_df.empty:
        display_df.sort_values("Arena", ascending=False, inplace=True)
        display_df.to_csv(LEADERBOARD_PERSONAL_CSV, index=False, encoding="utf-8")

    # Create individual challenge leaderboards
    print("\nCreating individual challenge leaderboards...")
    create_challenge_leaderboards(display_df)

    return display_df


if __name__ == "__main__":
    calculate_personal_scores()