awacke1's picture
Update app.py
da22973 verified
raw
history blame
10.3 kB
import gradio as gr
import asyncio
import websockets
import uuid
import argparse
from datetime import datetime
import os
import random
import time
# Fun usernames with emojis - the VIP list of quirky characters! 🎉😜
# Each entry is "<Name> <emoji>". One is chosen at random as the default
# display name, and the username middleware only accepts names from this list,
# so the strings must be stored as real emoji (not mis-decoded bytes).
FUN_USERNAMES = [
    "CosmicJester 🌌", "PixelPanda 🐼", "QuantumQuack 🦆", "StellarSquirrel 🐿️",
    "GizmoGuru ⚙️", "NebulaNinja 🌠", "ByteBuster 💾", "GalacticGopher 🌍",
    "RocketRaccoon 🚀", "EchoElf 🧝", "PhantomFox 🦊", "WittyWizard 🧙",
    "LunarLlama 🌙", "SolarSloth ☀️", "AstroAlpaca 🦙", "CyberCoyote 🐺",
    "MysticMoose 🦌", "GlitchGnome 🧚", "VortexViper 🐍", "ChronoChimp 🐒",
]
# Directory for chat logs - the secret vault where tales are stashed! 🗄️🔒
# Created at import time if it does not already exist.
CHAT_DIR = "chat_logs"
os.makedirs(CHAT_DIR, exist_ok=True)
# Persistent chat file - the grand tome of all chatter! 📖✨
# Single Markdown file inside CHAT_DIR that every chat entry is appended to.
CHAT_FILE = os.path.join(CHAT_DIR, "global_chat.md")
# Node name - the app's codename generator, sneaky and slick! 🕵️‍♂️💾
def get_node_name():
    """Read the node name and Gradio port from the command line.

    Returns:
        A ``(node_name, port)`` tuple. ``node_name`` is the ``--node-name``
        value, or ``"node-"`` plus eight random hex digits when the option is
        missing or empty. ``port`` is the ``--port`` value (default 7860).
    """
    cli = argparse.ArgumentParser(
        description='Start a chat node with a specific name',
    )
    cli.add_argument(
        '--node-name',
        type=str,
        default=None,
        help='Name for this chat node',
    )
    cli.add_argument(
        '--port',
        type=int,
        default=7860,
        help='Port to run the Gradio interface on',
    )
    options = cli.parse_args()

    node_name = options.node_name
    if not node_name:
        # No explicit name given: fall back to a short random identifier.
        node_name = f"node-{uuid.uuid4().hex[:8]}"
    return node_name, options.port
def _single_line(text):
    """Collapse *text* onto one line so it cannot spill into extra log entries."""
    return " ".join(str(text).splitlines())


# Chat saver - the scribe etching epic messages into the eternal scroll! 🖋️📜
def save_chat_entry(username, message):
    """Append one timestamped chat line to the shared chat log.

    The entry is written as ``[YYYY-MM-DD HH:MM:SS] username: message`` on a
    single line. Line breaks inside ``username`` or ``message`` are replaced
    by spaces, because the log is parsed line by line and an embedded newline
    would otherwise show up as a separate (forgeable) entry.

    Args:
        username: Display name of the sender.
        message: Text of the chat message.

    Returns:
        True if the entry was written, False if the write failed (the error
        is printed, not raised).
    """
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    entry = f"[{timestamp}] {_single_line(username)}: {_single_line(message)}"
    try:
        # Explicit UTF-8: usernames carry emoji, which the platform default
        # encoding cannot always represent.
        with open(CHAT_FILE, "a", encoding="utf-8") as f:
            f.write(f"{entry}\n")
    except (OSError, UnicodeError) as e:
        print(f"Oops! Failed to save chat: {e}")
        return False
    return True
# Chat loader - the archaeologist unearthing the chat saga! ⛏️📚
def load_chat():
    """Return the chat log as numbered lines, creating the log if needed.

    If the chat file does not exist it is first created with a short
    placeholder header. Blank lines are dropped and the remaining lines are
    numbered consecutively from 1 (``"1. ..."``, ``"2. ..."``).

    Returns:
        The numbered chat text, or a fixed error message if the file could
        not be created or read (the error is printed, not raised).
    """
    try:
        if not os.path.exists(CHAT_FILE):
            with open(CHAT_FILE, "w", encoding="utf-8") as f:
                f.write("# Global Chat\n\nNo messages yet - start chatting! 🎤\n")
        # errors="replace" keeps a log written under another encoding readable
        # instead of failing the whole load.
        with open(CHAT_FILE, "r", encoding="utf-8", errors="replace") as f:
            content = f.read()
    except (OSError, UnicodeError) as e:
        print(f"Chat load hiccup: {e}")
        return "# Error loading chat\nSomething went wonky! 😵"

    # Filter blank lines BEFORE numbering so the numbers have no gaps.
    lines = [line for line in content.strip().split("\n") if line.strip()]
    return "\n".join(f"{i}. {line}" for i, line in enumerate(lines, start=1))
# User list grabber - the social butterfly spotting all the chatters! 🦋👥
def get_user_list(chat_content):
    """Return the sorted, de-duplicated usernames found in chat text.

    A chat entry looks like ``[timestamp] username: message``, optionally
    preceded by a ``"N. "`` line number. The username is the text between the
    closing ``"] "`` of the timestamp and the first ``": "`` after it, so
    multi-word names (for example a name followed by an emoji) are kept whole.
    Lines without a bracketed timestamp, such as the header, are ignored.

    Args:
        chat_content: Chat text, one entry per line.

    Returns:
        A sorted list of unique usernames; empty if none are found.
    """
    users = set()
    for line in chat_content.split("\n"):
        stamp, bracket, rest = line.partition("] ")
        if not bracket or "[" not in stamp:
            # Not a "[timestamp] ..." entry (header, placeholder, blank line).
            continue
        user, colon, _ = rest.partition(": ")
        user = user.strip()
        if colon and user:
            users.add(user)
    return sorted(users)
# Registry of live WebSocket connections: {room_id: {client_id: websocket}}.
active_connections = {}


# WebSocket handler - the bouncer at the chat rave, keeping it hopping! 🎉🚪
async def websocket_handler(websocket, path=None):
    """Serve one WebSocket client for the lifetime of its connection.

    The client is registered in ``active_connections`` under the fixed room
    ``"chat"``. Each incoming text frame must look like ``username|content``;
    it is saved to the chat log and broadcast to the room. Anything else
    (no ``|`` separator, or a non-text frame) gets an ``ERROR|...`` reply.
    The client is always unregistered on exit, and an empty room is removed.

    Args:
        websocket: The connection object supplied by the websockets server.
        path: Request path. Optional because newer websockets releases call
            the handler with the connection only; it is not used here.
    """
    # Assigned before the try block so the finally clause can always see them.
    client_id = str(uuid.uuid4())
    room_id = "chat"
    active_connections.setdefault(room_id, {})[client_id] = websocket
    print(f"Client {client_id} joined the chat party!")
    try:
        async for message in websocket:
            try:
                username, separator, content = message.partition("|")
                if not separator:
                    raise ValueError("expected 'username|content'")
                save_chat_entry(username, content)
                await broadcast_message(f"{username}|{content}", room_id)
            except websockets.ConnectionClosed:
                # Not a bad message: let the outer handler log the departure.
                raise
            except Exception as e:
                print(f"Message mishap: {e}")
                await websocket.send("ERROR|Oops, bad message format! 😬")
    except websockets.ConnectionClosed:
        print(f"Client {client_id} bailed from the chat!")
    finally:
        room = active_connections.get(room_id)
        if room is not None:
            # pop() tolerates the broadcaster having already removed us.
            room.pop(client_id, None)
            if not room:
                active_connections.pop(room_id, None)
# Broadcaster - the megaphone blasting chat vibes to all! 📣🎶
async def broadcast_message(message, room_id):
    """Send ``message`` to every client currently registered in ``room_id``.

    Clients whose connection turns out to be closed are removed from the
    room. Unknown or empty rooms are a no-op.

    Args:
        message: Text to send to each client.
        room_id: Key of the room in ``active_connections``.
    """
    room = active_connections.get(room_id)
    if not room:
        return
    disconnected = []
    # Iterate over a snapshot: each ``await`` lets other handlers add or
    # remove clients, which would break iteration over the live dict.
    for client_id, ws in list(room.items()):
        try:
            await ws.send(message)
        except websockets.ConnectionClosed:
            disconnected.append(client_id)
    for client_id in disconnected:
        # The client's own handler may already have unregistered it.
        room.pop(client_id, None)
# WebSocket starter - the DJ spinning up the chat tunes! 🎧🔥
async def start_websocket_server(host='0.0.0.0', port=8765):
    """Start the chat WebSocket server and return its server object.

    Args:
        host: Interface to bind to.
        port: TCP port to listen on.

    Returns:
        The object returned by ``websockets.serve`` once it is listening.
    """
    ws_server = await websockets.serve(websocket_handler, host, port)
    banner = f"WebSocket server jamming on ws://{host}:{port}"
    print(banner)
    return ws_server
# Chat interface maker - the stage builder for our chat extravaganza! 🎭🏟️
def create_gradio_interface(initial_username):
    """Build the Gradio Blocks UI for the chat node.

    The page shows the chat history, a dropdown of users seen in the log,
    a name box, a message box and a send button. Sending a message appends
    it to the chat log and refreshes both the history and the dropdown
    choices. Picking a user in the dropdown copies that name into the name
    box and reloads the history from disk.

    Reads the module-level ``NODE_NAME``, which must be set before calling.

    Args:
        initial_username: Name pre-filled in the name box and the dropdown.

    Returns:
        The constructed ``gr.Blocks`` app (not yet launched).
    """
    with gr.Blocks(title=f"Chat Node: {NODE_NAME}", css=".chat-box { font-family: monospace; background: #1e1e1e; color: #d4d4d4; padding: 10px; border-radius: 5px; }") as demo:
        # Snapshot of the log at build time; seeds the State and the dropdown.
        initial_chat = load_chat()
        chat_state = gr.State(value=initial_chat)
        gr.Markdown(f"# Chat Node: {NODE_NAME}")
        gr.Markdown("Chat live, switch names, and keep the party going! 🎉")
        with gr.Row():
            with gr.Column(scale=3):
                chat_display = gr.Code(label="Chat History", language="markdown", lines=15, elem_classes=["chat-box"])
            with gr.Column(scale=1):
                user_list = gr.Dropdown(
                    label="Switch User",
                    choices=get_user_list(initial_chat),
                    value=initial_username,
                    allow_custom_value=True,  # lets the initial username stay selected
                )
        with gr.Row():
            username_input = gr.Textbox(label="Your Name", value=initial_username)
            message_input = gr.Textbox(label="Message", placeholder="Type your epic line here! ✍️")
            send_button = gr.Button("Send 🚀")

        def refresh_view():
            """Reload the log once; return it plus updated dropdown choices."""
            chat = load_chat()
            # gr.update(choices=...) changes the option list; returning a bare
            # list would instead be taken as the dropdown's selected value.
            return chat, gr.update(choices=get_user_list(chat))

        def update_chat(username, message):
            """Save a non-blank message, then refresh history and user list."""
            if message.strip():
                save_chat_entry(username, message)
            return refresh_view()

        def switch_user(new_username):
            """Copy the picked name into the name box and reload the history."""
            return new_username, load_chat()

        async def persist_chat():
            """Every 5 seconds, copy the chat file's text into ``chat_state``."""
            # NOTE(review): this only replaces the State's stored value; it
            # presumably does not push anything to already-open browser
            # sessions - confirm against the Gradio version in use.
            while True:
                await asyncio.sleep(5)
                try:
                    with open(CHAT_FILE, "r", encoding="utf-8", errors="replace") as f:
                        chat_state.value = f.read()
                except OSError as e:
                    # Keep the loop alive; one failed read must not end syncing.
                    print(f"Chat sync hiccup: {e}")

        send_button.click(
            fn=update_chat,
            inputs=[username_input, message_input],
            outputs=[chat_display, user_list],
        )
        message_input.submit(
            fn=update_chat,
            inputs=[username_input, message_input],
            outputs=[chat_display, user_list],
        )
        # The dropdown is deliberately not one of its own change outputs.
        user_list.change(
            fn=switch_user,
            inputs=[user_list],
            outputs=[username_input, chat_display],
        )
        # Show the latest chat as soon as the page loads.
        demo.load(refresh_view, None, [chat_display, user_list])

    # Start the background sync only when an event loop is running, and keep
    # a reference to the task so it is not garbage-collected mid-flight.
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        pass
    else:
        demo.persist_task = asyncio.create_task(persist_chat())
    return demo
# Main event - the ringmaster kicking off the chat circus! 🎪🤡
async def main():
    """Start the WebSocket server and serve the Gradio app until shutdown.

    Steps: read the node name and port from the command line, start the
    WebSocket server, choose the initial username (the ``username`` field of
    the ``QUERY_STRING`` environment variable if present and non-empty,
    otherwise a random entry of ``FUN_USERNAMES``), build the UI, wrap it in
    an ASGI app with the username middleware, and run it under uvicorn.
    The WebSocket server is closed when uvicorn returns.
    """
    global NODE_NAME
    # Imported locally, like the other function-scope imports below.
    from urllib.parse import parse_qs

    NODE_NAME, port = get_node_name()
    ws_server = await start_websocket_server()

    # parse_qs URL-decodes the value and stops at '&', unlike a raw split.
    query = parse_qs(os.environ.get("QUERY_STRING", ""))
    requested = query.get("username", [""])[-1]
    initial_username = requested or random.choice(FUN_USERNAMES)
    print(f"Welcoming {initial_username} to the chat bash!")
    interface = create_gradio_interface(initial_username)

    from starlette.middleware.base import BaseHTTPMiddleware

    class UsernameMiddleware(BaseHTTPMiddleware):
        """Record a valid ``?username=`` query parameter on the request."""

        async def dispatch(self, request, call_next):
            requested_name = request.query_params.get("username")
            if requested_name in FUN_USERNAMES:
                # Stored per request rather than in a shared module global,
                # so concurrent visitors cannot overwrite each other.
                # NOTE(review): nothing in this file reads this value yet -
                # the UI would have to consume it for the parameter to
                # affect what the visitor sees.
                request.state.username = requested_name
            return await call_next(request)

    app = gr.routes.App.create_app(interface)
    app.add_middleware(UsernameMiddleware)
    import uvicorn
    try:
        await uvicorn.Server(uvicorn.Config(app, host="0.0.0.0", port=port)).serve()
    finally:
        # Release the WebSocket port once the HTTP server stops.
        ws_server.close()
        await ws_server.wait_closed()


if __name__ == "__main__":
    asyncio.run(main())