File size: 10,284 Bytes
b1fe0dd e2ffbc8 962f379 5ede71e b1fe0dd f9e683f 962f379 5ede71e 6862189 f9e683f b1fe0dd f9e683f b1fe0dd 6862189 a58b36c 5ede71e e2ffbc8 5ede71e e2ffbc8 5ede71e a58b36c 5ede71e 6862189 5ede71e 6862189 5ede71e d597cda 6862189 d597cda 5ede71e b1fe0dd 5ede71e b1fe0dd 5ede71e 962f379 5ede71e 962f379 b1fe0dd 5ede71e ca4200f 5ede71e ca4200f 5ede71e ca4200f 5ede71e ca4200f 5ede71e ca4200f 5ede71e ca4200f 5ede71e ca4200f 5ede71e ca4200f 5ede71e dfdec71 ca4200f 5ede71e ca4200f 5ede71e 6862189 ca4200f 6862189 5ede71e ca4200f 5ede71e ca4200f |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 |
import gradio as gr
import asyncio
import websockets
import uuid
import argparse
from datetime import datetime
import os
import random
import time
# Fun usernames, each ending with an emoji - the VIP list of quirky characters used as random defaults.
FUN_USERNAMES = [
"CosmicJester π", "PixelPanda πΌ", "QuantumQuack π¦", "StellarSquirrel πΏοΈ",
"GizmoGuru βοΈ", "NebulaNinja π ", "ByteBuster πΎ", "GalacticGopher π",
"RocketRaccoon π", "EchoElf π§", "PhantomFox π¦", "WittyWizard π§",
"LunarLlama π", "SolarSloth βοΈ", "AstroAlpaca π¦", "CyberCoyote πΊ",
"MysticMoose π¦", "GlitchGnome π§", "VortexViper π", "ChronoChimp π"
]
# Directory where chat logs are stored; created at import time if it is missing.
CHAT_DIR = "chat_logs"
os.makedirs(CHAT_DIR, exist_ok=True)
# Path of the single persistent file that holds the global chat history.
CHAT_FILE = os.path.join(CHAT_DIR, "global_chat.md")
# Node naming: resolve this node's display name and HTTP port from the command line.
def get_node_name():
    """Parse the command-line options and return ``(node_name, port)``.

    ``--node-name`` sets the name explicitly; when it is omitted (or empty) a
    random ``node-<8 hex chars>`` alias is generated instead. ``--port`` is the
    port for the Gradio interface and defaults to 7860.
    """
    cli = argparse.ArgumentParser(description='Start a chat node with a specific name')
    cli.add_argument('--node-name', type=str, default=None, help='Name for this chat node')
    cli.add_argument('--port', type=int, default=7860, help='Port to run the Gradio interface on')
    options = cli.parse_args()

    chosen_name = options.node_name
    if not chosen_name:
        # No explicit name given: fall back to a short random alias.
        chosen_name = f"node-{uuid.uuid4().hex[:8]}"
    return chosen_name, options.port
# Chat saver: appends one entry to the persistent chat log.
def save_chat_entry(username, message):
    """Append a timestamped ``[time] username: message`` line to the chat log.

    Args:
        username: Display name of the sender.
        message: Message text.

    Returns:
        True if the entry was written, False if writing failed (the error is
        printed rather than raised, so chatting continues best-effort).
    """
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    # The log is line-oriented (one entry per line). Collapse embedded line
    # breaks so a single message cannot forge additional entries.
    safe_username = " ".join(str(username).splitlines())
    safe_message = " ".join(str(message).splitlines())
    entry = f"[{timestamp}] {safe_username}: {safe_message}"
    try:
        # Explicit UTF-8: names carry emoji, which the locale default encoding
        # cannot represent on every platform.
        with open(CHAT_FILE, 'a', encoding='utf-8') as f:
            f.write(f"{entry}\n")
        return True
    except (OSError, UnicodeError) as e:
        print(f"Oops! Failed to save chat: {e}")
        return False
# Chat loader: reads the persistent chat log and renders it as a numbered list.
def load_chat():
    """Return the chat log as numbered lines, creating the log on first use.

    Blank lines are dropped and the remaining lines are numbered consecutively
    starting at 1 (``"1. <line>"``), joined with newlines.

    Returns:
        The numbered chat text, or an error placeholder string if the log
        cannot be created or read (the error is printed, not raised).
    """
    try:
        try:
            # Mode 'x' creates the file only when it is missing, so an existing
            # log is never truncated and there is no exists()/open() race.
            with open(CHAT_FILE, 'x', encoding='utf-8') as f:
                f.write("# Global Chat\n\nNo messages yet - start chatting! π€\n")
        except FileExistsError:
            pass
        # errors='replace' keeps the log readable even if older entries were
        # written with a different encoding.
        with open(CHAT_FILE, 'r', encoding='utf-8', errors='replace') as f:
            content = f.read()
    except OSError as e:
        print(f"Chat load hiccup: {e}")
        return "# Error loading chat\nSomething went wonky! π΅"

    # Filter blank lines BEFORE numbering so the numbers have no gaps.
    visible_lines = [line for line in content.strip().split('\n') if line.strip()]
    return "\n".join(f"{number}. {line}" for number, line in enumerate(visible_lines, start=1))
# User list grabber: extracts the distinct sender names from chat text.
def get_user_list(chat_content):
    """Return the sorted, de-duplicated usernames found in ``chat_content``.

    Each chat entry has the form ``[timestamp] username: message``, optionally
    preceded by a ``N. `` line number. The username is the text between the
    closing ``"] "`` of the timestamp and the first ``": "`` after it, so
    names containing spaces are kept whole. Lines that do not match this
    shape (headers, placeholders) are ignored.

    Args:
        chat_content: Chat text, one entry per line.

    Returns:
        A sorted list of unique usernames (empty if none are found).
    """
    users = set()
    for line in chat_content.split('\n'):
        bracket_open = line.find('[')
        if bracket_open == -1:
            continue
        bracket_close = line.find('] ', bracket_open + 1)
        if bracket_close == -1:
            continue
        # Only an optional "N." line number may precede the timestamp.
        prefix = line[:bracket_open].strip()
        if prefix and not (prefix.endswith('.') and prefix[:-1].isdigit()):
            continue
        user, separator, _message = line[bracket_close + 2:].partition(': ')
        user = user.strip()
        if separator and user:
            users.add(user)
    return sorted(users)
# Live WebSocket connections, keyed by room id and then by client id
# (entries are added by websocket_handler and pruned by it and broadcast_message).
active_connections = {}
# WebSocket handler: serves one client connection for its whole lifetime.
async def websocket_handler(websocket, path=None):
    """Register a client, relay its messages to the room, and clean up on exit.

    Each incoming message is expected as ``username|content``. Valid messages
    are saved to the chat log and broadcast to the room; anything else gets an
    ``ERROR|...`` reply on the same connection.

    Args:
        websocket: The connection object supplied by the ``websockets`` server.
        path: Request path. Optional so the handler works both with
            ``websockets`` versions that pass ``(websocket, path)`` and with
            versions that pass only ``websocket``. It is not used.
    """
    client_id = str(uuid.uuid4())
    room_id = "chat"
    active_connections.setdefault(room_id, {})[client_id] = websocket
    print(f"Client {client_id} joined the chat party!")
    try:
        async for message in websocket:
            try:
                # Plain-text protocol (no JSON): split on the first '|' only,
                # so the content itself may contain '|'.
                username, separator, content = message.partition('|')
                if not separator:
                    raise ValueError("missing '|' separator")
                save_chat_entry(username, content)
                await broadcast_message(f"{username}|{content}", room_id)
            except Exception as e:
                # Covers malformed text and binary frames (bytes.partition
                # with a str separator raises TypeError).
                print(f"Message mishap: {e}")
                await websocket.send("ERROR|Oops, bad message format! π¬")
    except websockets.ConnectionClosed:
        print(f"Client {client_id} bailed from the chat!")
    finally:
        # pop() instead of del: broadcast_message may already have removed
        # this client after a failed send.
        room = active_connections.get(room_id)
        if room is not None:
            room.pop(client_id, None)
            if not room:
                active_connections.pop(room_id, None)
# Broadcaster: sends one message to every client currently in a room.
async def broadcast_message(message, room_id):
    """Send ``message`` to every connection registered under ``room_id``.

    Clients whose connection turns out to be closed are removed from the room.
    Unknown or empty rooms are a no-op.

    Args:
        message: Text to send to each client.
        room_id: Key of the room in ``active_connections``.
    """
    room = active_connections.get(room_id)
    if not room:
        return
    disconnected = []
    # Iterate over a snapshot: each ``await`` yields to the event loop, where
    # other handlers may add or remove clients, and mutating a dict while
    # iterating it raises RuntimeError.
    for client_id, ws in list(room.items()):
        try:
            await ws.send(message)
        except websockets.ConnectionClosed:
            disconnected.append(client_id)
    for client_id in disconnected:
        # The client's own handler may have removed it already.
        room.pop(client_id, None)
# WebSocket starter: binds the chat WebSocket endpoint.
async def start_websocket_server(host='0.0.0.0', port=8765):
    """Start serving ``websocket_handler`` on ``host:port`` and return the server.

    Args:
        host: Interface to bind to (defaults to all interfaces).
        port: TCP port for the WebSocket endpoint.

    Returns:
        The server object returned by ``websockets.serve``.
    """
    listener = await websockets.serve(websocket_handler, host, port)
    endpoint = f"ws://{host}:{port}"
    print(f"WebSocket server jamming on {endpoint}")
    return listener
# Chat interface builder: assembles the Gradio UI for this node.
def create_gradio_interface(initial_username):
    """Build and return the Gradio ``Blocks`` app for the chat UI.

    The UI shows the chat history, a dropdown of known users, a name box, a
    message box and a send button. Sending appends to the shared chat log and
    refreshes the history and user list.

    The module-level ``NODE_NAME`` must be set before calling this. When
    called from inside a running event loop, a background task re-reads the
    chat log every 5 seconds into the default chat state.

    Args:
        initial_username: Name pre-filled in the name box and dropdown.

    Returns:
        The constructed ``gr.Blocks`` instance (not yet launched).
    """

    def user_choices(chat_text, current_name):
        # Always offer the current name, even before that user has posted,
        # so the dropdown's value is one of its own choices.
        names = set(get_user_list(chat_text))
        if current_name:
            names.add(current_name)
        return sorted(names)

    initial_chat = load_chat()

    with gr.Blocks(title=f"Chat Node: {NODE_NAME}", css=".chat-box { font-family: monospace; background: #1e1e1e; color: #d4d4d4; padding: 10px; border-radius: 5px; }") as demo:
        # Holds the chat text as loaded at build time / last background sync.
        chat_state = gr.State(value=initial_chat)
        gr.Markdown(f"# Chat Node: {NODE_NAME}")
        gr.Markdown("Chat live, switch names, and keep the party going! π")
        with gr.Row():
            with gr.Column(scale=3):
                chat_display = gr.Code(label="Chat History", language="markdown", lines=15, elem_classes=["chat-box"])
            with gr.Column(scale=1):
                user_list = gr.Dropdown(
                    label="Switch User",
                    choices=user_choices(initial_chat, initial_username),
                    value=initial_username,
                )
        with gr.Row():
            username_input = gr.Textbox(label="Your Name", value=initial_username)
            message_input = gr.Textbox(label="Message", placeholder="Type your epic line here! βοΈ")
            send_button = gr.Button("Send π")

        def update_chat(username, message, current_chat):
            """Save a non-blank message, then return fresh chat, user choices and a cleared box."""
            if message and message.strip():
                save_chat_entry(username, message)
            new_chat = load_chat()
            # gr.update(choices=...) changes the dropdown's options; returning
            # a bare list would be treated as the dropdown's *value*.
            return new_chat, gr.update(choices=user_choices(new_chat, username)), ""

        def switch_user(new_username, current_chat):
            """Copy the picked name into the name box and refresh the chat view."""
            latest_chat = load_chat()
            if not new_username:
                # Nothing selected: leave the name box unchanged.
                return gr.update(), latest_chat
            return new_username, latest_chat

        def refresh_view():
            """Return the current chat and user choices for the initial page load."""
            latest_chat = load_chat()
            return latest_chat, gr.update(choices=user_choices(latest_chat, initial_username))

        async def persist_chat():
            """Re-read the chat log into the default chat state every 5 seconds."""
            while True:
                await asyncio.sleep(5)
                # load_chat() reports read errors as text instead of raising,
                # so a transient failure cannot kill this loop.
                chat_state.value = load_chat()

        # No custom JS hook: the component values are passed via ``inputs``.
        send_button.click(
            fn=update_chat,
            inputs=[username_input, message_input, chat_state],
            outputs=[chat_display, user_list, message_input],
        )
        message_input.submit(
            fn=update_chat,
            inputs=[username_input, message_input, chat_state],
            outputs=[chat_display, user_list, message_input],
        )
        # The dropdown is deliberately not one of its own outputs, which
        # would re-trigger this change event.
        user_list.change(
            fn=switch_user,
            inputs=[user_list, chat_state],
            outputs=[username_input, chat_display],
        )
        # Show the latest chat when the page loads.
        demo.load(refresh_view, None, [chat_display, user_list])

    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # Built outside an event loop: skip the background sync.
        pass
    else:
        # Keep a reference; the event loop only holds weak references to
        # tasks, so an unreferenced task may be garbage-collected mid-run.
        demo._chat_persist_task = asyncio.create_task(persist_chat())
    return demo
# Main event: wires the WebSocket server and the Gradio web app together.
def _username_from_query_string(query_string):
    """Return the last non-blank, URL-decoded ``username`` value in a query string, or None."""
    from urllib.parse import parse_qs

    # parse_qs drops blank values by default and decodes %XX / '+' escapes.
    values = parse_qs(query_string).get("username", [])
    return values[-1] if values else None


async def main():
    """Start the WebSocket server and serve the Gradio chat UI until shutdown.

    Sets the module-level ``NODE_NAME`` from the command line, picks the
    initial username from the ``QUERY_STRING`` environment variable (falling
    back to a random entry of ``FUN_USERNAMES``), builds the interface, and
    runs it under uvicorn on the configured port. The WebSocket server is
    closed when the web server stops.
    """
    global NODE_NAME
    NODE_NAME, port = get_node_name()
    ws_server = await start_websocket_server()
    try:
        # Use the username from the query string if present, else a random one.
        initial_username = (
            _username_from_query_string(os.environ.get("QUERY_STRING", ""))
            or random.choice(FUN_USERNAMES)
        )
        print(f"Welcoming {initial_username} to the chat bash!")
        interface = create_gradio_interface(initial_username)

        from starlette.middleware.base import BaseHTTPMiddleware

        class UsernameMiddleware(BaseHTTPMiddleware):
            async def dispatch(self, request, call_next):
                """Record a ``?username=`` value from the request URL if it is a known fun name."""
                query_params = dict(request.query_params)
                if "username" in query_params and query_params["username"] in FUN_USERNAMES:
                    # NOTE(review): this writes a module-level global, not
                    # main()'s local variable, and the interface above has
                    # already been built - so it does not change the UI.
                    global initial_username
                    initial_username = query_params["username"]
                return await call_next(request)

        app = gr.routes.App.create_app(interface)
        app.add_middleware(UsernameMiddleware)

        import uvicorn

        await uvicorn.Server(uvicorn.Config(app, host="0.0.0.0", port=port)).serve()
    finally:
        # Release the WebSocket port once the web server stops or startup fails.
        ws_server.close()
        await ws_server.wait_closed()
# Script entry point: run main() on a fresh event loop when executed directly.
if __name__ == "__main__":
    asyncio.run(main())