|
import gradio as gr |
|
import asyncio |
|
import websockets |
|
import uuid |
|
import argparse |
|
from datetime import datetime |
|
import os |
|
import random |
|
import time |
|
|
|
|
|
FUN_USERNAMES = [ |
|
"CosmicJester π", "PixelPanda πΌ", "QuantumQuack π¦", "StellarSquirrel πΏοΈ", |
|
"GizmoGuru βοΈ", "NebulaNinja π ", "ByteBuster πΎ", "GalacticGopher π", |
|
"RocketRaccoon π", "EchoElf π§", "PhantomFox π¦", "WittyWizard π§", |
|
"LunarLlama π", "SolarSloth βοΈ", "AstroAlpaca π¦", "CyberCoyote πΊ", |
|
"MysticMoose π¦", "GlitchGnome π§", "VortexViper π", "ChronoChimp π" |
|
] |
|
|
|
|
|
CHAT_DIR = "chat_logs" |
|
os.makedirs(CHAT_DIR, exist_ok=True) |
|
|
|
|
|
CHAT_FILE = os.path.join(CHAT_DIR, "global_chat.md") |
|
|
|
|
|
def get_node_name(): |
|
"""π² Spins the wheel of fate to name our node - a random alias or user pick! π·οΈ""" |
|
parser = argparse.ArgumentParser(description='Start a chat node with a specific name') |
|
parser.add_argument('--node-name', type=str, default=None, help='Name for this chat node') |
|
parser.add_argument('--port', type=int, default=7860, help='Port to run the Gradio interface on') |
|
args = parser.parse_args() |
|
return args.node_name or f"node-{uuid.uuid4().hex[:8]}", args.port |
|
|
|
|
|
def save_chat_entry(username, message): |
|
"""ποΈ Carves a chat line into the grand Markdown tome - history in the making! ποΈ""" |
|
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") |
|
entry = f"[{timestamp}] {username}: {message}" |
|
try: |
|
with open(CHAT_FILE, 'a') as f: |
|
f.write(f"{entry}\n") |
|
return True |
|
except Exception as e: |
|
print(f"Oops! Failed to save chat: {e}") |
|
return False |
|
|
|
|
|
def load_chat(): |
|
"""π Digs up the chat treasure from the filesystem - tales of old and new! π°""" |
|
if not os.path.exists(CHAT_FILE): |
|
with open(CHAT_FILE, 'w') as f: |
|
f.write("# Global Chat\n\nNo messages yet - start chatting! π€\n") |
|
try: |
|
with open(CHAT_FILE, 'r') as f: |
|
content = f.read() |
|
lines = content.strip().split('\n') |
|
numbered_content = "\n".join(f"{i+1}. {line}" for i, line in enumerate(lines) if line.strip()) |
|
return numbered_content |
|
except Exception as e: |
|
print(f"Chat load hiccup: {e}") |
|
return "# Error loading chat\nSomething went wonky! π΅" |
|
|
|
|
|
def get_user_list(chat_content): |
|
"""π Peeks at the chat to spot all the cool cats talking - whoβs in the club? πΈ""" |
|
users = set() |
|
for line in chat_content.split('\n'): |
|
if line.strip() and ': ' in line: |
|
user = line.split(': ')[1].split(' ')[0] |
|
users.add(user) |
|
return sorted(list(users)) |
|
|
|
active_connections = {} |
|
|
|
|
|
async def websocket_handler(websocket, path): |
|
"""π§ Guards the chat gate, letting messages fly and booting crashers! π¨""" |
|
try: |
|
client_id = str(uuid.uuid4()) |
|
room_id = "chat" |
|
active_connections.setdefault(room_id, {})[client_id] = websocket |
|
print(f"Client {client_id} joined the chat party!") |
|
|
|
async for message in websocket: |
|
try: |
|
parts = message.split('|', 1) |
|
if len(parts) == 2: |
|
username, content = parts |
|
save_chat_entry(username, content) |
|
await broadcast_message(f"{username}|{content}", room_id) |
|
except Exception as e: |
|
print(f"Message mishap: {e}") |
|
await websocket.send(f"ERROR|Oops, bad message format! π¬") |
|
except websockets.ConnectionClosed: |
|
print(f"Client {client_id} bailed from the chat!") |
|
finally: |
|
if room_id in active_connections and client_id in active_connections[room_id]: |
|
del active_connections[room_id][client_id] |
|
if not active_connections[room_id]: |
|
del active_connections[room_id] |
|
|
|
|
|
async def broadcast_message(message, room_id): |
|
"""π’ Shouts the latest chat beat to every dancer in the room - hear it loud! π΅""" |
|
if room_id in active_connections: |
|
disconnected = [] |
|
for client_id, ws in active_connections[room_id].items(): |
|
try: |
|
await ws.send(message) |
|
except websockets.ConnectionClosed: |
|
disconnected.append(client_id) |
|
for client_id in disconnected: |
|
del active_connections[room_id][client_id] |
|
|
|
|
|
async def start_websocket_server(host='0.0.0.0', port=8765): |
|
"""π Cranks up the WebSocket jukebox, ready to rock the chat scene! πΈ""" |
|
server = await websockets.serve(websocket_handler, host, port) |
|
print(f"WebSocket server jamming on ws://{host}:{port}") |
|
return server |
|
|
|
|
|
def create_gradio_interface(initial_username): |
|
"""ποΈ Sets up the chat stage with live updates and name-switching flair! π""" |
|
with gr.Blocks(title=f"Chat Node: {NODE_NAME}", css=".chat-box { font-family: monospace; background: #1e1e1e; color: #d4d4d4; padding: 10px; border-radius: 5px; }") as demo: |
|
|
|
chat_state = gr.State(value=load_chat()) |
|
|
|
gr.Markdown(f"# Chat Node: {NODE_NAME}") |
|
gr.Markdown("Chat live, switch names, and keep the party going! π") |
|
|
|
with gr.Row(): |
|
with gr.Column(scale=3): |
|
chat_display = gr.Code(label="Chat History", language="markdown", lines=15, elem_classes=["chat-box"]) |
|
with gr.Column(scale=1): |
|
user_list = gr.Dropdown( |
|
label="Switch User", |
|
choices=get_user_list(chat_state.value), |
|
value=initial_username, |
|
allow_custom_value=True |
|
) |
|
|
|
with gr.Row(): |
|
username_input = gr.Textbox(label="Your Name", value=initial_username) |
|
message_input = gr.Textbox(label="Message", placeholder="Type your epic line here! βοΈ") |
|
send_button = gr.Button("Send π") |
|
|
|
|
|
def update_chat(username, message, current_chat): |
|
"""π© Conjures a fresh chat view with your latest quip - abracadabra! π""" |
|
if message.strip(): |
|
save_chat_entry(username, message) |
|
new_chat = load_chat() |
|
return new_chat, get_user_list(new_chat) |
|
|
|
|
|
def switch_user(new_username, current_chat): |
|
"""π¦ Transforms you into a new chat hero - new name, same game! π""" |
|
return new_username, current_chat, get_user_list(current_chat) |
|
|
|
|
|
async def persist_chat(): |
|
"""β³ Keeps the chat tome in sync, scribbling updates every few ticks! π""" |
|
while True: |
|
await asyncio.sleep(5) |
|
with open(CHAT_FILE, 'r') as f: |
|
current = f.read() |
|
chat_state.value = current |
|
|
|
|
|
send_button.click( |
|
fn=update_chat, |
|
inputs=[username_input, message_input, chat_state], |
|
outputs=[chat_display, user_list] |
|
) |
|
message_input.submit( |
|
fn=update_chat, |
|
inputs=[username_input, message_input, chat_state], |
|
outputs=[chat_display, user_list] |
|
) |
|
user_list.change( |
|
fn=switch_user, |
|
inputs=[user_list, chat_state], |
|
outputs=[username_input, chat_display, user_list] |
|
) |
|
|
|
|
|
demo.load(lambda: (load_chat(), get_user_list(load_chat())), None, [chat_display, user_list]) |
|
|
|
|
|
asyncio.create_task(persist_chat()) |
|
|
|
return demo |
|
|
|
|
|
async def main(): |
|
"""π€ Drops the mic and starts the chat party - itβs showtime, folks! π""" |
|
global NODE_NAME |
|
NODE_NAME, port = get_node_name() |
|
await start_websocket_server() |
|
|
|
|
|
initial_username = os.environ.get("QUERY_STRING", "").split("username=")[-1] if "username=" in os.environ.get("QUERY_STRING", "") else random.choice(FUN_USERNAMES) |
|
print(f"Welcoming {initial_username} to the chat bash!") |
|
|
|
interface = create_gradio_interface(initial_username) |
|
|
|
|
|
from starlette.middleware.base import BaseHTTPMiddleware |
|
class UsernameMiddleware(BaseHTTPMiddleware): |
|
async def dispatch(self, request, call_next): |
|
"""π Snags your name from the URL VIP list - exclusive entry! π©""" |
|
query_params = dict(request.query_params) |
|
if "username" in query_params and query_params["username"] in FUN_USERNAMES: |
|
global initial_username |
|
initial_username = query_params["username"] |
|
return await call_next(request) |
|
|
|
app = gr.routes.App.create_app(interface) |
|
app.add_middleware(UsernameMiddleware) |
|
|
|
import uvicorn |
|
await uvicorn.Server(uvicorn.Config(app, host="0.0.0.0", port=port)).serve() |
|
|
|
if __name__ == "__main__": |
|
asyncio.run(main()) |