amaye15 commited on
Commit
4107020
·
1 Parent(s): a7ff4ea

debug - web socket

Browse files
Files changed (2) hide show
  1. app/main.py +24 -13
  2. requirements.txt +1 -0
app/main.py CHANGED
@@ -114,7 +114,6 @@ async def make_api_request(method: str, endpoint: str, **kwargs):
114
 
115
  async def listen_to_websockets(token: str, notification_state: list):
116
  """Connects to WS and updates state list when a message arrives."""
117
- # <<< Add Logging >>>
118
  ws_listener_id = f"WSListener-{os.getpid()}-{asyncio.current_task().get_name()}"
119
  logger.info(f"[{ws_listener_id}] Starting WebSocket listener task.")
120
 
@@ -127,12 +126,23 @@ async def listen_to_websockets(token: str, notification_state: list):
127
  logger.info(f"[{ws_listener_id}] Attempting to connect to WebSocket: {ws_url}")
128
 
129
  try:
130
- async with asyncio.wait_for(websockets.connect(ws_url), timeout=15.0) as websocket: # Increased timeout slightly
 
 
 
 
131
  logger.info(f"[{ws_listener_id}] WebSocket connected successfully to {ws_url}")
 
 
 
 
 
 
 
132
  while True:
133
  try:
134
- message_str = await websocket.recv()
135
- # <<< Add Logging >>>
136
  logger.info(f"[{ws_listener_id}] Received raw message: {message_str}")
137
  try:
138
  message_data = json.loads(message_str)
@@ -140,12 +150,9 @@ async def listen_to_websockets(token: str, notification_state: list):
140
 
141
  if message_data.get("type") == "new_user":
142
  notification = schemas.Notification(**message_data)
143
- # <<< Add Logging >>>
144
  logger.info(f"[{ws_listener_id}] Processing 'new_user' notification: {notification.message}")
145
- # Modify the list in place
146
  notification_state.insert(0, notification.message)
147
- logger.info(f"[{ws_listener_id}] State list updated. New length: {len(notification_state)}. Content: {notification_state[:5]}") # Log first few items
148
- # Limit state history
149
  if len(notification_state) > 10:
150
  notification_state.pop()
151
  else:
@@ -156,7 +163,11 @@ async def listen_to_websockets(token: str, notification_state: list):
156
  except Exception as parse_err:
157
  logger.error(f"[{ws_listener_id}] Error processing received message: {parse_err}")
158
 
159
-
 
 
 
 
160
  except websockets.ConnectionClosedOK:
161
  logger.info(f"[{ws_listener_id}] WebSocket connection closed normally.")
162
  break
@@ -165,13 +176,13 @@ async def listen_to_websockets(token: str, notification_state: list):
165
  break
166
  except Exception as e:
167
  logger.error(f"[{ws_listener_id}] Error in WebSocket listener receive loop: {e}")
168
- await asyncio.sleep(1) # Avoid tight loop on errors
169
 
170
- except asyncio.TimeoutError:
171
- logger.error(f"[{ws_listener_id}] WebSocket connection timed out: {ws_url}")
172
  except websockets.exceptions.InvalidURI:
173
  logger.error(f"[{ws_listener_id}] Invalid WebSocket URI: {ws_url}")
174
- except websockets.exceptions.WebSocketException as e:
175
  logger.error(f"[{ws_listener_id}] WebSocket connection failed: {e}")
176
  except Exception as e:
177
  logger.error(f"[{ws_listener_id}] Unexpected error in WebSocket listener task: {e}")
 
114
 
115
  async def listen_to_websockets(token: str, notification_state: list):
116
  """Connects to WS and updates state list when a message arrives."""
 
117
  ws_listener_id = f"WSListener-{os.getpid()}-{asyncio.current_task().get_name()}"
118
  logger.info(f"[{ws_listener_id}] Starting WebSocket listener task.")
119
 
 
126
  logger.info(f"[{ws_listener_id}] Attempting to connect to WebSocket: {ws_url}")
127
 
128
  try:
129
+ # --- CORRECTED CONTEXT MANAGER USAGE ---
130
+ # Use websockets.connect directly, passing timeout parameters
131
+ # open_timeout controls connection establishment timeout
132
+ async with websockets.connect(ws_url, open_timeout=15.0) as websocket:
133
+ # --- END CORRECTION ---
134
  logger.info(f"[{ws_listener_id}] WebSocket connected successfully to {ws_url}")
135
+
136
+ # --- Add a check after connection to see active connections ---
137
+ # This helps confirm the backend sees the connection before receiving
138
+ await asyncio.sleep(0.5) # Small delay to allow backend registration
139
+ logger.info(f"[{ws_listener_id}] Connections according to manager after connect: {manager.active_connections}")
140
+ # --- End check ---
141
+
142
  while True:
143
  try:
144
+ # Add recv timeout (optional, but good practice)
145
+ message_str = await asyncio.wait_for(websocket.recv(), timeout=300.0) # e.g., 5 min timeout
146
  logger.info(f"[{ws_listener_id}] Received raw message: {message_str}")
147
  try:
148
  message_data = json.loads(message_str)
 
150
 
151
  if message_data.get("type") == "new_user":
152
  notification = schemas.Notification(**message_data)
 
153
  logger.info(f"[{ws_listener_id}] Processing 'new_user' notification: {notification.message}")
 
154
  notification_state.insert(0, notification.message)
155
+ logger.info(f"[{ws_listener_id}] State list updated. New length: {len(notification_state)}. Content: {notification_state[:5]}")
 
156
  if len(notification_state) > 10:
157
  notification_state.pop()
158
  else:
 
163
  except Exception as parse_err:
164
  logger.error(f"[{ws_listener_id}] Error processing received message: {parse_err}")
165
 
166
+ except asyncio.TimeoutError:
167
+ # No message received within timeout, keep connection alive (ping might be needed for long silences)
168
+ logger.debug(f"[{ws_listener_id}] WebSocket recv timed out, continuing loop.")
169
+ # Consider sending a ping if needed: await websocket.ping()
170
+ continue
171
  except websockets.ConnectionClosedOK:
172
  logger.info(f"[{ws_listener_id}] WebSocket connection closed normally.")
173
  break
 
176
  break
177
  except Exception as e:
178
  logger.error(f"[{ws_listener_id}] Error in WebSocket listener receive loop: {e}")
179
+ await asyncio.sleep(1)
180
 
181
+ except asyncio.TimeoutError: # Catch timeout during initial connect
182
+ logger.error(f"[{ws_listener_id}] WebSocket initial connection timed out: {ws_url}")
183
  except websockets.exceptions.InvalidURI:
184
  logger.error(f"[{ws_listener_id}] Invalid WebSocket URI: {ws_url}")
185
+ except websockets.exceptions.WebSocketException as e: # Catch connection errors
186
  logger.error(f"[{ws_listener_id}] WebSocket connection failed: {e}")
187
  except Exception as e:
188
  logger.error(f"[{ws_listener_id}] Unexpected error in WebSocket listener task: {e}")
requirements.txt CHANGED
@@ -2,6 +2,7 @@ fastapi==0.111.0
2
  uvicorn[standard]==0.29.0
3
  gradio==4.29.0
4
  passlib[bcrypt]==1.7.4
 
5
  python-dotenv==1.0.1
6
  databases[sqlite]==0.9.0 # Async DB access
7
  sqlalchemy==2.0.29 # Core needed by `databases` or for potential future use
 
2
  uvicorn[standard]==0.29.0
3
  gradio==4.29.0
4
  passlib[bcrypt]==1.7.4
5
+ bcrypt==4.1.3 # Add this line (or latest compatible version)
6
  python-dotenv==1.0.1
7
  databases[sqlite]==0.9.0 # Async DB access
8
  sqlalchemy==2.0.29 # Core needed by `databases` or for potential future use