awacke1 commited on
Commit
e2ffbc8
·
verified ·
1 Parent(s): a58b36c

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +145 -205
app.py CHANGED
@@ -8,6 +8,10 @@ import urllib.parse
8
  from datetime import datetime
9
  import logging
10
  import sys
 
 
 
 
11
 
12
  # Configure logging
13
  logging.basicConfig(
@@ -21,6 +25,8 @@ logger = logging.getLogger("chat-node")
21
  active_connections = {}
22
  # Dictionary to store message history for each chat room (in-memory cache)
23
  chat_history = {}
 
 
24
 
25
  # Dictionary to track file modification times
26
  file_modification_times = {}
@@ -32,22 +38,23 @@ sector_users = {}
32
  GRID_WIDTH = 10
33
  GRID_HEIGHT = 10
34
 
35
- # Directory to store persistent chat history
36
  HISTORY_DIR = "chat_history"
37
- import os
38
- import shutil
39
- from pathlib import Path
40
- import time
41
-
42
- # Create history directory if it doesn't exist
43
  os.makedirs(HISTORY_DIR, exist_ok=True)
 
44
 
45
- # README.md file that won't be listed or deleted
46
  README_PATH = os.path.join(HISTORY_DIR, "README.md")
47
  if not os.path.exists(README_PATH):
48
  with open(README_PATH, "w") as f:
49
  f.write("# Chat History\n\nThis directory contains persistent chat history files.\n")
50
 
 
 
 
 
 
51
  # Get node name from URL or command line
52
  def get_node_name():
53
  parser = argparse.ArgumentParser(description='Start a chat node with a specific name')
@@ -58,49 +65,57 @@ def get_node_name():
58
  node_name = args.node_name
59
  port = args.port
60
 
61
- # If no node name specified, generate a random one
62
  if not node_name:
63
  node_name = f"node-{uuid.uuid4().hex[:8]}"
64
 
65
  return node_name, port
66
 
67
  def get_room_history_file(room_id):
68
- """Get the filename for a room's history."""
69
- # Create timestamp-based log files
70
  timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
71
  return os.path.join(HISTORY_DIR, f"{room_id}_{timestamp}.jsonl")
72
 
 
 
 
 
73
  def get_all_room_history_files(room_id):
74
- """Get all history files for a specific room."""
75
  files = []
76
  for file in os.listdir(HISTORY_DIR):
77
  if file.startswith(f"{room_id}_") and file.endswith(".jsonl"):
78
  files.append(os.path.join(HISTORY_DIR, file))
79
- # Sort by modification time (newest first)
80
  files.sort(key=lambda x: os.path.getmtime(x), reverse=True)
81
  return files
82
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
83
  def load_room_history(room_id):
84
- """Load chat history for a room from all persistent storage files."""
85
  if room_id not in chat_history:
86
  chat_history[room_id] = []
87
-
88
- # Get all history files for this room
89
  history_files = get_all_room_history_files(room_id)
90
 
91
- # Track file modification times
92
  for file in history_files:
93
  if file not in file_modification_times:
94
  file_modification_times[file] = os.path.getmtime(file)
95
 
96
- # Load messages from all files
97
  messages = []
98
  for history_file in history_files:
99
  try:
100
  with open(history_file, 'r') as f:
101
  for line in f:
102
  line = line.strip()
103
- if line: # Skip empty lines
104
  try:
105
  data = json.loads(line)
106
  messages.append(data)
@@ -109,28 +124,20 @@ def load_room_history(room_id):
109
  except Exception as e:
110
  logger.error(f"Error loading history from {history_file}: {e}")
111
 
112
- # Sort by timestamp
113
  messages.sort(key=lambda x: x.get("timestamp", ""), reverse=False)
114
  chat_history[room_id] = messages
115
-
116
  logger.info(f"Loaded {len(messages)} messages from {len(history_files)} files for room {room_id}")
117
 
118
- # Track users in this sector
119
  if room_id not in sector_users:
120
  sector_users[room_id] = set()
121
 
122
  return chat_history[room_id]
123
 
124
  def save_message_to_history(room_id, message):
125
- """Save a single message to the newest history file for a room."""
126
- # Get the newest history file or create a new one
127
  history_files = get_all_room_history_files(room_id)
128
-
129
  if not history_files:
130
- # Create a new file
131
  history_file = get_room_history_file(room_id)
132
  else:
133
- # Use the newest file if it's less than 1 MB, otherwise create a new one
134
  newest_file = history_files[0]
135
  if os.path.getsize(newest_file) > 1024 * 1024: # 1 MB
136
  history_file = get_room_history_file(room_id)
@@ -138,111 +145,132 @@ def save_message_to_history(room_id, message):
138
  history_file = newest_file
139
 
140
  try:
141
- # Append the message as a single line of JSON
142
  with open(history_file, 'a') as f:
143
  f.write(json.dumps(message) + '\n')
144
-
145
- # Update modification time
146
  file_modification_times[history_file] = os.path.getmtime(history_file)
147
-
148
  logger.debug(f"Saved message to {history_file}")
149
  except Exception as e:
150
  logger.error(f"Error saving message to {history_file}: {e}")
151
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
152
  def check_for_new_messages():
153
- """Check for new messages in all history files."""
154
  updated_rooms = set()
155
-
156
- # Check all files in the history directory
157
  for file in os.listdir(HISTORY_DIR):
158
  if file.endswith(".jsonl"):
159
  file_path = os.path.join(HISTORY_DIR, file)
160
  current_mtime = os.path.getmtime(file_path)
161
-
162
- # Check if this file is new or has been modified
163
  if file_path not in file_modification_times or current_mtime > file_modification_times[file_path]:
164
- # Extract room_id from filename
165
  parts = file.split('_', 1)
166
  if len(parts) > 0:
167
  room_id = parts[0]
168
  updated_rooms.add(room_id)
169
-
170
- # Update tracked modification time
171
  file_modification_times[file_path] = current_mtime
172
 
173
- # Reload history for updated rooms
174
  for room_id in updated_rooms:
175
  if room_id in chat_history:
176
- # Remember we had this room loaded
177
  old_history_len = len(chat_history[room_id])
178
- # Clear and reload
179
  chat_history[room_id] = []
180
  load_room_history(room_id)
181
  new_history_len = len(chat_history[room_id])
182
-
183
  if new_history_len > old_history_len:
184
  logger.info(f"Found {new_history_len - old_history_len} new messages for room {room_id}")
185
 
186
  return updated_rooms
187
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
188
  def get_sector_coordinates(room_id):
189
- """Convert a room ID to grid coordinates, or assign new ones."""
190
  try:
191
- # Try to parse room ID as "x,y"
192
  if ',' in room_id:
193
  x, y = map(int, room_id.split(','))
194
  return max(0, min(x, GRID_WIDTH-1)), max(0, min(y, GRID_HEIGHT-1))
195
  except:
196
  pass
197
-
198
- # Hash the room_id string to get stable coordinates
199
  hash_val = hash(room_id)
200
  x = abs(hash_val) % GRID_WIDTH
201
  y = abs(hash_val >> 8) % GRID_HEIGHT
202
-
203
  return x, y
204
 
205
  def generate_sector_map():
206
- """Generate an ASCII representation of the sector map."""
207
- # Initialize empty grid
208
  grid = [[' ' for _ in range(GRID_WIDTH)] for _ in range(GRID_HEIGHT)]
209
-
210
- # Place active rooms with user counts
211
  for room_id, users in sector_users.items():
212
- if users: # Only show rooms with users
213
  x, y = get_sector_coordinates(room_id)
214
  user_count = len(users)
215
  grid[y][x] = str(min(user_count, 9)) if user_count < 10 else '+'
216
 
217
- # Create ASCII representation
218
  header = ' ' + ''.join([str(i % 10) for i in range(GRID_WIDTH)])
219
  map_str = header + '\n'
220
-
221
  for y in range(GRID_HEIGHT):
222
- row = f"{y % 10}|"
223
- for x in range(GRID_WIDTH):
224
- row += grid[y][x]
225
- row += '|'
226
  map_str += row + '\n'
227
-
228
- footer = ' ' + ''.join([str(i % 10) for i in range(GRID_WIDTH)])
229
- map_str += footer
230
-
231
  return f"```\n{map_str}\n```\n\nLegend: Number indicates users in sector. '+' means 10+ users."
232
 
233
  async def clear_all_history():
234
- """Clear all chat history for all rooms."""
235
  global chat_history
236
-
237
- # Clear in-memory history
238
  chat_history = {}
239
-
240
- # Delete all history files except README.md
241
  for file in os.listdir(HISTORY_DIR):
242
- if file.endswith(".md") and file != "README.md":
243
  os.remove(os.path.join(HISTORY_DIR, file))
244
 
245
- # Broadcast clear message to all rooms
246
  clear_msg = {
247
  "type": "system",
248
  "content": "🧹 All chat history has been cleared by a user",
@@ -258,31 +286,22 @@ async def clear_all_history():
258
  return "All chat history cleared"
259
 
260
  async def websocket_handler(websocket, path):
261
- """Handle WebSocket connections."""
262
  try:
263
- # Extract room_id from path if present
264
  path_parts = path.strip('/').split('/')
265
  room_id = path_parts[0] if path_parts else "default"
266
 
267
- # Register the new client
268
  client_id = str(uuid.uuid4())
269
  if room_id not in active_connections:
270
  active_connections[room_id] = {}
271
-
272
  active_connections[room_id][client_id] = websocket
273
 
274
- # Add user to sector map
275
  if room_id not in sector_users:
276
  sector_users[room_id] = set()
277
  sector_users[room_id].add(client_id)
278
 
279
- # Get sector coordinates
280
  x, y = get_sector_coordinates(room_id)
281
-
282
- # Load or initialize chat history
283
  room_history = load_room_history(room_id)
284
 
285
- # Send welcome message
286
  welcome_msg = {
287
  "type": "system",
288
  "content": f"Welcome to room '{room_id}' (Sector {x},{y})! Connected from node '{NODE_NAME}'",
@@ -292,7 +311,6 @@ async def websocket_handler(websocket, path):
292
  }
293
  await websocket.send(json.dumps(welcome_msg))
294
 
295
- # Send sector map
296
  map_msg = {
297
  "type": "system",
298
  "content": f"Sector Map:\n{generate_sector_map()}",
@@ -302,11 +320,9 @@ async def websocket_handler(websocket, path):
302
  }
303
  await websocket.send(json.dumps(map_msg))
304
 
305
- # Send chat history
306
  for msg in room_history:
307
  await websocket.send(json.dumps(msg))
308
 
309
- # Broadcast join notification
310
  join_msg = {
311
  "type": "system",
312
  "content": f"User joined the room (Sector {x},{y}) - {len(sector_users[room_id])} users now present",
@@ -319,17 +335,25 @@ async def websocket_handler(websocket, path):
319
 
320
  logger.info(f"New client {client_id} connected to room {room_id} (Sector {x},{y})")
321
 
322
- # Handle messages from this client
 
 
 
 
 
 
 
 
 
 
323
  async for message in websocket:
324
  try:
325
  data = json.loads(message)
326
 
327
- # Check for clear command
328
  if data.get("type") == "command" and data.get("command") == "clear_history":
329
  result = await clear_all_history()
330
  continue
331
 
332
- # Check for map request
333
  if data.get("type") == "command" and data.get("command") == "show_map":
334
  map_msg = {
335
  "type": "system",
@@ -341,20 +365,15 @@ async def websocket_handler(websocket, path):
341
  await websocket.send(json.dumps(map_msg))
342
  continue
343
 
344
- # Add metadata to the message
345
  data["timestamp"] = datetime.now().isoformat()
346
  data["sender_node"] = NODE_NAME
347
  data["room_id"] = room_id
348
 
349
- # Store in history
350
  chat_history[room_id].append(data)
351
- if len(chat_history[room_id]) > 500: # Increased limit to 500 messages
352
  chat_history[room_id] = chat_history[room_id][-500:]
353
 
354
- # Save to persistent storage
355
  save_message_to_history(room_id, data)
356
-
357
- # Broadcast to all clients in the room
358
  await broadcast_message(data, room_id)
359
 
360
  except json.JSONDecodeError:
@@ -370,18 +389,12 @@ async def websocket_handler(websocket, path):
370
  except websockets.exceptions.ConnectionClosed:
371
  logger.info(f"Client {client_id} disconnected from room {room_id}")
372
  finally:
373
- # Remove the client when disconnected
374
  if room_id in active_connections and client_id in active_connections[room_id]:
375
  del active_connections[room_id][client_id]
376
-
377
- # Remove user from sector map
378
  if room_id in sector_users and client_id in sector_users[room_id]:
379
  sector_users[room_id].remove(client_id)
380
 
381
- # Get sector coordinates
382
  x, y = get_sector_coordinates(room_id)
383
-
384
- # Broadcast leave notification
385
  leave_msg = {
386
  "type": "system",
387
  "content": f"User left the room (Sector {x},{y}) - {len(sector_users.get(room_id, set()))} users remaining",
@@ -392,68 +405,51 @@ async def websocket_handler(websocket, path):
392
  await broadcast_message(leave_msg, room_id)
393
  save_message_to_history(room_id, leave_msg)
394
 
395
- # Clean up empty rooms (but keep history)
396
  if not active_connections[room_id]:
397
  del active_connections[room_id]
398
 
399
  async def broadcast_message(message, room_id):
400
- """Broadcast a message to all clients in a room."""
401
  if room_id in active_connections:
402
  disconnected_clients = []
403
-
404
  for client_id, websocket in active_connections[room_id].items():
405
  try:
406
  await websocket.send(json.dumps(message))
407
  except websockets.exceptions.ConnectionClosed:
408
  disconnected_clients.append(client_id)
409
 
410
- # Clean up disconnected clients
411
  for client_id in disconnected_clients:
412
  del active_connections[room_id][client_id]
413
 
414
  async def start_websocket_server(host='0.0.0.0', port=8765):
415
- """Start the WebSocket server."""
416
  server = await websockets.serve(websocket_handler, host, port)
417
  logger.info(f"WebSocket server started on ws://{host}:{port}")
418
  return server
419
 
420
- # Global variables for event loop and queue
421
  main_event_loop = None
422
  message_queue = []
423
 
424
  def send_message(message, username, room_id):
425
- """Function to send a message from the Gradio interface."""
426
  if not message.strip():
427
  return None
428
 
429
  global message_queue
430
-
431
  msg_data = {
432
  "type": "chat",
433
  "content": message,
434
  "username": username,
435
  "room_id": room_id
436
  }
437
-
438
- # Add to queue for processing by the main loop
439
  message_queue.append(msg_data)
440
-
441
- # Format the message for display in the UI
442
  formatted_msg = f"{username}: {message}"
443
  return formatted_msg
444
 
445
  def join_room(room_id, chat_history_output):
446
- """Join a specific chat room."""
447
  if not room_id.strip():
448
  return "Please enter a valid room ID", chat_history_output
449
 
450
- # Sanitize the room ID
451
  room_id = urllib.parse.quote(room_id.strip())
452
-
453
- # Load room history from persistent storage
454
  history = load_room_history(room_id)
455
 
456
- # Format existing messages
457
  formatted_history = []
458
  for msg in history:
459
  if msg.get("type") == "chat":
@@ -472,23 +468,17 @@ def join_room(room_id, chat_history_output):
472
  return f"Joined room: {room_id}", formatted_history
473
 
474
  def send_clear_command():
475
- """Send a command to clear all chat history."""
476
  global message_queue
477
-
478
  msg_data = {
479
  "type": "command",
480
  "command": "clear_history",
481
  "username": "System"
482
  }
483
-
484
- # Add to queue for processing by the main loop
485
  message_queue.append(msg_data)
486
-
487
  return "🧹 Clearing all chat history..."
488
 
489
  def list_available_rooms():
490
- """List all available chat rooms with their last activity time."""
491
- history_files = get_all_history_files()
492
 
493
  if not history_files:
494
  return "No chat rooms available yet. Create one by joining a room!"
@@ -501,12 +491,10 @@ def list_available_rooms():
501
  return room_list
502
 
503
  def create_gradio_interface():
504
- """Create and return the Gradio interface."""
505
  with gr.Blocks(title=f"Chat Node: {NODE_NAME}") as interface:
506
  gr.Markdown(f"# Chat Node: {NODE_NAME}")
507
  gr.Markdown("Join a room by entering a room ID below or create a new one.")
508
 
509
- # Room list and management
510
  with gr.Row():
511
  with gr.Column(scale=3):
512
  room_list = gr.Markdown(value="Loading available rooms...")
@@ -514,7 +502,6 @@ def create_gradio_interface():
514
  with gr.Column(scale=1):
515
  clear_button = gr.Button("🧹 Clear All Chat History", variant="stop")
516
 
517
- # Join room controls with 2D grid input
518
  with gr.Row():
519
  with gr.Column(scale=2):
520
  room_id_input = gr.Textbox(label="Room ID", placeholder="Enter room ID or use x,y coordinates")
@@ -525,79 +512,49 @@ def create_gradio_interface():
525
  y_coord = gr.Number(label="Y", value=0, minimum=0, maximum=GRID_HEIGHT-1, step=1)
526
  grid_join_button = gr.Button("Join by Coordinates")
527
 
528
- # Chat area with multiline support
529
  chat_history_output = gr.Textbox(label="Chat History", lines=20, max_lines=20)
530
 
531
- # Message controls with multiline support
532
  with gr.Row():
533
  username_input = gr.Textbox(label="Username", placeholder="Enter your username", value="User")
534
  with gr.Column(scale=3):
535
  message_input = gr.Textbox(
536
- label="Message",
537
- placeholder="Type your message here. Press Shift+Enter for new line, Enter to send.",
538
  lines=3
539
  )
540
  with gr.Column(scale=1):
541
  send_button = gr.Button("Send")
542
  map_button = gr.Button("🗺️ Show Map")
543
 
544
- # Current room display
545
  current_room_display = gr.Textbox(label="Current Room", value="Not joined any room yet")
546
 
547
- # Event handlers
548
- refresh_button.click(
549
- list_available_rooms,
550
- inputs=[],
551
- outputs=[room_list]
552
- )
553
-
554
- clear_button.click(
555
- send_clear_command,
556
- inputs=[],
557
- outputs=[room_list]
558
- )
559
 
560
  def join_by_coordinates(x, y):
561
- """Join a room using grid coordinates."""
562
- room_id = f"{int(x)},{int(y)}"
563
- return room_id
564
-
565
- # Link grid coordinates to room ID
566
- grid_join_button.click(
567
- join_by_coordinates,
568
- inputs=[x_coord, y_coord],
569
- outputs=[room_id_input]
570
- ).then(
571
- join_room,
572
- inputs=[room_id_input, chat_history_output],
573
- outputs=[current_room_display, chat_history_output]
574
- )
575
 
576
- join_button.click(
577
- join_room,
578
- inputs=[room_id_input, chat_history_output],
579
- outputs=[current_room_display, chat_history_output]
580
  )
581
 
 
 
582
  def send_and_clear(message, username, room_id):
583
  if not room_id.startswith("Joined room:"):
584
  return "Please join a room first", message
585
 
586
  actual_room_id = room_id.replace("Joined room: ", "").strip()
587
-
588
- # Support for multi-line messages
589
  message_lines = message.strip().split("\n")
590
  formatted_msg = ""
591
 
592
  for line in message_lines:
593
- if line.strip(): # Skip empty lines
594
  sent_msg = send_message(line.strip(), username, actual_room_id)
595
  if sent_msg:
596
  formatted_msg += sent_msg + "\n"
597
 
598
- if formatted_msg:
599
- return "", formatted_msg
600
- return message, None
601
 
602
  send_button.click(
603
  send_and_clear,
@@ -608,18 +565,11 @@ def create_gradio_interface():
608
  def show_sector_map(room_id):
609
  if not room_id.startswith("Joined room:"):
610
  return "Please join a room first to view the map"
611
-
612
  return generate_sector_map()
613
 
614
- map_button.click(
615
- show_sector_map,
616
- inputs=[current_room_display],
617
- outputs=[chat_history_output]
618
- )
619
 
620
- # Handle Enter key for sending, Shift+Enter for new line
621
  def on_message_submit(message, username, room_id):
622
- # Simply call send_and_clear
623
  return send_and_clear(message, username, room_id)
624
 
625
  message_input.submit(
@@ -628,48 +578,49 @@ def create_gradio_interface():
628
  outputs=[message_input, chat_history_output]
629
  )
630
 
631
- # On load, populate room list
632
- interface.load(
633
- list_available_rooms,
634
- inputs=[],
635
- outputs=[room_list]
636
- )
637
 
638
  return interface
639
 
640
  async def process_message_queue():
641
- """Process messages in the queue and broadcast them."""
642
  global message_queue
643
-
644
  while True:
645
- # Check if there are messages to process
646
  if message_queue:
647
- # Get the oldest message
648
  msg_data = message_queue.pop(0)
649
- # Broadcast it
650
  await broadcast_message(msg_data, msg_data["room_id"])
651
-
652
- # Sleep to avoid busy-waiting
653
  await asyncio.sleep(0.1)
654
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
655
  async def main():
656
- """Main function to start the application."""
657
  global NODE_NAME, main_event_loop
658
  NODE_NAME, port = get_node_name()
659
-
660
- # Store the main event loop for later use
661
  main_event_loop = asyncio.get_running_loop()
662
 
663
- # Start WebSocket server
664
  server = await start_websocket_server()
665
-
666
- # Start message queue processor
667
  asyncio.create_task(process_message_queue())
 
668
 
669
- # Create and launch Gradio interface
670
  interface = create_gradio_interface()
671
 
672
- # Custom middleware to extract node name from URL query parameters
673
  from starlette.middleware.base import BaseHTTPMiddleware
674
 
675
  class NodeNameMiddleware(BaseHTTPMiddleware):
@@ -679,28 +630,17 @@ async def main():
679
  if "node_name" in query_params:
680
  NODE_NAME = query_params["node_name"]
681
  logger.info(f"Node name set to {NODE_NAME} from URL parameter")
682
-
683
  response = await call_next(request)
684
  return response
685
 
686
- # Apply middleware
687
  app = gr.routes.App.create_app(interface)
688
  app.add_middleware(NodeNameMiddleware)
689
 
690
- # Launch with the modified app
691
- gr.routes.mount_gradio_app(app, interface, path="/")
692
-
693
- # Run the FastAPI app with uvicorn
694
  import uvicorn
695
  config = uvicorn.Config(app, host="0.0.0.0", port=port)
696
  server = uvicorn.Server(config)
697
 
698
  logger.info(f"Starting Gradio interface on http://0.0.0.0:{port} with node name '{NODE_NAME}'")
699
-
700
- # Start message processor
701
- logger.info("Starting message queue processor")
702
-
703
- # Run the server and keep it running
704
  await server.serve()
705
 
706
  if __name__ == "__main__":
 
8
  from datetime import datetime
9
  import logging
10
  import sys
11
+ import os
12
+ import shutil
13
+ from pathlib import Path
14
+ import time
15
 
16
  # Configure logging
17
  logging.basicConfig(
 
25
  active_connections = {}
26
  # Dictionary to store message history for each chat room (in-memory cache)
27
  chat_history = {}
28
+ # Dictionary to store message history for logs
29
+ log_history = []
30
 
31
  # Dictionary to track file modification times
32
  file_modification_times = {}
 
38
  GRID_WIDTH = 10
39
  GRID_HEIGHT = 10
40
 
41
+ # Directories for persistent storage
42
  HISTORY_DIR = "chat_history"
43
+ LOG_DIR = "server_logs"
 
 
 
 
 
44
  os.makedirs(HISTORY_DIR, exist_ok=True)
45
+ os.makedirs(LOG_DIR, exist_ok=True)
46
 
47
+ # README files
48
  README_PATH = os.path.join(HISTORY_DIR, "README.md")
49
  if not os.path.exists(README_PATH):
50
  with open(README_PATH, "w") as f:
51
  f.write("# Chat History\n\nThis directory contains persistent chat history files.\n")
52
 
53
+ LOG_README_PATH = os.path.join(LOG_DIR, "README.md")
54
+ if not os.path.exists(LOG_README_PATH):
55
+ with open(LOG_README_PATH, "w") as f:
56
+ f.write("# Server Logs\n\nThis directory contains server log files.\n")
57
+
58
  # Get node name from URL or command line
59
  def get_node_name():
60
  parser = argparse.ArgumentParser(description='Start a chat node with a specific name')
 
65
  node_name = args.node_name
66
  port = args.port
67
 
 
68
  if not node_name:
69
  node_name = f"node-{uuid.uuid4().hex[:8]}"
70
 
71
  return node_name, port
72
 
73
  def get_room_history_file(room_id):
 
 
74
  timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
75
  return os.path.join(HISTORY_DIR, f"{room_id}_{timestamp}.jsonl")
76
 
77
+ def get_log_file():
78
+ timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
79
+ return os.path.join(LOG_DIR, f"log_{timestamp}.jsonl")
80
+
81
  def get_all_room_history_files(room_id):
 
82
  files = []
83
  for file in os.listdir(HISTORY_DIR):
84
  if file.startswith(f"{room_id}_") and file.endswith(".jsonl"):
85
  files.append(os.path.join(HISTORY_DIR, file))
 
86
  files.sort(key=lambda x: os.path.getmtime(x), reverse=True)
87
  return files
88
 
89
+ def list_all_history_files():
90
+ """List all history files with room IDs and modification times."""
91
+ files = []
92
+ for file in os.listdir(HISTORY_DIR):
93
+ if file.endswith(".jsonl") and file != "README.md":
94
+ parts = file.split('_', 1)
95
+ if len(parts) > 0:
96
+ room_id = parts[0]
97
+ file_path = os.path.join(HISTORY_DIR, file)
98
+ mod_time = os.path.getmtime(file_path)
99
+ files.append((room_id, file_path, mod_time))
100
+ files.sort(key=lambda x: x[2], reverse=True) # Sort by modification time
101
+ return files
102
+
103
  def load_room_history(room_id):
 
104
  if room_id not in chat_history:
105
  chat_history[room_id] = []
 
 
106
  history_files = get_all_room_history_files(room_id)
107
 
 
108
  for file in history_files:
109
  if file not in file_modification_times:
110
  file_modification_times[file] = os.path.getmtime(file)
111
 
 
112
  messages = []
113
  for history_file in history_files:
114
  try:
115
  with open(history_file, 'r') as f:
116
  for line in f:
117
  line = line.strip()
118
+ if line:
119
  try:
120
  data = json.loads(line)
121
  messages.append(data)
 
124
  except Exception as e:
125
  logger.error(f"Error loading history from {history_file}: {e}")
126
 
 
127
  messages.sort(key=lambda x: x.get("timestamp", ""), reverse=False)
128
  chat_history[room_id] = messages
 
129
  logger.info(f"Loaded {len(messages)} messages from {len(history_files)} files for room {room_id}")
130
 
 
131
  if room_id not in sector_users:
132
  sector_users[room_id] = set()
133
 
134
  return chat_history[room_id]
135
 
136
  def save_message_to_history(room_id, message):
 
 
137
  history_files = get_all_room_history_files(room_id)
 
138
  if not history_files:
 
139
  history_file = get_room_history_file(room_id)
140
  else:
 
141
  newest_file = history_files[0]
142
  if os.path.getsize(newest_file) > 1024 * 1024: # 1 MB
143
  history_file = get_room_history_file(room_id)
 
145
  history_file = newest_file
146
 
147
  try:
 
148
  with open(history_file, 'a') as f:
149
  f.write(json.dumps(message) + '\n')
 
 
150
  file_modification_times[history_file] = os.path.getmtime(history_file)
 
151
  logger.debug(f"Saved message to {history_file}")
152
  except Exception as e:
153
  logger.error(f"Error saving message to {history_file}: {e}")
154
 
155
+ def save_log_entry(entry):
156
+ """Save a log entry to the newest log file."""
157
+ log_files = [f for f in os.listdir(LOG_DIR) if f.startswith("log_") and f.endswith(".jsonl")]
158
+ log_files.sort(key=lambda x: os.path.getmtime(os.path.join(LOG_DIR, x)), reverse=True)
159
+
160
+ if not log_files:
161
+ log_file = get_log_file()
162
+ else:
163
+ newest_log = os.path.join(LOG_DIR, log_files[0])
164
+ if os.path.getsize(newest_log) > 1024 * 1024: # 1 MB
165
+ log_file = get_log_file()
166
+ else:
167
+ log_file = newest_log
168
+
169
+ try:
170
+ with open(log_file, 'a') as f:
171
+ f.write(json.dumps(entry) + '\n')
172
+ file_modification_times[log_file] = os.path.getmtime(log_file)
173
+ except Exception as e:
174
+ logger.error(f"Error saving log to {log_file}: {e}")
175
+
176
  def check_for_new_messages():
 
177
  updated_rooms = set()
 
 
178
  for file in os.listdir(HISTORY_DIR):
179
  if file.endswith(".jsonl"):
180
  file_path = os.path.join(HISTORY_DIR, file)
181
  current_mtime = os.path.getmtime(file_path)
 
 
182
  if file_path not in file_modification_times or current_mtime > file_modification_times[file_path]:
 
183
  parts = file.split('_', 1)
184
  if len(parts) > 0:
185
  room_id = parts[0]
186
  updated_rooms.add(room_id)
 
 
187
  file_modification_times[file_path] = current_mtime
188
 
 
189
  for room_id in updated_rooms:
190
  if room_id in chat_history:
 
191
  old_history_len = len(chat_history[room_id])
 
192
  chat_history[room_id] = []
193
  load_room_history(room_id)
194
  new_history_len = len(chat_history[room_id])
 
195
  if new_history_len > old_history_len:
196
  logger.info(f"Found {new_history_len - old_history_len} new messages for room {room_id}")
197
 
198
  return updated_rooms
199
 
200
+ def check_for_new_logs():
201
+ """Check for new log entries and broadcast them."""
202
+ global log_history
203
+ updated = False
204
+
205
+ for file in os.listdir(LOG_DIR):
206
+ if file.endswith(".jsonl"):
207
+ file_path = os.path.join(LOG_DIR, file)
208
+ current_mtime = os.path.getmtime(file_path)
209
+ if file_path not in file_modification_times or current_mtime > file_modification_times[file_path]:
210
+ updated = True
211
+ file_modification_times[file_path] = current_mtime
212
+
213
+ if updated:
214
+ log_history = []
215
+ for file in sorted([f for f in os.listdir(LOG_DIR) if f.endswith(".jsonl")], key=lambda x: os.path.getmtime(os.path.join(LOG_DIR, x))):
216
+ file_path = os.path.join(LOG_DIR, file)
217
+ try:
218
+ with open(file_path, 'r') as f:
219
+ for line in f:
220
+ line = line.strip()
221
+ if line:
222
+ try:
223
+ log_history.append(json.loads(line))
224
+ except json.JSONDecodeError:
225
+ logger.error(f"Error parsing log line in {file_path}")
226
+ except Exception as e:
227
+ logger.error(f"Error loading logs from {file_path}: {e}")
228
+
229
+ # Broadcast logs to all connected clients in a "logs" room
230
+ log_msg = {
231
+ "type": "log",
232
+ "content": "\n".join([entry["message"] for entry in log_history[-10:]]), # Last 10 entries
233
+ "timestamp": datetime.now().isoformat(),
234
+ "sender": "system",
235
+ "room_id": "logs"
236
+ }
237
+ asyncio.create_task(broadcast_message(log_msg, "logs"))
238
+
239
  def get_sector_coordinates(room_id):
 
240
  try:
 
241
  if ',' in room_id:
242
  x, y = map(int, room_id.split(','))
243
  return max(0, min(x, GRID_WIDTH-1)), max(0, min(y, GRID_HEIGHT-1))
244
  except:
245
  pass
 
 
246
  hash_val = hash(room_id)
247
  x = abs(hash_val) % GRID_WIDTH
248
  y = abs(hash_val >> 8) % GRID_HEIGHT
 
249
  return x, y
250
 
251
  def generate_sector_map():
 
 
252
  grid = [[' ' for _ in range(GRID_WIDTH)] for _ in range(GRID_HEIGHT)]
 
 
253
  for room_id, users in sector_users.items():
254
+ if users:
255
  x, y = get_sector_coordinates(room_id)
256
  user_count = len(users)
257
  grid[y][x] = str(min(user_count, 9)) if user_count < 10 else '+'
258
 
 
259
  header = ' ' + ''.join([str(i % 10) for i in range(GRID_WIDTH)])
260
  map_str = header + '\n'
 
261
  for y in range(GRID_HEIGHT):
262
+ row = f"{y % 10}|" + ''.join(grid[y]) + '|'
 
 
 
263
  map_str += row + '\n'
264
+ map_str += header
 
 
 
265
  return f"```\n{map_str}\n```\n\nLegend: Number indicates users in sector. '+' means 10+ users."
266
 
267
  async def clear_all_history():
 
268
  global chat_history
 
 
269
  chat_history = {}
 
 
270
  for file in os.listdir(HISTORY_DIR):
271
+ if file.endswith(".jsonl"):
272
  os.remove(os.path.join(HISTORY_DIR, file))
273
 
 
274
  clear_msg = {
275
  "type": "system",
276
  "content": "🧹 All chat history has been cleared by a user",
 
286
  return "All chat history cleared"
287
 
288
  async def websocket_handler(websocket, path):
 
289
  try:
 
290
  path_parts = path.strip('/').split('/')
291
  room_id = path_parts[0] if path_parts else "default"
292
 
 
293
  client_id = str(uuid.uuid4())
294
  if room_id not in active_connections:
295
  active_connections[room_id] = {}
 
296
  active_connections[room_id][client_id] = websocket
297
 
 
298
  if room_id not in sector_users:
299
  sector_users[room_id] = set()
300
  sector_users[room_id].add(client_id)
301
 
 
302
  x, y = get_sector_coordinates(room_id)
 
 
303
  room_history = load_room_history(room_id)
304
 
 
305
  welcome_msg = {
306
  "type": "system",
307
  "content": f"Welcome to room '{room_id}' (Sector {x},{y})! Connected from node '{NODE_NAME}'",
 
311
  }
312
  await websocket.send(json.dumps(welcome_msg))
313
 
 
314
  map_msg = {
315
  "type": "system",
316
  "content": f"Sector Map:\n{generate_sector_map()}",
 
320
  }
321
  await websocket.send(json.dumps(map_msg))
322
 
 
323
  for msg in room_history:
324
  await websocket.send(json.dumps(msg))
325
 
 
326
  join_msg = {
327
  "type": "system",
328
  "content": f"User joined the room (Sector {x},{y}) - {len(sector_users[room_id])} users now present",
 
335
 
336
  logger.info(f"New client {client_id} connected to room {room_id} (Sector {x},{y})")
337
 
338
+ if room_id == "logs":
339
+ for log_entry in log_history[-10:]: # Send last 10 log entries
340
+ log_msg = {
341
+ "type": "log",
342
+ "content": log_entry["message"],
343
+ "timestamp": log_entry["timestamp"],
344
+ "sender": "system",
345
+ "room_id": "logs"
346
+ }
347
+ await websocket.send(json.dumps(log_msg))
348
+
349
  async for message in websocket:
350
  try:
351
  data = json.loads(message)
352
 
 
353
  if data.get("type") == "command" and data.get("command") == "clear_history":
354
  result = await clear_all_history()
355
  continue
356
 
 
357
  if data.get("type") == "command" and data.get("command") == "show_map":
358
  map_msg = {
359
  "type": "system",
 
365
  await websocket.send(json.dumps(map_msg))
366
  continue
367
 
 
368
  data["timestamp"] = datetime.now().isoformat()
369
  data["sender_node"] = NODE_NAME
370
  data["room_id"] = room_id
371
 
 
372
  chat_history[room_id].append(data)
373
+ if len(chat_history[room_id]) > 500:
374
  chat_history[room_id] = chat_history[room_id][-500:]
375
 
 
376
  save_message_to_history(room_id, data)
 
 
377
  await broadcast_message(data, room_id)
378
 
379
  except json.JSONDecodeError:
 
389
  except websockets.exceptions.ConnectionClosed:
390
  logger.info(f"Client {client_id} disconnected from room {room_id}")
391
  finally:
 
392
  if room_id in active_connections and client_id in active_connections[room_id]:
393
  del active_connections[room_id][client_id]
 
 
394
  if room_id in sector_users and client_id in sector_users[room_id]:
395
  sector_users[room_id].remove(client_id)
396
 
 
397
  x, y = get_sector_coordinates(room_id)
 
 
398
  leave_msg = {
399
  "type": "system",
400
  "content": f"User left the room (Sector {x},{y}) - {len(sector_users.get(room_id, set()))} users remaining",
 
405
  await broadcast_message(leave_msg, room_id)
406
  save_message_to_history(room_id, leave_msg)
407
 
 
408
  if not active_connections[room_id]:
409
  del active_connections[room_id]
410
 
411
  async def broadcast_message(message, room_id):
 
412
  if room_id in active_connections:
413
  disconnected_clients = []
 
414
  for client_id, websocket in active_connections[room_id].items():
415
  try:
416
  await websocket.send(json.dumps(message))
417
  except websockets.exceptions.ConnectionClosed:
418
  disconnected_clients.append(client_id)
419
 
 
420
  for client_id in disconnected_clients:
421
  del active_connections[room_id][client_id]
422
 
423
  async def start_websocket_server(host='0.0.0.0', port=8765):
 
424
  server = await websockets.serve(websocket_handler, host, port)
425
  logger.info(f"WebSocket server started on ws://{host}:{port}")
426
  return server
427
 
 
428
  main_event_loop = None
429
  message_queue = []
430
 
431
  def send_message(message, username, room_id):
 
432
  if not message.strip():
433
  return None
434
 
435
  global message_queue
 
436
  msg_data = {
437
  "type": "chat",
438
  "content": message,
439
  "username": username,
440
  "room_id": room_id
441
  }
 
 
442
  message_queue.append(msg_data)
 
 
443
  formatted_msg = f"{username}: {message}"
444
  return formatted_msg
445
 
446
  def join_room(room_id, chat_history_output):
 
447
  if not room_id.strip():
448
  return "Please enter a valid room ID", chat_history_output
449
 
 
450
  room_id = urllib.parse.quote(room_id.strip())
 
 
451
  history = load_room_history(room_id)
452
 
 
453
  formatted_history = []
454
  for msg in history:
455
  if msg.get("type") == "chat":
 
468
  return f"Joined room: {room_id}", formatted_history
469
 
470
  def send_clear_command():
 
471
  global message_queue
 
472
  msg_data = {
473
  "type": "command",
474
  "command": "clear_history",
475
  "username": "System"
476
  }
 
 
477
  message_queue.append(msg_data)
 
478
  return "🧹 Clearing all chat history..."
479
 
480
  def list_available_rooms():
481
+ history_files = list_all_history_files()
 
482
 
483
  if not history_files:
484
  return "No chat rooms available yet. Create one by joining a room!"
 
491
  return room_list
492
 
493
  def create_gradio_interface():
 
494
  with gr.Blocks(title=f"Chat Node: {NODE_NAME}") as interface:
495
  gr.Markdown(f"# Chat Node: {NODE_NAME}")
496
  gr.Markdown("Join a room by entering a room ID below or create a new one.")
497
 
 
498
  with gr.Row():
499
  with gr.Column(scale=3):
500
  room_list = gr.Markdown(value="Loading available rooms...")
 
502
  with gr.Column(scale=1):
503
  clear_button = gr.Button("🧹 Clear All Chat History", variant="stop")
504
 
 
505
  with gr.Row():
506
  with gr.Column(scale=2):
507
  room_id_input = gr.Textbox(label="Room ID", placeholder="Enter room ID or use x,y coordinates")
 
512
  y_coord = gr.Number(label="Y", value=0, minimum=0, maximum=GRID_HEIGHT-1, step=1)
513
  grid_join_button = gr.Button("Join by Coordinates")
514
 
 
515
  chat_history_output = gr.Textbox(label="Chat History", lines=20, max_lines=20)
516
 
 
517
  with gr.Row():
518
  username_input = gr.Textbox(label="Username", placeholder="Enter your username", value="User")
519
  with gr.Column(scale=3):
520
  message_input = gr.Textbox(
521
+ label="Message",
522
+ placeholder="Type your message here. Press Shift+Enter for new line, Enter to send.",
523
  lines=3
524
  )
525
  with gr.Column(scale=1):
526
  send_button = gr.Button("Send")
527
  map_button = gr.Button("🗺️ Show Map")
528
 
 
529
  current_room_display = gr.Textbox(label="Current Room", value="Not joined any room yet")
530
 
531
+ refresh_button.click(list_available_rooms, inputs=[], outputs=[room_list])
532
+ clear_button.click(send_clear_command, inputs=[], outputs=[room_list])
 
 
 
 
 
 
 
 
 
 
533
 
534
  def join_by_coordinates(x, y):
535
+ return f"{int(x)},{int(y)}"
 
 
 
 
 
 
 
 
 
 
 
 
 
536
 
537
+ grid_join_button.click(join_by_coordinates, inputs=[x_coord, y_coord], outputs=[room_id_input]).then(
538
+ join_room, inputs=[room_id_input, chat_history_output], outputs=[current_room_display, chat_history_output]
 
 
539
  )
540
 
541
+ join_button.click(join_room, inputs=[room_id_input, chat_history_output], outputs=[current_room_display, chat_history_output])
542
+
543
  def send_and_clear(message, username, room_id):
544
  if not room_id.startswith("Joined room:"):
545
  return "Please join a room first", message
546
 
547
  actual_room_id = room_id.replace("Joined room: ", "").strip()
 
 
548
  message_lines = message.strip().split("\n")
549
  formatted_msg = ""
550
 
551
  for line in message_lines:
552
+ if line.strip():
553
  sent_msg = send_message(line.strip(), username, actual_room_id)
554
  if sent_msg:
555
  formatted_msg += sent_msg + "\n"
556
 
557
+ return "", formatted_msg if formatted_msg else message, None
 
 
558
 
559
  send_button.click(
560
  send_and_clear,
 
565
  def show_sector_map(room_id):
566
  if not room_id.startswith("Joined room:"):
567
  return "Please join a room first to view the map"
 
568
  return generate_sector_map()
569
 
570
+ map_button.click(show_sector_map, inputs=[current_room_display], outputs=[chat_history_output])
 
 
 
 
571
 
 
572
  def on_message_submit(message, username, room_id):
 
573
  return send_and_clear(message, username, room_id)
574
 
575
  message_input.submit(
 
578
  outputs=[message_input, chat_history_output]
579
  )
580
 
581
+ interface.load(list_available_rooms, inputs=[], outputs=[room_list])
 
 
 
 
 
582
 
583
  return interface
584
 
585
  async def process_message_queue():
 
586
  global message_queue
 
587
  while True:
 
588
  if message_queue:
 
589
  msg_data = message_queue.pop(0)
 
590
  await broadcast_message(msg_data, msg_data["room_id"])
 
 
591
  await asyncio.sleep(0.1)
592
 
593
+ async def process_logs():
594
+ """Periodically check and broadcast new log entries."""
595
+ while True:
596
+ check_for_new_logs()
597
+ await asyncio.sleep(1) # Check every second
598
+
599
+ # Custom logging handler to save logs and broadcast them
600
+ class LogBroadcastHandler(logging.Handler):
601
+ def emit(self, record):
602
+ log_entry = {
603
+ "timestamp": datetime.now().isoformat(),
604
+ "level": record.levelname,
605
+ "message": self.format(record),
606
+ "name": record.name
607
+ }
608
+ save_log_entry(log_entry)
609
+ log_history.append(log_entry)
610
+
611
+ logger.addHandler(LogBroadcastHandler())
612
+
613
  async def main():
 
614
  global NODE_NAME, main_event_loop
615
  NODE_NAME, port = get_node_name()
 
 
616
  main_event_loop = asyncio.get_running_loop()
617
 
 
618
  server = await start_websocket_server()
 
 
619
  asyncio.create_task(process_message_queue())
620
+ asyncio.create_task(process_logs()) # Start log processor
621
 
 
622
  interface = create_gradio_interface()
623
 
 
624
  from starlette.middleware.base import BaseHTTPMiddleware
625
 
626
  class NodeNameMiddleware(BaseHTTPMiddleware):
 
630
  if "node_name" in query_params:
631
  NODE_NAME = query_params["node_name"]
632
  logger.info(f"Node name set to {NODE_NAME} from URL parameter")
 
633
  response = await call_next(request)
634
  return response
635
 
 
636
  app = gr.routes.App.create_app(interface)
637
  app.add_middleware(NodeNameMiddleware)
638
 
 
 
 
 
639
  import uvicorn
640
  config = uvicorn.Config(app, host="0.0.0.0", port=port)
641
  server = uvicorn.Server(config)
642
 
643
  logger.info(f"Starting Gradio interface on http://0.0.0.0:{port} with node name '{NODE_NAME}'")
 
 
 
 
 
644
  await server.serve()
645
 
646
  if __name__ == "__main__":