import streamlit as st import streamlit.components.v1 as components import asyncio import websockets import uuid import os import random import time import hashlib from datetime import datetime import pytz import nest_asyncio import edge_tts from audio_recorder_streamlit import audio_recorder # Patch asyncio for nesting nest_asyncio.apply() # Page Config st.set_page_config(page_title="Galaxian Snake 3D Multiplayer", layout="wide") st.title("Galaxian Snake 3D Multiplayer") st.write("Navigate a 3D city with continuous motion, eat food, and chat in real-time!") # Sliders for container size max_width = min(1200, st.session_state.get('window_width', 1200)) max_height = min(1600, st.session_state.get('window_height', 1600)) col1, col2 = st.columns(2) with col1: container_width = st.slider("Container Width (px)", 300, max_width, 768, step=50) with col2: container_height = st.slider("Container Height (px)", 400, max_height, 1024, step=50) # Session State Initialization def init_session_state(): defaults = { 'server_running': False, 'active_connections': {}, 'chat_history': [], 'last_chat_update': 0, 'username': None, 'tts_voice': "en-US-AriaNeural", 'audio_cache': {} } for k, v in defaults.items(): if k not in st.session_state: st.session_state[k] = v init_session_state() # Usernames and Voices FUN_USERNAMES = { "CosmicJester 🌌": "en-US-AriaNeural", "PixelPanda 🐼": "en-US-JennyNeural", "QuantumQuack 🦆": "en-GB-SoniaNeural", "StellarSquirrel 🐿️": "en-AU-NatashaNeural", "GizmoGuru ⚙️": "en-CA-ClaraNeural", "NebulaNinja 🌠": "en-US-GuyNeural", } if not st.session_state.username: st.session_state.username = random.choice(list(FUN_USERNAMES.keys())) st.session_state.tts_voice = FUN_USERNAMES[st.session_state.username] # Chat File CHAT_FILE = "chat_logs/global_chat.md" os.makedirs("chat_logs", exist_ok=True) if not os.path.exists(CHAT_FILE): with open(CHAT_FILE, 'a') as f: f.write("# Multiplayer Snake Chat\n\nWelcome to the cosmic city! 🎤\n") # Audio Processing async def async_edge_tts_generate(text, voice, username): cache_key = f"{text[:100]}_{voice}" if cache_key in st.session_state['audio_cache']: return st.session_state['audio_cache'][cache_key] filename = f"audio_logs/{datetime.now().strftime('%Y%m%d_%H%M%S')}-by-{username}-{hashlib.md5(text.encode()).hexdigest()[:8]}.mp3" os.makedirs("audio_logs", exist_ok=True) communicate = edge_tts.Communicate(text, voice) await communicate.save(filename) if os.path.exists(filename) and os.path.getsize(filename) > 0: st.session_state['audio_cache'][cache_key] = filename return filename return None def play_and_download_audio(file_path): if file_path and os.path.exists(file_path): with open(file_path, "rb") as f: audio_bytes = f.read() st.audio(audio_bytes, format="audio/mp3") b64 = base64.b64encode(audio_bytes).decode() st.markdown(f'🎵 Download {os.path.basename(file_path)}', unsafe_allow_html=True) # WebSocket Handling async def websocket_handler(websocket, path): client_id = str(uuid.uuid4()) room_id = "snake_chat" if room_id not in st.session_state.active_connections: st.session_state.active_connections[room_id] = {} st.session_state.active_connections[room_id][client_id] = websocket username = st.session_state.username await broadcast_message(f"System|{username} has joined the game!", room_id) try: async for message in websocket: if '|' in message: sender, content = message.split('|', 1) await save_chat_entry(sender, content) else: await websocket.send("ERROR|Message format: username|content") except websockets.ConnectionClosed: await broadcast_message(f"System|{username} has left the game!", room_id) finally: if room_id in st.session_state.active_connections and client_id in st.session_state.active_connections[room_id]: del st.session_state.active_connections[room_id][client_id] async def broadcast_message(message, room_id): if room_id in st.session_state.active_connections: disconnected = [] for client_id, ws in st.session_state.active_connections[room_id].items(): try: await ws.send(message) except websockets.ConnectionClosed: disconnected.append(client_id) for client_id in disconnected: if client_id in st.session_state.active_connections[room_id]: del st.session_state.active_connections[room_id][client_id] async def start_websocket_server(): if not st.session_state.get('server_running', False): server = await websockets.serve(websocket_handler, '0.0.0.0', 8765) st.session_state['server_running'] = True st.session_state['server'] = server await asyncio.Future() # Chat Functions async def save_chat_entry(username, message): central = pytz.timezone('US/Central') timestamp = datetime.now(central).strftime("%Y-%m-%d %H:%M:%S") entry = f"[{timestamp}] {username}: {message}" with open(CHAT_FILE, 'a') as f: f.write(f"{entry}\n") audio_file = await async_edge_tts_generate(message, FUN_USERNAMES.get(username, "en-US-AriaNeural"), username) if audio_file: play_and_download_audio(audio_file) await broadcast_message(f"{username}|{message}", "snake_chat") st.session_state.chat_history.append(entry) st.session_state.last_chat_update = time.time() async def load_chat(): with open(CHAT_FILE, 'r') as f: content = f.read().strip() return content.split('\n') # Game HTML html_code = f"""
Controls: W/A/S/D or Arrow Keys to steer
Eat yellow cubes to grow and score!