Spaces:
Running
Running
# main.py - FastAPI application for Pokemon Livestream | |
import asyncio | |
import os | |
import random | |
import time | |
import traceback | |
import logging | |
from typing import List, Dict, Optional, Set | |
from fastapi import FastAPI, WebSocket, WebSocketDisconnect | |
from fastapi.responses import HTMLResponse | |
from fastapi.staticfiles import StaticFiles | |
# --- Imports for poke_env and agents --- | |
from poke_env.player import Player | |
from poke_env import AccountConfiguration, ServerConfiguration | |
from poke_env.environment.battle import Battle | |
# Import the actual agent classes | |
from agents import OpenAIAgent, GeminiAgent, MistralAgent | |
# --- Configuration --- | |
CUSTOM_SERVER_URL = "wss://jofthomas.com/showdown/websocket" | |
CUSTOM_ACTION_URL = 'https://play.pokemonshowdown.com/action.php?' | |
CUSTOM_BATTLE_VIEW_URL_TEMPLATE = "https://jofthomas.com/play.pokemonshowdown.com/testclient.html#{battle_id}" | |
custom_config = ServerConfiguration(CUSTOM_SERVER_URL, CUSTOM_ACTION_URL) | |
DEFAULT_BATTLE_FORMAT = "gen9randombattle" | |
# Define available agents with their corresponding classes | |
AGENT_CONFIGS = { | |
"OpenAIAgent": {"class": OpenAIAgent, "password_env_var": "OPENAI_AGENT_PASSWORD"}, | |
"GeminiAgent": {"class": GeminiAgent, "password_env_var": "GEMINI_AGENT_PASSWORD"}, | |
"MistralAgent": {"class": MistralAgent, "password_env_var": "MISTRAL_AGENT_PASSWORD"}, | |
} | |
# Filter out agents with missing passwords | |
AVAILABLE_AGENT_NAMES = [ | |
name for name, cfg in AGENT_CONFIGS.items() | |
if os.environ.get(cfg.get("password_env_var", "")) | |
] | |
if not AVAILABLE_AGENT_NAMES: | |
print("FATAL ERROR: No agent configurations have their required password environment variables set. Exiting.") | |
exit(1) | |
# --- Global State Variables --- | |
active_agent_name: Optional[str] = None | |
active_agent_instance: Optional[Player] = None | |
active_agent_task: Optional[asyncio.Task] = None | |
current_battle_instance: Optional[Battle] = None | |
background_task_handle: Optional[asyncio.Task] = None | |
# --- Create FastAPI app --- | |
app = FastAPI(title="Pokemon Battle Livestream") | |
# --- WebSocket connection manager --- | |
class ConnectionManager: | |
def __init__(self): | |
self.active_connections: Set[WebSocket] = set() | |
self.current_html: str = create_idle_html("Initializing...", "Setting up Pokemon Battle Stream") | |
async def connect(self, websocket: WebSocket): | |
await websocket.accept() | |
self.active_connections.add(websocket) | |
# Send current state to newly connected client | |
await websocket.send_text(self.current_html) | |
def disconnect(self, websocket: WebSocket): | |
self.active_connections.remove(websocket) | |
async def update_all(self, html: str): | |
"""Update the current HTML and broadcast to all clients""" | |
self.current_html = html | |
if self.active_connections: | |
# Only log if there are connections to update | |
print(f"Broadcasting update to {len(self.active_connections)} clients") | |
# Make a copy of the connections set to avoid modification during iteration | |
connections_copy = self.active_connections.copy() | |
for connection in connections_copy: | |
try: | |
await connection.send_text(html) | |
except Exception as e: | |
print(f"Error sending to client: {e}") | |
# Don't remove here - will be handled by disconnect route | |
manager = ConnectionManager() | |
# --- Helper Functions --- | |
def get_active_battle(agent: Player) -> Optional[Battle]: | |
"""Returns the first non-finished battle for an agent.""" | |
if agent and agent._battles: | |
active_battles = [b for b in agent._battles.values() if not b.finished] | |
if active_battles: | |
if active_battles[0].battle_tag: | |
return active_battles[0] | |
else: | |
print(f"WARN: Found active battle for {agent.username} but it has no battle_tag yet.") | |
return None | |
return None | |
def create_battle_iframe(battle_id: str) -> str: | |
"""Creates the HTML for the battle iframe.""" | |
print("CURRENT BATTLE ID: ", battle_id) | |
battle_url = f"https://jofthomas.com/play.pokemonshowdown.com/testclient.html#{battle_id}" | |
return f""" | |
<html> | |
<head> | |
<title>Pokemon Battle Livestream</title> | |
<style> | |
body, html {{ | |
margin: 0; | |
padding: 0; | |
height: 100%; | |
overflow: hidden; | |
}} | |
iframe {{ | |
position: fixed; | |
top: 0; | |
left: 0; | |
bottom: 0; | |
right: 0; | |
width: 100%; | |
height: 100%; | |
border: none; | |
margin: 0; | |
padding: 0; | |
overflow: hidden; | |
z-index: 999999; | |
}} | |
</style> | |
</head> | |
<body> | |
<iframe | |
src="{battle_url}" | |
allowfullscreen | |
></iframe> | |
</body> | |
</html> | |
""" | |
def create_idle_html(status_message: str, instruction: str) -> str: | |
"""Creates a visually appealing idle screen HTML.""" | |
return f""" | |
<html> | |
<head> | |
<title>Pokemon Battle Livestream</title> | |
<style> | |
body, html {{ | |
margin: 0; | |
padding: 0; | |
height: 100%; | |
overflow: hidden; | |
font-family: Arial, sans-serif; | |
}} | |
.container {{ | |
position: fixed; | |
top: 0; | |
left: 0; | |
width: 100%; | |
height: 100%; | |
display: flex; | |
flex-direction: column; | |
justify-content: center; | |
align-items: center; | |
background-image: url('/static/pokemon_huggingface.png'); | |
background-size: cover; | |
background-position: center; | |
z-index: 999999; | |
}} | |
.message-box {{ | |
background-color: rgba(0, 0, 0, 0.65); | |
padding: 30px; | |
border-radius: 15px; | |
max-width: 80%; | |
text-align: center; | |
}} | |
.status {{ | |
font-size: 2.5em; | |
margin-bottom: 20px; | |
text-shadow: 2px 2px 4px rgba(0, 0, 0, 0.8); | |
color: white; | |
}} | |
.instruction {{ | |
font-size: 1.5em; | |
text-shadow: 1px 1px 3px rgba(0, 0, 0, 0.8); | |
color: white; | |
}} | |
</style> | |
</head> | |
<body> | |
<div class="container"> | |
<div class="message-box"> | |
<p class="status">{status_message}</p> | |
<p class="instruction">{instruction}</p> | |
</div> | |
</div> | |
</body> | |
</html> | |
""" | |
def create_error_html(error_msg: str) -> str: | |
"""Creates HTML to display an error message.""" | |
return f""" | |
<html> | |
<head> | |
<title>Pokemon Battle Livestream - Error</title> | |
<style> | |
body, html {{ | |
margin: 0; | |
padding: 0; | |
height: 100%; | |
overflow: hidden; | |
font-family: Arial, sans-serif; | |
}} | |
.container {{ | |
position: fixed; | |
top: 0; | |
left: 0; | |
width: 100%; | |
height: 100%; | |
display: flex; | |
flex-direction: column; | |
justify-content: center; | |
align-items: center; | |
background-color: #330000; | |
z-index: 999999; | |
}} | |
.message-box {{ | |
background-color: rgba(200, 0, 0, 0.7); | |
padding: 30px; | |
border-radius: 15px; | |
max-width: 80%; | |
text-align: center; | |
}} | |
.title {{ | |
font-size: 2em; | |
margin-bottom: 20px; | |
color: white; | |
}} | |
.message {{ | |
font-size: 1.2em; | |
color: #ffdddd; | |
}} | |
</style> | |
</head> | |
<body> | |
<div class="container"> | |
<div class="message-box"> | |
<p class="title">An Error Occurred</p> | |
<p class="message">{error_msg}</p> | |
</div> | |
</div> | |
</body> | |
</html> | |
""" | |
async def update_display_html(new_html: str) -> None: | |
"""Updates the current display HTML and broadcasts to all clients.""" | |
await manager.update_all(new_html) | |
print("HTML Display UPDATED and broadcasted.") | |
# --- Agent Lifecycle Management --- | |
async def select_and_activate_new_agent(): | |
"""Selects a random available agent, instantiates it, and starts its listening task.""" | |
global active_agent_name, active_agent_instance, active_agent_task | |
if not AVAILABLE_AGENT_NAMES: | |
print("Lifecycle: No available agents with passwords set.") | |
await update_display_html(create_error_html("No agents available (check password env vars).")) | |
return False | |
selected_name = random.choice(AVAILABLE_AGENT_NAMES) | |
config = AGENT_CONFIGS[selected_name] | |
AgentClass = config["class"] | |
password_env_var = config["password_env_var"] | |
agent_password = os.environ.get(password_env_var) | |
print(f"Lifecycle: Activating agent '{selected_name}'...") | |
await update_display_html(create_idle_html("Selecting Next Agent...", f"Preparing {selected_name}...")) | |
try: | |
account_config = AccountConfiguration(selected_name, agent_password) | |
agent = AgentClass( | |
account_configuration=account_config, | |
server_configuration=custom_config, | |
battle_format=DEFAULT_BATTLE_FORMAT, | |
log_level=logging.INFO, | |
max_concurrent_battles=1 | |
) | |
# Start the task to accept exactly one battle challenge | |
task = asyncio.create_task(agent.accept_challenges(None, 1), name=f"accept_challenge_{selected_name}") | |
task.add_done_callback(log_task_exception) | |
# Update global state | |
active_agent_name = selected_name | |
active_agent_instance = agent | |
active_agent_task = task | |
print(f"Lifecycle: Agent '{selected_name}' is active and listening for 1 challenge.") | |
await update_display_html(create_idle_html(f"Agent <strong>{selected_name}</strong> is ready!", | |
f"Please challenge <strong>{selected_name}</strong> to a <strong>{DEFAULT_BATTLE_FORMAT}</strong> battle.")) | |
return True | |
except Exception as e: | |
error_msg = f"Failed to activate agent '{selected_name}': {e}" | |
print(error_msg) | |
traceback.print_exc() | |
# Clear state if activation failed | |
active_agent_name = None | |
active_agent_instance = None | |
active_agent_task = None | |
return False | |
async def check_for_new_battle(): | |
"""Checks if the active agent has started a battle.""" | |
global active_agent_instance, current_battle_instance | |
if active_agent_instance: | |
battle = get_active_battle(active_agent_instance) | |
if battle and battle.battle_tag: | |
print(f"Lifecycle: Agent '{active_agent_name}' started battle: {battle.battle_tag}") | |
current_battle_instance = battle | |
# Prevent accepting more challenges | |
if active_agent_task and not active_agent_task.done(): | |
print(f"Lifecycle: Cancelling accept_challenges task for {active_agent_name} as battle started.") | |
active_agent_task.cancel() | |
async def deactivate_current_agent(reason: str = "cycle"): | |
"""Cleans up the currently active agent and resets state.""" | |
global active_agent_name, active_agent_instance, active_agent_task, current_battle_instance | |
print(f"Lifecycle: Deactivating agent '{active_agent_name}' (Reason: {reason})...") | |
# Different messages based on deactivation reason | |
if reason == "battle_end": | |
await update_display_html(create_idle_html("Battle Finished", "Preparing to select next agent...")) | |
elif reason == "cycle": | |
await update_display_html(create_idle_html("Cycling Agents", "Preparing for next selection...")) | |
else: | |
await update_display_html(create_idle_html(f"Agent Reset ({reason})", "Preparing for next selection...")) | |
agent = active_agent_instance | |
task = active_agent_task | |
# Store a local copy of the battle instance before clearing it | |
last_battle_instance = current_battle_instance | |
# Clear state first to prevent race conditions | |
active_agent_name = None | |
active_agent_instance = None | |
active_agent_task = None | |
current_battle_instance = None | |
# Cancel the accept_challenges task if it's still running | |
if task and not task.done(): | |
print(f"Lifecycle: Cancelling task for {agent.username if agent else 'unknown agent'}...") | |
task.cancel() | |
try: | |
await asyncio.wait_for(task, timeout=2.0) | |
except asyncio.CancelledError: | |
print(f"Lifecycle: Task cancellation confirmed.") | |
except asyncio.TimeoutError: | |
print(f"Lifecycle: Task did not confirm cancellation within timeout.") | |
except Exception as e: | |
print(f"Lifecycle: Error during task cancellation wait: {e}") | |
# Disconnect the player | |
if agent: | |
print(f"Lifecycle: Disconnecting player {agent.username}...") | |
try: | |
if hasattr(agent, '_websocket') and agent._websocket and agent._websocket.open: | |
await agent.disconnect() | |
print(f"Lifecycle: Player {agent.username} disconnected.") | |
else: | |
print(f"Lifecycle: Player {agent.username} already disconnected or websocket not ready.") | |
except Exception as e: | |
print(f"ERROR during agent disconnect ({agent.username}): {e}") | |
# Add a brief delay after deactivation to ensure clean state | |
await asyncio.sleep(3) | |
print(f"Lifecycle: Agent deactivation complete.") | |
# --- Main Background Task --- | |
async def manage_agent_lifecycle(): | |
"""Runs the main loop selecting, running, and cleaning up agents sequentially.""" | |
global active_agent_instance, active_agent_task, current_battle_instance | |
print("Background lifecycle manager started.") | |
REFRESH_INTERVAL_SECONDS = 3 | |
loop_counter = 0 | |
while True: | |
loop_counter += 1 | |
try: | |
print(f"\n--- Lifecycle Check #{loop_counter} [{time.strftime('%H:%M:%S')}] ---") | |
# ================================== | |
# State 1: No agent active | |
# ================================== | |
if active_agent_instance is None: | |
print(f"[{loop_counter}] State 1: No active agent. Selecting...") | |
activated = await select_and_activate_new_agent() | |
if not activated: | |
print(f"[{loop_counter}] State 1: Activation failed. Waiting before retry.") | |
await asyncio.sleep(10) | |
continue | |
else: | |
print(f"[{loop_counter}] State 1: Agent '{active_agent_name}' activated.") | |
# ================================== | |
# State 2: Agent is active | |
# ================================== | |
else: | |
print(f"[{loop_counter}] State 2: Agent '{active_agent_name}' active.") | |
# --- Sub-state: Monitoring an ongoing battle --- | |
if current_battle_instance is not None: | |
battle_tag = current_battle_instance.battle_tag | |
print(f"[{loop_counter}] State 2: Monitoring battle {battle_tag}") | |
# Get the potentially updated battle object from the agent's internal list | |
battle_obj = active_agent_instance._battles.get(battle_tag) | |
if battle_obj and battle_obj.finished: | |
# Battle is finished, deactivate agent | |
print(f"[{loop_counter}] Battle {battle_tag} is FINISHED. Deactivating agent.") | |
await deactivate_current_agent(reason="battle_end") | |
await asyncio.sleep(5) # Brief cooldown between battles | |
continue | |
elif not battle_obj: | |
# Battle object missing unexpectedly, deactivate agent | |
print(f"[{loop_counter}] WARNING: Battle object for {battle_tag} MISSING from agent's list! Deactivating.") | |
await deactivate_current_agent(reason="battle_object_missing") | |
continue | |
else: | |
# Battle is ongoing and battle object exists | |
pass | |
# --- Sub-state: Checking for a new battle / Agent is idle --- | |
elif current_battle_instance is None: | |
print(f"[{loop_counter}] State 2: Checking for new battle...") | |
await check_for_new_battle() | |
# --- If a NEW battle was just detected by check_for_new_battle() --- | |
if current_battle_instance: | |
battle_tag = current_battle_instance.battle_tag | |
print(f"[{loop_counter}] State 2: *** NEW BATTLE DETECTED: {battle_tag} ***") | |
# Check if the battle ID has the non-public/suffixed format | |
parts = battle_tag.split('-') | |
is_suffixed_format = len(parts) > 3 and parts[2].isdigit() | |
if is_suffixed_format: | |
# Non-public format detected, forfeit immediately | |
print(f"[{loop_counter}] Detected non-public battle format ({battle_tag}). Forfeiting.") | |
await update_display_html(create_idle_html(f"Forfeiting non-public battle...", f"Agent {active_agent_name} detected private room.")) | |
try: | |
# Send forfeit command TO THE SPECIFIC BATTLE ROOM | |
await active_agent_instance.send_message("/forfeit", battle_tag) | |
print(f"[{loop_counter}] Sent /forfeit command for {battle_tag}.") | |
await asyncio.sleep(1) # Give a moment for forfeit to process server-side | |
except Exception as forfeit_err: | |
print(f"[{loop_counter}] ERROR sending forfeit command for {battle_tag}: {forfeit_err}") | |
# Deactivate the agent as the battle is effectively over | |
await deactivate_current_agent(reason="forfeited_private_battle") | |
continue | |
else: | |
# Public battle format detected, display the battle iframe | |
print(f"[{loop_counter}] Public battle format detected. Displaying battle.") | |
await update_display_html(create_battle_iframe(battle_tag)) | |
# --- If no new battle was found, agent remains idle --- | |
else: | |
print(f"[{loop_counter}] State 2: No new battle found. Agent '{active_agent_name}' remains idle.") | |
expected_idle_html = create_idle_html(f"Agent <strong>{active_agent_name}</strong> is ready!", | |
f"Please challenge <strong>{active_agent_name}</strong> to a <strong>{DEFAULT_BATTLE_FORMAT}</strong> battle.") | |
await update_display_html(expected_idle_html) | |
# --- Global Exception Handling for the main loop --- | |
except Exception as e: | |
print(f"ERROR in main lifecycle loop #{loop_counter}: {e}") | |
traceback.print_exc() | |
if active_agent_instance: | |
# If an error happened while an agent was active, try to deactivate it | |
await deactivate_current_agent(reason="main_loop_error") | |
else: | |
# If error happened before agent activation, show generic error | |
await update_display_html(create_error_html(f"Error in lifecycle manager: {e}")) | |
# Wait longer after a major error before trying again | |
await asyncio.sleep(10) | |
# --- Sleep at the end of each loop iteration (if no 'continue' was hit) --- | |
print(f"[{loop_counter}] Sleeping {REFRESH_INTERVAL_SECONDS}s before next check.") | |
await asyncio.sleep(REFRESH_INTERVAL_SECONDS) | |
def log_task_exception(task: asyncio.Task): | |
"""Callback to log exceptions from background tasks.""" | |
try: | |
if task.cancelled(): | |
print(f"Task {task.get_name()} was cancelled.") | |
return | |
task.result() # Raises exception if task failed | |
print(f"Task {task.get_name()} finished cleanly.") | |
except asyncio.CancelledError: | |
pass | |
except Exception as e: | |
print(f"Exception in background task {task.get_name()}: {e}") | |
traceback.print_exc() | |
# --- API Routes --- | |
async def get_homepage(): | |
"""Serves the main HTML page with WebSocket connection.""" | |
return """ | |
<!DOCTYPE html> | |
<html> | |
<head> | |
<title>Pokemon Battle Livestream</title> | |
<style> | |
body, html { | |
margin: 0; | |
padding: 0; | |
height: 100%; | |
overflow: hidden; | |
} | |
#stream-container { | |
position: fixed; | |
top: 0; | |
left: 0; | |
bottom: 0; | |
right: 0; | |
width: 100%; | |
height: 100%; | |
} | |
</style> | |
</head> | |
<body> | |
<div id="stream-container"></div> | |
<script> | |
const streamContainer = document.getElementById('stream-container'); | |
// Create WebSocket connection | |
const ws = new WebSocket(`${location.protocol === 'https:' ? 'wss' : 'ws'}://${location.host}/ws`); | |
// Connection opened | |
ws.addEventListener('open', (event) => { | |
console.log('Connected to WebSocket server'); | |
}); | |
// Listen for messages | |
ws.addEventListener('message', (event) => { | |
console.log('Received update from server'); | |
// Set inner HTML directly - complete document replacement | |
const parser = new DOMParser(); | |
const doc = parser.parseFromString(event.data, 'text/html'); | |
const bodyContent = doc.body.innerHTML; | |
// Update just the body content to avoid page refresh flicker | |
if (bodyContent) { | |
streamContainer.innerHTML = bodyContent; | |
} | |
}); | |
// Handle connection close/errors | |
ws.addEventListener('close', (event) => { | |
console.log('WebSocket connection closed'); | |
setTimeout(() => { | |
console.log('Attempting to reconnect...'); | |
location.reload(); | |
}, 3000); | |
}); | |
ws.addEventListener('error', (event) => { | |
console.error('WebSocket error:', event); | |
}); | |
</script> | |
</body> | |
</html> | |
""" | |
async def websocket_endpoint(websocket: WebSocket): | |
await manager.connect(websocket) | |
try: | |
while True: | |
# Just keep the connection open | |
# The updates are pushed from the lifecycle manager | |
await websocket.receive_text() | |
except WebSocketDisconnect: | |
manager.disconnect(websocket) | |
except Exception as e: | |
print(f"WebSocket error: {e}") | |
manager.disconnect(websocket) | |
async def startup_event(): | |
"""Start background tasks when the application starts.""" | |
global background_task_handle | |
# Mount static files directory for images | |
app.mount("/static", StaticFiles(directory="static"), name="static") | |
print("🚀 Starting background tasks") | |
background_task_handle = asyncio.create_task(manage_agent_lifecycle(), name="lifecycle_manager") | |
background_task_handle.add_done_callback(log_task_exception) | |
print("✅ Background tasks started") | |
async def shutdown_event(): | |
"""Clean up tasks when shutting down.""" | |
global background_task_handle, active_agent_instance | |
print("\nShutting down app. Cleaning up...") | |
# Cancel background task | |
if background_task_handle and not background_task_handle.done(): | |
background_task_handle.cancel() | |
try: | |
await asyncio.wait_for(background_task_handle, timeout=5.0) | |
except (asyncio.CancelledError, asyncio.TimeoutError): | |
pass | |
# Disconnect agent if active | |
if active_agent_instance: | |
try: | |
if hasattr(active_agent_instance, '_websocket') and active_agent_instance._websocket and active_agent_instance._websocket.open: | |
await active_agent_instance.disconnect() | |
except Exception as e: | |
print(f"Error during agent disconnect on shutdown: {e}") | |
print("Cleanup complete. Shutting down.") | |
# For direct script execution | |
if __name__ == "__main__": | |
import uvicorn | |
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') | |
logging.getLogger('poke_env').setLevel(logging.WARNING) | |
print("Starting Pokemon Battle Livestream Server...") | |
print("="*50) | |
if not AVAILABLE_AGENT_NAMES: | |
print("FATAL: No agents found with configured passwords. Please set environment variables:") | |
for name, cfg in AGENT_CONFIGS.items(): | |
print(f"- {cfg.get('password_env_var', 'N/A')} (for agent: {name})") | |
print("="*50) | |
exit("Exiting due to missing agent passwords.") | |
else: | |
print("Found available agents:") | |
for name in AVAILABLE_AGENT_NAMES: | |
print(f"- {name}") | |
print("="*50) | |
# Run with uvicorn | |
uvicorn.run(app, host="0.0.0.0", port=8000) |