da03 commited on
Commit
2da2765
·
1 Parent(s): 6466ec0
Files changed (1) hide show
  1. main.py +38 -30
main.py CHANGED
@@ -176,8 +176,8 @@ async def websocket_endpoint(websocket: WebSocket):
176
  connection_start_time = time.perf_counter()
177
  frame_count = 0
178
 
179
- # Input queue management
180
- input_queue = []
181
  is_processing = False
182
 
183
  async def process_input(data):
@@ -185,7 +185,8 @@ async def websocket_endpoint(websocket: WebSocket):
185
 
186
  try:
187
  process_start_time = time.perf_counter()
188
- print(f"[{process_start_time:.3f}] Starting to process input. Queue size before: {len(input_queue)}")
 
189
  frame_num += 1
190
  frame_count += 1 # Increment total frame counter
191
 
@@ -213,7 +214,7 @@ async def websocket_endpoint(websocket: WebSocket):
213
  previous_frame, sample_img, hidden_states, timing_info = process_frame(model, inputs)
214
  timing_info['full_frame'] = time.perf_counter() - process_start_time
215
 
216
- print(f"[{time.perf_counter():.3f}] Model inference complete. Queue size now: {len(input_queue)}")
217
  # Use the provided function to print timing statistics
218
  print_timing_stats(timing_info, frame_num)
219
 
@@ -229,18 +230,18 @@ async def websocket_endpoint(websocket: WebSocket):
229
  # Send the generated frame back to the client
230
  print(f"[{time.perf_counter():.3f}] Sending image to client...")
231
  await websocket.send_json({"image": img_str})
232
- print(f"[{time.perf_counter():.3f}] Image sent. Queue size before next_input: {len(input_queue)}")
233
  finally:
234
  is_processing = False
235
- print(f"[{time.perf_counter():.3f}] Processing complete. Queue size before checking next input: {len(input_queue)}")
236
  # Check if we have more inputs to process after this one
237
- process_next_input()
238
 
239
- def process_next_input():
240
- nonlocal input_queue, is_processing
241
 
242
  current_time = time.perf_counter()
243
- if not input_queue:
244
  print(f"[{current_time:.3f}] No inputs to process. Queue is empty.")
245
  return
246
 
@@ -251,30 +252,37 @@ async def websocket_endpoint(websocket: WebSocket):
251
  # Set is_processing to True BEFORE creating the task
252
  is_processing = True
253
 
254
- print(f"[{current_time:.3f}] Processing next input. Queue size: {len(input_queue)}")
 
255
 
256
- # Find the most recent interesting input (click or key event)
257
- interesting_indices = [i for i, data in enumerate(input_queue)
258
- if data.get("is_left_click") or
259
- data.get("is_right_click") or
260
- (data.get("keys_down") and len(data.get("keys_down")) > 0) or
261
- (data.get("keys_up") and len(data.get("keys_up")) > 0)]
 
 
 
 
 
 
262
 
263
  if interesting_indices:
264
  # There are interesting events - take the most recent one
265
  idx = interesting_indices[-1]
266
- next_input = input_queue[idx]
267
- skipped = idx # Number of events we're skipping
268
-
269
- # Clear all inputs up to and including this one
270
- input_queue = input_queue[idx+1:]
271
 
272
- print(f"[{current_time:.3f}] Processing interesting input (skipped {skipped} events). Queue size now: {len(input_queue)}")
 
 
 
 
273
  else:
274
  # No interesting events - just take the most recent movement
275
- skipped = len(input_queue) - 1 # We're processing one, so skipped = total - 1
276
- next_input = input_queue[-1]
277
- input_queue = []
278
  print(f"[{current_time:.3f}] Processing latest movement (skipped {skipped} events). Queue now empty.")
279
 
280
  # Process the selected input asynchronously
@@ -284,7 +292,7 @@ async def websocket_endpoint(websocket: WebSocket):
284
  while True:
285
  try:
286
  # Receive user input
287
- print(f"[{time.perf_counter():.3f}] Waiting for input... Queue size: {len(input_queue)}, is_processing: {is_processing}")
288
  data = await websocket.receive_json()
289
  receive_time = time.perf_counter()
290
 
@@ -293,13 +301,13 @@ async def websocket_endpoint(websocket: WebSocket):
293
  continue
294
 
295
  # Add the input to our queue
296
- input_queue.append(data)
297
- print(f"[{receive_time:.3f}] Received input. Queue size now: {len(input_queue)}")
298
 
299
  # If we're not currently processing, start processing this input
300
  if not is_processing:
301
  print(f"[{receive_time:.3f}] Not currently processing, will call process_next_input()")
302
- process_next_input()
303
  else:
304
  print(f"[{receive_time:.3f}] Currently processing, new input queued for later")
305
 
 
176
  connection_start_time = time.perf_counter()
177
  frame_count = 0
178
 
179
+ # Input queue management - use asyncio.Queue instead of a list
180
+ input_queue = asyncio.Queue()
181
  is_processing = False
182
 
183
  async def process_input(data):
 
185
 
186
  try:
187
  process_start_time = time.perf_counter()
188
+ queue_size = input_queue.qsize()
189
+ print(f"[{process_start_time:.3f}] Starting to process input. Queue size before: {queue_size}")
190
  frame_num += 1
191
  frame_count += 1 # Increment total frame counter
192
 
 
214
  previous_frame, sample_img, hidden_states, timing_info = process_frame(model, inputs)
215
  timing_info['full_frame'] = time.perf_counter() - process_start_time
216
 
217
+ print(f"[{time.perf_counter():.3f}] Model inference complete. Queue size now: {input_queue.qsize()}")
218
  # Use the provided function to print timing statistics
219
  print_timing_stats(timing_info, frame_num)
220
 
 
230
  # Send the generated frame back to the client
231
  print(f"[{time.perf_counter():.3f}] Sending image to client...")
232
  await websocket.send_json({"image": img_str})
233
+ print(f"[{time.perf_counter():.3f}] Image sent. Queue size before next_input: {input_queue.qsize()}")
234
  finally:
235
  is_processing = False
236
+ print(f"[{time.perf_counter():.3f}] Processing complete. Queue size before checking next input: {input_queue.qsize()}")
237
  # Check if we have more inputs to process after this one
238
+ asyncio.create_task(process_next_input())
239
 
240
+ async def process_next_input():
241
+ nonlocal is_processing
242
 
243
  current_time = time.perf_counter()
244
+ if input_queue.empty():
245
  print(f"[{current_time:.3f}] No inputs to process. Queue is empty.")
246
  return
247
 
 
252
  # Set is_processing to True BEFORE creating the task
253
  is_processing = True
254
 
255
+ queue_size = input_queue.qsize()
256
+ print(f"[{current_time:.3f}] Processing next input. Queue size: {queue_size}")
257
 
258
+ # Collect all inputs for analysis
259
+ all_inputs = []
260
+ while not input_queue.empty():
261
+ all_inputs.append(await input_queue.get())
262
+ input_queue.task_done()
263
+
264
+ # Find all interesting inputs
265
+ interesting_indices = [i for i, data in enumerate(all_inputs)
266
+ if data.get("is_left_click") or
267
+ data.get("is_right_click") or
268
+ (data.get("keys_down") and len(data.get("keys_down")) > 0) or
269
+ (data.get("keys_up") and len(data.get("keys_up")) > 0)]
270
 
271
  if interesting_indices:
272
  # There are interesting events - take the most recent one
273
  idx = interesting_indices[-1]
274
+ next_input = all_inputs[idx]
275
+ skipped = len(all_inputs) - 1 # We're processing one, so skipped = total - 1
 
 
 
276
 
277
+ # Put back any inputs after this one
278
+ for i in range(idx + 1, len(all_inputs)):
279
+ await input_queue.put(all_inputs[i])
280
+
281
+ print(f"[{current_time:.3f}] Processing interesting input (skipped {skipped} events). Queue size now: {input_queue.qsize()}")
282
  else:
283
  # No interesting events - just take the most recent movement
284
+ next_input = all_inputs[-1]
285
+ skipped = len(all_inputs) - 1
 
286
  print(f"[{current_time:.3f}] Processing latest movement (skipped {skipped} events). Queue now empty.")
287
 
288
  # Process the selected input asynchronously
 
292
  while True:
293
  try:
294
  # Receive user input
295
+ print(f"[{time.perf_counter():.3f}] Waiting for input... Queue size: {input_queue.qsize()}, is_processing: {is_processing}")
296
  data = await websocket.receive_json()
297
  receive_time = time.perf_counter()
298
 
 
301
  continue
302
 
303
  # Add the input to our queue
304
+ await input_queue.put(data)
305
+ print(f"[{receive_time:.3f}] Received input. Queue size now: {input_queue.qsize()}")
306
 
307
  # If we're not currently processing, start processing this input
308
  if not is_processing:
309
  print(f"[{receive_time:.3f}] Not currently processing, will call process_next_input()")
310
+ asyncio.create_task(process_next_input())
311
  else:
312
  print(f"[{receive_time:.3f}] Currently processing, new input queued for later")
313