ciyidogan commited on
Commit
7d490e9
·
verified ·
1 Parent(s): 3c5e8b3

Update stt_google.py

Browse files
Files changed (1) hide show
  1. stt_google.py +65 -44
stt_google.py CHANGED
@@ -58,10 +58,17 @@ class GoogleCloudSTT(STTInterface):
58
  self.credentials_path = credentials_path # Sakla
59
 
60
  async def start_streaming(self, config: dict) -> None:
61
- """Initialize streaming session"""
62
  try:
63
  log_info(f"🎤 Starting Google STT streaming with config: {config}")
64
 
 
 
 
 
 
 
 
65
  # Her start_streaming'de yeni client oluştur
66
  self.client = speech.SpeechClient()
67
  log_info("✅ Created new Google Speech client")
@@ -94,18 +101,9 @@ class GoogleCloudSTT(STTInterface):
94
  single_utterance=stt_config.single_utterance
95
  )
96
 
97
- # Queue'ları temizle
98
- while not self.audio_queue.empty():
99
- try:
100
- self.audio_queue.get_nowait()
101
- except:
102
- pass
103
-
104
- while not self.responses_queue.empty():
105
- try:
106
- self.responses_queue.get_nowait()
107
- except:
108
- pass
109
 
110
  self.is_streaming = True
111
  self.stop_event.clear()
@@ -114,11 +112,12 @@ class GoogleCloudSTT(STTInterface):
114
  self.stream_thread = threading.Thread(target=self._run_stream)
115
  self.stream_thread.start()
116
 
117
- log_info("✅ Google STT streaming started successfully")
118
 
119
  except Exception as e:
120
  log_error(f"❌ Failed to start Google STT streaming", error=str(e))
121
  self.is_streaming = False
 
122
  raise
123
 
124
  def _put_result(self, result: TranscriptionResult):
@@ -308,59 +307,81 @@ class GoogleCloudSTT(STTInterface):
308
  raise
309
 
310
  async def stop_streaming(self) -> Optional[TranscriptionResult]:
311
- """Stop streaming and get final result"""
312
- if not self.is_streaming:
313
  return None
314
 
315
  try:
316
- log_info("🛑 Stopping Google STT streaming...")
317
 
 
318
  self.is_streaming = False
319
  self.stop_event.set()
320
 
321
- # Send poison pill to queue
322
- self.audio_queue.put(None)
 
323
 
324
- # Wait for thread to finish with longer timeout
325
  if self.stream_thread and self.stream_thread.is_alive():
326
  self.stream_thread.join(timeout=10.0)
327
  if self.stream_thread.is_alive():
328
- log_warning("⚠️ STT thread did not stop gracefully")
329
 
330
- # Clear queues
331
- while not self.audio_queue.empty():
332
- try:
333
- self.audio_queue.get_nowait()
334
- except:
335
- pass
336
-
 
 
 
337
  final_result = None
338
- while not self.responses_queue.empty():
339
- try:
340
- result = self.responses_queue.get_nowait()
341
- if result.is_final:
342
- final_result = result
343
- except:
344
- pass
345
-
346
- # Reset thread reference
347
- self.stream_thread = None
348
 
349
- # Client'ı kapat
350
  if self.client:
351
  try:
352
- self.client.transport.close()
353
- except:
354
- pass
355
- self.client = None
 
 
 
 
 
 
356
 
357
- log_info("✅ Google STT streaming stopped")
 
 
 
 
 
358
  return final_result
359
 
360
  except Exception as e:
361
  log_error(f"❌ Failed to stop Google STT streaming", error=str(e))
 
362
  self.stream_thread = None
363
  self.client = None
 
 
 
 
 
364
  return None
365
 
366
  def supports_realtime(self) -> bool:
 
58
  self.credentials_path = credentials_path # Sakla
59
 
60
  async def start_streaming(self, config: dict) -> None:
61
+ """Initialize streaming session with clean state"""
62
  try:
63
  log_info(f"🎤 Starting Google STT streaming with config: {config}")
64
 
65
+ # Önce mevcut stream'i temizle
66
+ if self.is_streaming or self.stream_thread:
67
+ log_warning("⚠️ Previous stream still active, stopping it first")
68
+ await self.stop_streaming()
69
+ # Temizlik için bekle
70
+ await asyncio.sleep(0.1)
71
+
72
  # Her start_streaming'de yeni client oluştur
73
  self.client = speech.SpeechClient()
74
  log_info("✅ Created new Google Speech client")
 
101
  single_utterance=stt_config.single_utterance
102
  )
103
 
104
+ # Queue'ları yeniden oluştur (tamamen temiz başla)
105
+ self.audio_queue = queue.Queue()
106
+ self.responses_queue = queue.Queue()
 
 
 
 
 
 
 
 
 
107
 
108
  self.is_streaming = True
109
  self.stop_event.clear()
 
112
  self.stream_thread = threading.Thread(target=self._run_stream)
113
  self.stream_thread.start()
114
 
115
+ log_info("✅ Google STT streaming started successfully with clean state")
116
 
117
  except Exception as e:
118
  log_error(f"❌ Failed to start Google STT streaming", error=str(e))
119
  self.is_streaming = False
120
+ self.client = None
121
  raise
122
 
123
  def _put_result(self, result: TranscriptionResult):
 
307
  raise
308
 
309
  async def stop_streaming(self) -> Optional[TranscriptionResult]:
310
+ """Stop streaming and clean up all resources"""
311
+ if not self.is_streaming and not self.stream_thread:
312
  return None
313
 
314
  try:
315
+ log_info("🛑 Stopping Google STT streaming and cleaning all resources...")
316
 
317
+ # Flag'i hemen kapat
318
  self.is_streaming = False
319
  self.stop_event.set()
320
 
321
+ # Send poison pill to stop request generator
322
+ if self.audio_queue:
323
+ self.audio_queue.put(None)
324
 
325
+ # Wait for thread to finish
326
  if self.stream_thread and self.stream_thread.is_alive():
327
  self.stream_thread.join(timeout=10.0)
328
  if self.stream_thread.is_alive():
329
+ log_warning("⚠️ STT thread did not stop gracefully, forcing termination")
330
 
331
+ # Clear all queues completely
332
+ if self.audio_queue:
333
+ while not self.audio_queue.empty():
334
+ try:
335
+ self.audio_queue.get_nowait()
336
+ except:
337
+ pass
338
+ # Queue'yu yeniden oluştur
339
+ self.audio_queue = queue.Queue()
340
+
341
  final_result = None
342
+ if self.responses_queue:
343
+ while not self.responses_queue.empty():
344
+ try:
345
+ result = self.responses_queue.get_nowait()
346
+ if result.is_final:
347
+ final_result = result
348
+ except:
349
+ pass
350
+ # Queue'yu yeniden oluştur
351
+ self.responses_queue = queue.Queue()
352
 
353
+ # Close and delete client
354
  if self.client:
355
  try:
356
+ # Transport'u kapat
357
+ if hasattr(self.client, 'transport') and hasattr(self.client.transport, 'close'):
358
+ self.client.transport.close()
359
+ # gRPC channel'ı kapat
360
+ if hasattr(self.client, '_transport') and hasattr(self.client._transport, '_grpc_channel'):
361
+ self.client._transport._grpc_channel.close()
362
+ except Exception as e:
363
+ log_warning(f"⚠️ Error closing Google client: {e}")
364
+ finally:
365
+ self.client = None
366
 
367
+ # Reset all variables
368
+ self.stream_thread = None
369
+ self.streaming_config = None
370
+ self.stop_event.clear()
371
+
372
+ log_info("✅ Google STT streaming stopped and all resources cleaned")
373
  return final_result
374
 
375
  except Exception as e:
376
  log_error(f"❌ Failed to stop Google STT streaming", error=str(e))
377
+ # Force cleanup on error
378
  self.stream_thread = None
379
  self.client = None
380
+ self.streaming_config = None
381
+ self.is_streaming = False
382
+ self.stop_event.clear()
383
+ self.audio_queue = queue.Queue()
384
+ self.responses_queue = queue.Queue()
385
  return None
386
 
387
  def supports_realtime(self) -> bool: