|
import gradio as gr |
|
import asyncio |
|
import websockets |
|
import json |
|
import uuid |
|
import argparse |
|
import urllib.parse |
|
from datetime import datetime |
|
import logging |
|
import sys |
|
import os |
|
import shutil |
|
from pathlib import Path |
|
import time |
|
import random |
|
|
|
|
|
logging.basicConfig( |
|
level=logging.INFO, |
|
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', |
|
handlers=[logging.StreamHandler(sys.stdout)] |
|
) |
|
logger = logging.getLogger("chat-node") |
|
|
|
|
|
active_connections = {} |
|
chat_history = {} |
|
log_history = [] |
|
file_modification_times = {} |
|
sector_users = {} |
|
|
|
|
|
GRID_WIDTH = 10 |
|
GRID_HEIGHT = 10 |
|
|
|
|
|
HISTORY_DIR = "chat_history" |
|
LOG_DIR = "server_logs" |
|
os.makedirs(HISTORY_DIR, exist_ok=True) |
|
os.makedirs(LOG_DIR, exist_ok=True) |
|
|
|
|
|
for dir_path, content in [ |
|
(HISTORY_DIR, "# Chat History\n\nThis directory contains persistent chat history files.\n"), |
|
(LOG_DIR, "# Server Logs\n\nThis directory contains server log files.\n") |
|
]: |
|
readme_path = os.path.join(dir_path, "README.md") |
|
if not os.path.exists(readme_path): |
|
with open(readme_path, "w") as f: |
|
f.write(content) |
|
|
|
|
|
FUN_USERNAMES = [ |
|
"CosmicJester π", "PixelPanda πΌ", "QuantumQuack π¦", "StellarSquirrel πΏοΈ", |
|
"GizmoGuru βοΈ", "NebulaNinja π ", "ByteBuster πΎ", "GalacticGopher π", |
|
"RocketRaccoon π", "EchoElf π§", "PhantomFox π¦", "WittyWizard π§", |
|
"LunarLlama π", "SolarSloth βοΈ", "AstroAlpaca π¦", "CyberCoyote πΊ", |
|
"MysticMoose π¦", "GlitchGnome π§", "VortexViper π", "ChronoChimp π" |
|
] |
|
|
|
|
|
def get_node_name(): |
|
parser = argparse.ArgumentParser(description='Start a chat node with a specific name') |
|
parser.add_argument('--node-name', type=str, default=None, help='Name for this chat node') |
|
parser.add_argument('--port', type=int, default=7860, help='Port to run the Gradio interface on') |
|
args = parser.parse_args() |
|
node_name = args.node_name or f"node-{uuid.uuid4().hex[:8]}" |
|
return node_name, args.port |
|
|
|
def get_room_history_file(room_id): |
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") |
|
return os.path.join(HISTORY_DIR, f"{room_id}_{timestamp}.jsonl") |
|
|
|
def get_log_file(): |
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") |
|
return os.path.join(LOG_DIR, f"log_{timestamp}.jsonl") |
|
|
|
def get_all_room_history_files(room_id): |
|
files = [os.path.join(HISTORY_DIR, f) for f in os.listdir(HISTORY_DIR) if f.startswith(f"{room_id}_") and f.endswith(".jsonl")] |
|
files.sort(key=os.path.getmtime, reverse=True) |
|
return files |
|
|
|
def list_all_history_files(): |
|
files = [] |
|
for file in os.listdir(HISTORY_DIR): |
|
if file.endswith(".jsonl") and file != "README.md": |
|
room_id = file.split('_', 1)[0] |
|
file_path = os.path.join(HISTORY_DIR, file) |
|
mod_time = os.path.getmtime(file_path) |
|
files.append((room_id, file_path, mod_time)) |
|
files.sort(key=lambda x: x[2], reverse=True) |
|
return files |
|
|
|
def load_room_history(room_id): |
|
if room_id not in chat_history: |
|
chat_history[room_id] = [] |
|
history_files = get_all_room_history_files(room_id) |
|
for file in history_files: |
|
file_modification_times[file] = file_modification_times.get(file, os.path.getmtime(file)) |
|
try: |
|
with open(file, 'r') as f: |
|
for line in f: |
|
if line.strip(): |
|
try: |
|
chat_history[room_id].append(json.loads(line)) |
|
except json.JSONDecodeError: |
|
logger.error(f"Error parsing JSON line in {file}") |
|
except Exception as e: |
|
logger.error(f"Error loading history from {file}: {e}") |
|
chat_history[room_id].sort(key=lambda x: x.get("timestamp", "")) |
|
logger.info(f"Loaded {len(chat_history[room_id])} messages from {len(history_files)} files for room {room_id}") |
|
sector_users[room_id] = sector_users.get(room_id, set()) |
|
return chat_history[room_id] |
|
|
|
def save_message_to_history(room_id, message): |
|
history_files = get_all_room_history_files(room_id) |
|
history_file = get_room_history_file(room_id) if not history_files or os.path.getsize(history_files[0]) > 1024 * 1024 else history_files[0] |
|
try: |
|
with open(history_file, 'a') as f: |
|
f.write(json.dumps(message) + '\n') |
|
file_modification_times[history_file] = os.path.getmtime(history_file) |
|
except Exception as e: |
|
logger.error(f"Error saving message to {history_file}: {e}") |
|
|
|
def save_log_entry(entry): |
|
log_files = [os.path.join(LOG_DIR, f) for f in os.listdir(LOG_DIR) if f.startswith("log_") and f.endswith(".jsonl")] |
|
log_files.sort(key=lambda x: os.path.getmtime(x), reverse=True) |
|
log_file = get_log_file() if not log_files or os.path.getsize(log_files[0]) > 1024 * 1024 else log_files[0] |
|
try: |
|
with open(log_file, 'a') as f: |
|
f.write(json.dumps(entry) + '\n') |
|
file_modification_times[log_file] = os.path.getmtime(log_file) |
|
except Exception as e: |
|
logger.error(f"Error saving log to {log_file}: {e}") |
|
|
|
def get_sector_coordinates(room_id): |
|
try: |
|
if ',' in room_id: |
|
x, y = map(int, room_id.split(',')) |
|
return max(0, min(x, GRID_WIDTH-1)), max(0, min(y, GRID_HEIGHT-1)) |
|
except: |
|
hash_val = hash(room_id) |
|
return abs(hash_val) % GRID_WIDTH, abs(hash_val >> 8) % GRID_HEIGHT |
|
|
|
def generate_sector_map(): |
|
grid = [[' ' for _ in range(GRID_WIDTH)] for _ in range(GRID_HEIGHT)] |
|
for room_id, users in sector_users.items(): |
|
if users: |
|
x, y = get_sector_coordinates(room_id) |
|
user_count = len(users) |
|
grid[y][x] = str(min(user_count, 9)) if user_count < 10 else '+' |
|
header = ' ' + ''.join([str(i % 10) for i in range(GRID_WIDTH)]) |
|
map_str = header + '\n' + '\n'.join(f"{y % 10}|{''.join(grid[y])}|" for y in range(GRID_HEIGHT)) + '\n' + header |
|
return f"```\n{map_str}\n```\n\nLegend: Number indicates users in sector. '+' means 10+ users." |
|
|
|
async def clear_all_history(): |
|
global chat_history |
|
chat_history = {} |
|
for file in os.listdir(HISTORY_DIR): |
|
if file.endswith(".jsonl"): |
|
os.remove(os.path.join(HISTORY_DIR, file)) |
|
clear_msg = {"type": "system", "content": "π§Ή All chat history cleared", "timestamp": datetime.now().isoformat(), "sender": "system"} |
|
for room_id in list(active_connections.keys()): |
|
clear_msg["room_id"] = room_id |
|
await broadcast_message(clear_msg, room_id) |
|
logger.info("All chat history cleared") |
|
return "All chat history cleared" |
|
|
|
async def websocket_handler(websocket, path): |
|
try: |
|
room_id = path.strip('/').split('/')[0] or "default" |
|
client_id = str(uuid.uuid4()) |
|
active_connections.setdefault(room_id, {})[client_id] = websocket |
|
sector_users.setdefault(room_id, set()).add(client_id) |
|
x, y = get_sector_coordinates(room_id) |
|
room_history = load_room_history(room_id) |
|
|
|
for msg in [ |
|
{"type": "system", "content": f"Welcome to room '{room_id}' (Sector {x},{y})! Node: '{NODE_NAME}'", "timestamp": datetime.now().isoformat(), "sender": "system", "room_id": room_id}, |
|
{"type": "system", "content": f"Sector Map:\n{generate_sector_map()}", "timestamp": datetime.now().isoformat(), "sender": "system", "room_id": room_id} |
|
] + room_history: |
|
await websocket.send(json.dumps(msg)) |
|
|
|
join_msg = {"type": "system", "content": f"User joined (Sector {x},{y}) - {len(sector_users[room_id])} users", "timestamp": datetime.now().isoformat(), "sender": "system", "room_id": room_id} |
|
await broadcast_message(join_msg, room_id) |
|
save_message_to_history(room_id, join_msg) |
|
logger.info(f"Client {client_id} connected to room {room_id} (Sector {x},{y})") |
|
|
|
if room_id == "logs": |
|
for entry in log_history[-10:]: |
|
await websocket.send(json.dumps({"type": "log", "content": entry["message"], "timestamp": entry["timestamp"], "sender": "system", "room_id": "logs"})) |
|
|
|
async for message in websocket: |
|
try: |
|
data = json.loads(message) |
|
if data.get("type") == "command": |
|
if data.get("command") == "clear_history": |
|
await clear_all_history() |
|
continue |
|
if data.get("command") == "show_map": |
|
await websocket.send(json.dumps({"type": "system", "content": f"Sector Map:\n{generate_sector_map()}", "timestamp": datetime.now().isoformat(), "sender": "system", "room_id": room_id})) |
|
continue |
|
data.update({"timestamp": datetime.now().isoformat(), "sender_node": NODE_NAME, "room_id": room_id}) |
|
chat_history[room_id].append(data) |
|
if len(chat_history[room_id]) > 500: |
|
chat_history[room_id] = chat_history[room_id][-500:] |
|
save_message_to_history(room_id, data) |
|
await broadcast_message(data, room_id) |
|
except json.JSONDecodeError: |
|
await websocket.send(json.dumps({"type": "error", "content": "Invalid JSON", "timestamp": datetime.now().isoformat(), "sender": "system", "room_id": room_id})) |
|
except websockets.ConnectionClosed: |
|
logger.info(f"Client {client_id} disconnected from room {room_id}") |
|
finally: |
|
if room_id in active_connections and client_id in active_connections[room_id]: |
|
del active_connections[room_id][client_id] |
|
sector_users[room_id].discard(client_id) |
|
x, y = get_sector_coordinates(room_id) |
|
leave_msg = {"type": "system", "content": f"User left (Sector {x},{y}) - {len(sector_users[room_id])} users", "timestamp": datetime.now().isoformat(), "sender": "system", "room_id": room_id} |
|
await broadcast_message(leave_msg, room_id) |
|
save_message_to_history(room_id, leave_msg) |
|
if not active_connections[room_id]: |
|
del active_connections[room_id] |
|
|
|
async def broadcast_message(message, room_id): |
|
if room_id in active_connections: |
|
disconnected = [] |
|
for client_id, ws in active_connections[room_id].items(): |
|
try: |
|
await ws.send(json.dumps(message)) |
|
except websockets.ConnectionClosed: |
|
disconnected.append(client_id) |
|
for client_id in disconnected: |
|
del active_connections[room_id][client_id] |
|
|
|
async def start_websocket_server(host='0.0.0.0', port=8765): |
|
server = await websockets.serve(websocket_handler, host, port) |
|
logger.info(f"WebSocket server started on ws://{host}:{port}") |
|
return server |
|
|
|
main_event_loop = None |
|
message_queue = [] |
|
|
|
def send_message(message, username, room_id): |
|
if not message.strip(): |
|
return None |
|
msg_data = {"type": "chat", "content": message, "username": username, "room_id": room_id} |
|
message_queue.append(msg_data) |
|
return f"{username}: {message}" |
|
|
|
def join_room(room_id, chat_history_output): |
|
if not room_id.strip(): |
|
return "Please enter a valid room ID", chat_history_output |
|
room_id = urllib.parse.quote(room_id.strip()) |
|
history = load_room_history(room_id) |
|
formatted_history = [ |
|
f"[{datetime.fromisoformat(msg['timestamp']).strftime('%H:%M:%S')}] {msg.get('username', 'Anonymous')} [{'sender_node' in msg and msg['sender_node'] or 'unknown'}]: {msg['content']}" |
|
if msg.get("type") == "chat" else f"System: {msg['content']}" |
|
for msg in history |
|
] |
|
return f"Joined room: {room_id}", "\n".join(formatted_history) |
|
|
|
def list_available_rooms(): |
|
history_files = list_all_history_files() |
|
return "No chat rooms available yet. Join one to create it!" if not history_files else "### Available Chat Rooms\n\n" + "\n".join( |
|
f"- **{room_id}**: Last activity {datetime.fromtimestamp(mod_time).strftime('%Y-%m-%d %H:%M:%S')}" for room_id, _, mod_time in history_files |
|
) |
|
|
|
def create_gradio_interface(): |
|
with gr.Blocks(title=f"Chat Node: {NODE_NAME}") as interface: |
|
|
|
interface.css = """ |
|
#log-overlay { |
|
position: fixed; top: 10px; right: 10px; width: 300px; height: 200px; |
|
background: rgba(0, 0, 0, 0.8); color: white; padding: 10px; overflow-y: auto; |
|
border-radius: 5px; z-index: 1000; display: none; |
|
} |
|
#threejs-container { width: 100%; height: 400px; } |
|
.chat-container { margin-top: 20px; } |
|
""" |
|
interface.js = """ |
|
async () => { |
|
// Log Overlay |
|
let logOverlay = document.createElement('div'); |
|
logOverlay.id = 'log-overlay'; |
|
document.body.appendChild(logOverlay); |
|
let wsLogs = new WebSocket('ws://' + window.location.hostname + ':8765/logs'); |
|
wsLogs.onmessage = (event) => { |
|
let data = JSON.parse(event.data); |
|
if (data.type === 'log') { |
|
logOverlay.style.display = 'block'; |
|
logOverlay.innerHTML += `<p>${data.timestamp} - ${data.content}</p>`; |
|
logOverlay.scrollTop = logOverlay.scrollHeight; |
|
} |
|
}; |
|
wsLogs.onerror = () => console.error('Log WebSocket error'); |
|
wsLogs.onclose = () => console.log('Log WebSocket closed'); |
|
|
|
// Three.js Sector Map |
|
let container = document.getElementById('threejs-container'); |
|
if (!container) return; |
|
const THREE = await import('https://cdn.jsdelivr.net/npm/[email protected]/build/three.module.js'); |
|
const scene = new THREE.Scene(); |
|
const camera = new THREE.PerspectiveCamera(75, container.clientWidth / container.clientHeight, 0.1, 1000); |
|
const renderer = new THREE.WebGLRenderer(); |
|
renderer.setSize(container.clientWidth, container.clientHeight); |
|
container.appendChild(renderer.domElement); |
|
|
|
const gridSize = 10; |
|
const geometry = new THREE.BoxGeometry(0.8, 0.8, 0.8); |
|
const edges = new THREE.EdgesGeometry(geometry); |
|
for (let x = 0; x < gridSize; x++) { |
|
for (let y = 0; y < gridSize; y++) { |
|
const hasUsers = document.querySelector(`[data-room="${x},${y}"]`)?.dataset.users > 0; |
|
const material = new THREE.MeshBasicMaterial({ color: hasUsers ? 0x00ff00 : 0x333333 }); |
|
const cube = new THREE.Mesh(geometry, material); |
|
cube.position.set(x - gridSize / 2, 0, y - gridSize / 2); |
|
scene.add(cube); |
|
if (hasUsers) { |
|
const line = new THREE.LineSegments(edges, new THREE.LineBasicMaterial({ color: 0xffffff })); |
|
line.position.set(x - gridSize / 2, 0, y - gridSize / 2); |
|
scene.add(line); |
|
} |
|
} |
|
} |
|
camera.position.set(0, 10, 10); |
|
camera.lookAt(0, 0, 0); |
|
|
|
function animate() { |
|
requestAnimationFrame(animate); |
|
renderer.render(scene, camera); |
|
} |
|
animate(); |
|
} |
|
""" |
|
|
|
gr.Markdown(f"# Chat Node: {NODE_NAME}") |
|
gr.Markdown("Welcome! You've auto-joined a room with a fun username.") |
|
|
|
with gr.Row(): |
|
with gr.Column(scale=3): |
|
room_list = gr.Markdown(value=list_available_rooms()) |
|
refresh_button = gr.Button("π Refresh Room List") |
|
with gr.Column(scale=1): |
|
clear_button = gr.Button("π§Ή Clear All Chat History", variant="stop") |
|
|
|
gr.HTML('<div id="threejs-container"></div>') |
|
|
|
with gr.Row(): |
|
room_id_input = gr.Textbox(label="Room ID", placeholder="Change room (e.g., '0,0')") |
|
join_button = gr.Button("Join Room") |
|
|
|
chat_history_output = gr.Textbox(label="Chat History", lines=20, max_lines=20, elem_classes=["chat-container"]) |
|
|
|
with gr.Row(): |
|
username_input = gr.Textbox(label="Username", value=random.choice(FUN_USERNAMES)) |
|
with gr.Column(scale=3): |
|
message_input = gr.Textbox(label="Message", placeholder="Type here...", lines=3) |
|
with gr.Column(scale=1): |
|
send_button = gr.Button("Send") |
|
map_button = gr.Button("πΊοΈ Show Map") |
|
|
|
current_room_display = gr.Textbox(label="Current Room", value="Auto-joining...") |
|
|
|
|
|
refresh_button.click(list_available_rooms, [], [room_list]) |
|
clear_button.click(lambda: (message_queue.append({"type": "command", "command": "clear_history", "username": "System"}), "π§Ή Clearing..."), [], [room_list]) |
|
join_button.click(join_room, [room_id_input, chat_history_output], [current_room_display, chat_history_output]) |
|
|
|
def send_and_clear(message, username, room_id): |
|
if not room_id.startswith("Joined room:"): |
|
return "", "Please join a room first" |
|
actual_room_id = room_id.replace("Joined room: ", "").strip() |
|
formatted_msg = send_message(message, username, actual_room_id) |
|
return "", formatted_msg or "" |
|
|
|
send_button.click(send_and_clear, [message_input, username_input, current_room_display], [message_input, chat_history_output]) |
|
message_input.submit(send_and_clear, [message_input, username_input, current_room_display], [message_input, chat_history_output]) |
|
map_button.click(lambda room_id: generate_sector_map() if room_id.startswith("Joined room:") else "Join a room first", [current_room_display], [chat_history_output]) |
|
|
|
|
|
def auto_join(): |
|
random_room = f"{random.randint(0, GRID_WIDTH-1)},{random.randint(0, GRID_HEIGHT-1)}" |
|
return join_room(random_room, "") |
|
interface.load(auto_join, [], [current_room_display, chat_history_output]) |
|
|
|
return interface |
|
|
|
async def process_message_queue(): |
|
while True: |
|
if message_queue: |
|
msg_data = message_queue.pop(0) |
|
await broadcast_message(msg_data, msg_data["room_id"]) |
|
await asyncio.sleep(0.1) |
|
|
|
async def process_logs(): |
|
while True: |
|
global log_history |
|
updated = False |
|
for file in os.listdir(LOG_DIR): |
|
if file.endswith(".jsonl"): |
|
file_path = os.path.join(LOG_DIR, file) |
|
current_mtime = os.path.getmtime(file_path) |
|
if file_path not in file_modification_times or current_mtime > file_modification_times[file_path]: |
|
updated = True |
|
file_modification_times[file_path] = current_mtime |
|
if updated: |
|
log_history = [] |
|
for file in sorted([f for f in os.listdir(LOG_DIR) if f.endswith(".jsonl")], key=lambda x: os.path.getmtime(os.path.join(LOG_DIR, x))): |
|
try: |
|
with open(os.path.join(LOG_DIR, file), 'r') as f: |
|
for line in f: |
|
if line.strip(): |
|
log_history.append(json.loads(line)) |
|
except Exception as e: |
|
logger.error(f"Error loading logs from {file}: {e}") |
|
log_msg = {"type": "log", "content": "\n".join(entry["message"] for entry in log_history[-10:]), "timestamp": datetime.now().isoformat(), "sender": "system", "room_id": "logs"} |
|
await broadcast_message(log_msg, "logs") |
|
await asyncio.sleep(1) |
|
|
|
class LogBroadcastHandler(logging.Handler): |
|
def emit(self, record): |
|
entry = {"timestamp": datetime.now().isoformat(), "level": record.levelname, "message": self.format(record), "name": record.name} |
|
save_log_entry(entry) |
|
log_history.append(entry) |
|
|
|
logger.addHandler(LogBroadcastHandler()) |
|
|
|
async def main(): |
|
global NODE_NAME, main_event_loop |
|
NODE_NAME, port = get_node_name() |
|
main_event_loop = asyncio.get_running_loop() |
|
await start_websocket_server() |
|
asyncio.create_task(process_message_queue()) |
|
asyncio.create_task(process_logs()) |
|
interface = create_gradio_interface() |
|
|
|
from starlette.middleware.base import BaseHTTPMiddleware |
|
class NodeNameMiddleware(BaseHTTPMiddleware): |
|
async def dispatch(self, request, call_next): |
|
global NODE_NAME |
|
if "node_name" in dict(request.query_params): |
|
NODE_NAME = dict(request.query_params)["node_name"] |
|
logger.info(f"Node name set to {NODE_NAME} from URL") |
|
return await call_next(request) |
|
|
|
app = gr.routes.App.create_app(interface) |
|
app.add_middleware(NodeNameMiddleware) |
|
import uvicorn |
|
await uvicorn.Server(uvicorn.Config(app, host="0.0.0.0", port=port)).serve() |
|
|
|
if __name__ == "__main__": |
|
asyncio.run(main()) |