import argparse
import asyncio
import os
import random
import time
import uuid
from datetime import datetime

import gradio as gr
import websockets

# Fun usernames with emojis - the pool of quirky aliases handed to new chatters.
FUN_USERNAMES = [
    "CosmicJester 🌌",
    "PixelPanda 🐼",
    "QuantumQuack 🦆",
    "StellarSquirrel 🐿️",
    "GizmoGuru ⚙️",
    "NebulaNinja 🌠",
    "ByteBuster 💾",
    "GalacticGopher 🌍",
    "RocketRaccoon 🚀",
    "EchoElf 🧝",
    "PhantomFox 🦊",
    "WittyWizard 🧙",
    "LunarLlama 🌙",
    "SolarSloth ☀️",
    "AstroAlpaca 🦙",
    "CyberCoyote 🐺",
    "MysticMoose 🦌",
    "GlitchGnome 🧚",
    "VortexViper 🐍",
    "ChronoChimp 🐒",
]

# Directory for chat logs. Created at import time so the first write cannot
# fail on a missing folder.
CHAT_DIR = "chat_logs"
os.makedirs(CHAT_DIR, exist_ok=True)

# Persistent chat file - the single Markdown log every message is appended to.
CHAT_FILE = os.path.join(CHAT_DIR, "global_chat.md")


# Node name - resolves this node's name and UI port from the command line.
def get_node_name():
    """Parse the command line and return ``(node_name, port)``.

    ``--node-name`` falls back to ``node-<8 hex chars>`` built from a random
    UUID; ``--port`` defaults to 7860.

    Returns:
        tuple: the node name (str) and the Gradio port (int).
    """
    parser = argparse.ArgumentParser(description='Start a chat node with a specific name')
    parser.add_argument('--node-name', type=str, default=None, help='Name for this chat node')
    parser.add_argument('--port', type=int, default=7860, help='Port to run the Gradio interface on')
    args = parser.parse_args()
    return args.node_name or f"node-{uuid.uuid4().hex[:8]}", args.port


# Chat saver - appends one message to the persistent log.
def save_chat_entry(username, message):
    """Append ``[timestamp] username: message`` as one line of CHAT_FILE.

    Args:
        username: display name of the sender.
        message: message text.

    Returns:
        bool: True when the line was written, False when writing failed
        (the failure is printed, never raised).
    """
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    # The log is line-oriented (one entry per line), so embedded line breaks
    # are flattened to spaces; otherwise one message could split into, or
    # impersonate, several entries.
    flat_user = " ".join(str(username).splitlines())
    flat_message = " ".join(str(message).splitlines())
    entry = f"[{timestamp}] {flat_user}: {flat_message}"
    try:
        # Explicit UTF-8: the usernames contain emoji, which the platform
        # default encoding cannot always represent.
        with open(CHAT_FILE, 'a', encoding='utf-8') as f:
            f.write(f"{entry}\n")
    except (OSError, UnicodeError) as e:
        print(f"Oops! Failed to save chat: {e}")
        return False
    return True


# Chat loader - the archaeologist unearthing the chat saga!
def load_chat():
    """Return the chat log as numbered lines, creating the log if needed.

    A missing CHAT_FILE is first created with a placeholder header. Blank
    lines are dropped and the remaining lines are numbered 1, 2, 3, ...

    Returns:
        str: the numbered log, or a short Markdown error text when the file
        cannot be created or read.
    """
    try:
        if not os.path.exists(CHAT_FILE):
            with open(CHAT_FILE, 'w', encoding='utf-8') as f:
                f.write("# Global Chat\n\nNo messages yet - start chatting! 🎤\n")
        # errors='replace' keeps a log written under another encoding
        # readable instead of failing the whole load.
        with open(CHAT_FILE, 'r', encoding='utf-8', errors='replace') as f:
            content = f.read()
    except OSError as e:
        print(f"Chat load hiccup: {e}")
        return "# Error loading chat\nSomething went wonky! 😵"
    # Filter first, then number, so the numbering has no gaps where blank
    # lines were skipped.
    visible = [line for line in content.split('\n') if line.strip()]
    return "\n".join(f"{n}. {line}" for n, line in enumerate(visible, start=1))


# User list grabber - collects the names of everyone who has posted.
def get_user_list(chat_content):
    """Return the sorted, de-duplicated usernames found in a chat log.

    Entries have the form ``[timestamp] username: message`` and may carry the
    ``N. `` prefix added by load_chat. Lines of any other shape (headers,
    placeholders) are ignored.

    Args:
        chat_content: chat log text, raw or numbered.

    Returns:
        list[str]: usernames in sorted order.
    """
    users = set()
    for line in chat_content.split('\n'):
        head, closing, rest = line.partition('] ')
        # After dropping an optional "N. " prefix the line must open with the
        # bracketed timestamp, otherwise it is not a chat entry.
        if not closing or not head.lstrip('0123456789. ').startswith('['):
            continue
        # The username is everything up to the first ': ' (it may contain
        # spaces, e.g. a name followed by an emoji).
        user, separator, _ = rest.partition(': ')
        user = user.strip()
        if separator and user:
            users.add(user)
    return sorted(users)


# Registry of live sockets: room id -> {client id -> websocket}.
active_connections = {}


# WebSocket handler - registers a client and relays its messages.
async def websocket_handler(websocket, path=None):
    """Serve one WebSocket client until it disconnects.

    Each text frame must look like ``username|content``; it is saved to the
    chat log and broadcast to the room. Anything else is answered with an
    ``ERROR|...`` frame.

    Args:
        websocket: the connection object supplied by the ``websockets`` server.
        path: request path. Optional because current ``websockets`` releases
            call the handler with the connection only.
    """
    client_id = str(uuid.uuid4())
    room_id = "chat"
    active_connections.setdefault(room_id, {})[client_id] = websocket
    print(f"Client {client_id} joined the chat party!")
    try:
        async for message in websocket:
            try:
                parts = message.split('|', 1)
                if len(parts) != 2:
                    raise ValueError("expected 'username|content'")
                username, content = parts
                save_chat_entry(username, content)
                await broadcast_message(f"{username}|{content}", room_id)
            except Exception as e:
                # Per-message boundary: one bad frame must not end the
                # connection, so report it and keep listening.
                print(f"Message mishap: {e}")
                await websocket.send("ERROR|Oops, bad message format! 😬")
    except websockets.ConnectionClosed:
        print(f"Client {client_id} bailed from the chat!")
    finally:
        room = active_connections.get(room_id)
        if room is not None:
            room.pop(client_id, None)
            # Drop the room once its last client has left.
            if not room:
                active_connections.pop(room_id, None)


# Broadcaster - the megaphone blasting chat vibes to all!
# Strong references to background tasks; the event loop itself only keeps
# weak references, so an unreferenced task may be garbage-collected mid-run.
_BACKGROUND_TASKS = set()


async def broadcast_message(message, room_id):
    """Send ``message`` to every client currently registered in ``room_id``.

    Clients whose connection turns out to be closed are removed from the
    room. An unknown or empty room is a no-op.

    Args:
        message: text frame to send.
        room_id: key into ``active_connections``.
    """
    room = active_connections.get(room_id)
    if not room:
        return
    disconnected = []
    # Iterate over a snapshot: clients can join or leave while a send is
    # awaited, and mutating a dict during iteration raises RuntimeError.
    for client_id, ws in list(room.items()):
        try:
            await ws.send(message)
        except websockets.ConnectionClosed:
            disconnected.append(client_id)
    for client_id in disconnected:
        # pop() because the client's own handler may already have removed it.
        room.pop(client_id, None)


# WebSocket starter - brings up the server that relays chat frames.
async def start_websocket_server(host='0.0.0.0', port=8765):
    """Start the WebSocket server and return its server object.

    Args:
        host: interface to bind.
        port: TCP port to listen on.
    """
    server = await websockets.serve(websocket_handler, host, port)
    print(f"WebSocket server jamming on ws://{host}:{port}")
    return server


# Chat interface maker - builds the Gradio UI for this node.
def create_gradio_interface(initial_username):
    """Build the Gradio Blocks app for the chat.

    Args:
        initial_username: name pre-filled for a visitor whose URL carries no
            accepted ``?username=`` parameter.

    Returns:
        The ``gr.Blocks`` instance (not launched).
    """
    # NODE_NAME is assigned by main(); fall back to a placeholder so the
    # interface can also be built without going through main().
    node_name = globals().get("NODE_NAME", "unnamed-node")
    with gr.Blocks(title=f"Chat Node: {node_name}", css=".chat-box { font-family: monospace; background: #1e1e1e; color: #d4d4d4; padding: 10px; border-radius: 5px; }") as demo:
        # Snapshot of the chat log, refreshed in the background.
        chat_state = gr.State(value=load_chat())

        gr.Markdown(f"# Chat Node: {node_name}")
        gr.Markdown("Chat live, switch names, and keep the party going! 🎉")

        with gr.Row():
            with gr.Column(scale=3):
                chat_display = gr.Code(label="Chat History", language="markdown", lines=15, elem_classes=["chat-box"])
            with gr.Column(scale=1):
                user_list = gr.Dropdown(
                    label="Switch User",
                    choices=get_user_list(chat_state.value),
                    value=initial_username,
                    allow_custom_value=True,  # the initial name may not be in the log yet
                )

        with gr.Row():
            username_input = gr.Textbox(label="Your Name", value=initial_username)
            message_input = gr.Textbox(label="Message", placeholder="Type your epic line here! ✍️")
            send_button = gr.Button("Send 🚀")

        # Chat updater - saves a non-blank message and refreshes the view.
        def update_chat(username, message, current_chat):
            """Save ``message`` (if not blank) and return the refreshed view."""
            if message.strip():
                save_chat_entry(username, message)
            new_chat = load_chat()
            # Update the dropdown's *choices*; returning a bare list would be
            # taken as the dropdown's new value.
            return new_chat, gr.update(choices=get_user_list(new_chat))

        # Name switcher - copies the dropdown selection into the name box.
        def switch_user(new_username, current_chat):
            """Adopt the selected name and redraw the history from the log."""
            # Reload from the log rather than echoing the stale state
            # snapshot. The dropdown is deliberately not an output here: it
            # triggered this event.
            return new_username, load_chat()

        # First paint - latest chat plus the optional ?username= URL parameter.
        def initial_view(request: gr.Request):
            """Return the initial chat, dropdown and name for one page load."""
            chat = load_chat()
            requested = dict(request.query_params).get("username") if request else None
            # Only names from the predefined list are accepted from the URL.
            name = requested if requested in FUN_USERNAMES else initial_username
            return chat, gr.update(choices=get_user_list(chat), value=name), name

        # Live chat persistence - keeps the snapshot in sync with the log.
        async def persist_chat():
            """Refresh ``chat_state`` from the log every five seconds."""
            while True:
                await asyncio.sleep(5)
                # load_chat() reports I/O errors itself, so this loop cannot
                # die on a missing or unreadable file.
                chat_state.value = load_chat()

        # Hook up the events.
        send_button.click(
            fn=update_chat,
            inputs=[username_input, message_input, chat_state],
            outputs=[chat_display, user_list],
        )
        message_input.submit(
            fn=update_chat,
            inputs=[username_input, message_input, chat_state],
            outputs=[chat_display, user_list],
        )
        user_list.change(
            fn=switch_user,
            inputs=[user_list, chat_state],
            outputs=[username_input, chat_display],
        )

        demo.load(initial_view, None, [chat_display, user_list, username_input])

        # Start the refresher only when an event loop is running (it is when
        # called from main()); keep a reference so the task stays alive.
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            loop = None
        if loop is not None:
            task = loop.create_task(persist_chat())
            _BACKGROUND_TASKS.add(task)
            task.add_done_callback(_BACKGROUND_TASKS.discard)

    return demo


# Main event - wires the node together and serves it.
async def main():
    """Start the WebSocket server and serve the Gradio app with uvicorn.

    The node name and port come from the command line. The default username
    is taken from a CGI-style ``QUERY_STRING`` environment variable when it
    carries a non-empty ``username`` parameter, otherwise chosen at random.
    """
    global NODE_NAME
    NODE_NAME, port = get_node_name()
    ws_server = await start_websocket_server()

    # Proper query-string parsing: stops at '&', decodes percent-escapes and
    # ignores an empty value (all of which plain string splitting got wrong).
    from urllib.parse import parse_qs
    requested = parse_qs(os.environ.get("QUERY_STRING", "")).get("username", [""])[-1]
    initial_username = requested or random.choice(FUN_USERNAMES)
    print(f"Welcoming {initial_username} to the chat bash!")

    # The per-request ?username= parameter is handled by the interface's load
    # handler; a middleware cannot reach the already-built components.
    interface = create_gradio_interface(initial_username)
    app = gr.routes.App.create_app(interface)

    import uvicorn
    try:
        await uvicorn.Server(uvicorn.Config(app, host="0.0.0.0", port=port)).serve()
    finally:
        # Release the WebSocket port once the HTTP server stops.
        ws_server.close()
        await ws_server.wait_closed()


if __name__ == "__main__":
    asyncio.run(main())