ciyidogan commited on
Commit
90bb830
·
verified ·
1 Parent(s): 8a08517

Update stt/stt_google.py

Browse files
Files changed (1) hide show
  1. stt/stt_google.py +495 -495
stt/stt_google.py CHANGED
@@ -1,496 +1,496 @@
1
- """
2
- Google Cloud Speech-to-Text Implementation
3
- """
4
- import os
5
- import asyncio
6
- from typing import AsyncIterator, Optional, List, Any
7
- from datetime import datetime
8
- import sys
9
- import queue
10
- import threading
11
- import time
12
- import traceback
13
- from utils.logger import log_info, log_error, log_debug, log_warning
14
-
15
- # Import Google Cloud Speech only if available
16
- try:
17
- from google.cloud import speech
18
- from google.api_core import exceptions
19
- GOOGLE_SPEECH_AVAILABLE = True
20
- except ImportError:
21
- GOOGLE_SPEECH_AVAILABLE = False
22
- log_info("⚠️ Google Cloud Speech library not installed")
23
-
24
- from stt_interface import STTInterface, STTConfig, TranscriptionResult
25
-
26
- class GoogleCloudSTT(STTInterface):
27
- """Google Cloud Speech-to-Text implementation"""
28
-
29
- def __init__(self, credentials_path: str):
30
- if not GOOGLE_SPEECH_AVAILABLE:
31
- raise ImportError("google-cloud-speech library not installed. Run: pip install google-cloud-speech")
32
-
33
- if credentials_path and os.path.exists(credentials_path):
34
- os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = credentials_path
35
- log_info(f"✅ Google credentials set from: {credentials_path}")
36
-
37
- # Test credential'ları
38
- try:
39
- # Client'ı burada oluşturma, her seferinde yeni instance oluştur
40
- test_client = speech.SpeechClient()
41
- log_info("🔐 Testing Google credentials...")
42
- log_info("✅ Google credentials valid")
43
- # Test client'ı kapat
44
- if hasattr(test_client, 'transport') and hasattr(test_client.transport, 'close'):
45
- test_client.transport.close()
46
- except Exception as e:
47
- log_error(f"❌ Google credentials error", error=str(e))
48
- raise
49
- else:
50
- log_error(f"❌ Google credentials path not found: {credentials_path}")
51
- raise FileNotFoundError(f"Credentials file not found: {credentials_path}")
52
-
53
- # Client'ı burada oluşturma, start_streaming'de oluştur
54
- self.client = None
55
- self.streaming_config = None
56
- self.is_streaming = False
57
- self.audio_queue = None # Queue'ları None olarak başlat
58
- self.responses_queue = None
59
- self.stream_thread = None
60
- self.stop_event = threading.Event()
61
- self.credentials_path = credentials_path
62
-
63
- # Session tracking
64
- self.session_id = 0
65
- self.total_audio_bytes = 0
66
- self.total_chunks = 0
67
-
68
- def _reset_session_data(self):
69
- """Reset all session-specific data"""
70
- # Queue'ları temizle
71
- if self.audio_queue:
72
- while not self.audio_queue.empty():
73
- try:
74
- self.audio_queue.get_nowait()
75
- except:
76
- pass
77
-
78
- if self.responses_queue:
79
- while not self.responses_queue.empty():
80
- try:
81
- self.responses_queue.get_nowait()
82
- except:
83
- pass
84
-
85
- # Counters'ı sıfırla
86
- self.total_audio_bytes = 0
87
- self.total_chunks = 0
88
-
89
- # Yeni session ID
90
- self.session_id += 1
91
-
92
- log_info(f"🔄 Google STT session data reset. New session ID: {self.session_id}")
93
-
94
- def _create_fresh_queues(self):
95
- """Create fresh queue instances"""
96
- # Eski queue'ları temizle
97
- if self.audio_queue:
98
- while not self.audio_queue.empty():
99
- try:
100
- self.audio_queue.get_nowait()
101
- except:
102
- pass
103
-
104
- if self.responses_queue:
105
- while not self.responses_queue.empty():
106
- try:
107
- self.responses_queue.get_nowait()
108
- except:
109
- pass
110
-
111
- # Yeni queue'lar oluştur
112
- self.audio_queue = queue.Queue(maxsize=1000) # Max size ekle
113
- self.responses_queue = queue.Queue(maxsize=100)
114
- log_debug("✅ Created fresh queues")
115
-
116
- async def start_streaming(self, config: dict) -> None:
117
- """Initialize streaming session with clean state"""
118
- try:
119
- # Önce mevcut stream'i temizle
120
- if self.is_streaming or self.stream_thread:
121
- log_warning("⚠️ Previous stream still active, stopping it first")
122
- await self.stop_streaming()
123
- # Temizlik için bekle
124
- await asyncio.sleep(0.5)
125
-
126
- # Session verilerini resetle ve ID'yi artır
127
- self._reset_session_data()
128
-
129
- log_info(f"🎤 Starting Google STT streaming session #{self.session_id} with config: {config}")
130
-
131
- # Fresh queue'lar oluştur
132
- self._create_fresh_queues()
133
-
134
- # Stop event'i temizle
135
- self.stop_event.clear()
136
-
137
- # Yeni client oluştur (TEK SEFER)
138
- self.client = speech.SpeechClient()
139
- log_info("✅ Created new Google Speech client")
140
-
141
- # Convert dict to STTConfig if needed
142
- if isinstance(config, dict):
143
- stt_config = STTConfig(
144
- language=config.get("language", "tr-TR"),
145
- sample_rate=config.get("sample_rate", 16000),
146
- encoding=config.get("encoding", "WEBM_OPUS"),
147
- enable_punctuation=config.get("enable_punctuation", True),
148
- interim_results=config.get("interim_results", True),
149
- single_utterance=config.get("single_utterance", False)
150
- )
151
- else:
152
- stt_config = config
153
-
154
- recognition_config = speech.RecognitionConfig(
155
- encoding=self._get_encoding(stt_config.encoding),
156
- sample_rate_hertz=stt_config.sample_rate,
157
- language_code=stt_config.language,
158
- enable_automatic_punctuation=stt_config.enable_punctuation,
159
- model="latest_long",
160
- use_enhanced=True
161
- )
162
-
163
- self.streaming_config = speech.StreamingRecognitionConfig(
164
- config=recognition_config,
165
- interim_results=stt_config.interim_results,
166
- single_utterance=stt_config.single_utterance
167
- )
168
-
169
- self.is_streaming = True
170
-
171
- # Start streaming thread with unique name
172
- self.stream_thread = threading.Thread(
173
- target=self._run_stream,
174
- name=f"GoogleSTT-Session-{self.session_id}"
175
- )
176
- self.stream_thread.daemon = True # Daemon thread olarak işaretle
177
- self.stream_thread.start()
178
-
179
- log_info(f"✅ Google STT streaming session #{self.session_id} started successfully")
180
-
181
- except Exception as e:
182
- log_error(f"❌ Failed to start Google STT streaming", error=str(e))
183
- self.is_streaming = False
184
- self.client = None
185
- self._create_fresh_queues() # Hata durumunda da queue'ları temizle
186
- raise
187
-
188
- def _put_result(self, result: TranscriptionResult):
189
- """Helper to put result in queue"""
190
- try:
191
- self.responses_queue.put(result)
192
- # Debug log'u kaldırdık
193
- except Exception as e:
194
- log_error(f"❌ Error queuing result: {e}")
195
-
196
- def _run_stream(self):
197
- """Run the streaming recognition in a separate thread"""
198
- try:
199
- log_info("🎤 Google STT stream thread started")
200
-
201
- def request_generator():
202
- """Generate streaming requests"""
203
- chunk_count = 0
204
- total_bytes = 0
205
- first_chunk_processed = False
206
-
207
- while not self.stop_event.is_set():
208
- try:
209
- chunk = self.audio_queue.get(timeout=0.1)
210
- if chunk is None:
211
- log_info("📛 Poison pill received, stopping request generator")
212
- break
213
-
214
- chunk_count += 1
215
- total_bytes += len(chunk)
216
-
217
- # İlk chunk'ta audio format kontrolü
218
- if chunk_count == 1:
219
- log_info(f"📤 First chunk - size: {len(chunk)} bytes")
220
- # Audio header kontrolü (WEBM magic bytes)
221
- if len(chunk) >= 4:
222
- if chunk[:4] == b'\x1a\x45\xdf\xa3':
223
- log_info("✅ Valid WEBM header detected")
224
- first_chunk_processed = True
225
- else:
226
- log_error(f"❌ Invalid audio format, first 4 bytes: {chunk[:4].hex()}")
227
- # Hatalı format, stream'i durdur
228
- break
229
-
230
- # İlk chunk geçerliyse devam et
231
- if chunk_count == 1 and not first_chunk_processed:
232
- break
233
-
234
- # Her 100 chunk'ta durum raporu
235
- if chunk_count % 100 == 0:
236
- avg_chunk_size = total_bytes / chunk_count
237
- log_info(f"📤 Progress: {chunk_count} chunks, {total_bytes/1024:.1f}KB total, avg {avg_chunk_size:.0f} bytes/chunk")
238
-
239
- yield speech.StreamingRecognizeRequest(audio_content=chunk)
240
-
241
- except queue.Empty:
242
- continue
243
- except Exception as e:
244
- log_error(f"❌ Error in request generator: {e}")
245
- break
246
-
247
- # Create streaming client
248
- requests = request_generator()
249
-
250
- log_info("🎤 Creating Google STT streaming client...")
251
-
252
- try:
253
- responses = self.client.streaming_recognize(
254
- self.streaming_config,
255
- requests,
256
- timeout=300 # 5 dakika timeout
257
- )
258
-
259
- log_info("✅ Google STT streaming client created")
260
-
261
- # Response timeout kontrolü
262
- last_response_time = time.time()
263
- RESPONSE_TIMEOUT = 30 # 30 saniye içinde response gelmezse
264
-
265
- # Process responses
266
- response_count = 0
267
- empty_response_count = 0
268
-
269
- for response in responses:
270
- last_response_time = time.time()
271
- response_count += 1
272
-
273
- # Response type'ı logla
274
- if response_count == 1:
275
- log_info(f"📨 First response received from Google STT")
276
-
277
- if self.stop_event.is_set():
278
- log_info("🛑 Stop event detected, breaking response loop")
279
- break
280
-
281
- # Response içeriğini kontrol et
282
- if not response.results:
283
- empty_response_count += 1
284
- if empty_response_count == 1:
285
- log_debug("📭 Received empty response (no results)")
286
- continue
287
-
288
- for i, result in enumerate(response.results):
289
- log_debug(f"📋 Result {i}: is_final={result.is_final}, alternatives={len(result.alternatives)}")
290
-
291
- if not result.alternatives:
292
- log_debug(f"📋 Result {i} has no alternatives")
293
- continue
294
-
295
- # İlk alternatifi al
296
- alternative = result.alternatives[0]
297
-
298
- # Sadece anlamlı text'leri işle
299
- if alternative.transcript.strip():
300
- # Create transcription result
301
- transcription = TranscriptionResult(
302
- text=alternative.transcript,
303
- is_final=result.is_final,
304
- confidence=alternative.confidence if hasattr(alternative, 'confidence') and alternative.confidence else 0.0,
305
- timestamp=datetime.now().timestamp()
306
- )
307
-
308
- # Put result in queue
309
- self._put_result(transcription)
310
-
311
- # SADECE final result'ları logla
312
- if result.is_final:
313
- log_info(f"🎯 GOOGLE STT FINAL: '{alternative.transcript}'")
314
- else:
315
- log_debug(f"📋 Result {i} has empty transcript")
316
- continue
317
-
318
- if time.time() - last_response_time > RESPONSE_TIMEOUT:
319
- log_error(f"❌ No response from Google STT for {RESPONSE_TIMEOUT} seconds")
320
-
321
- log_info(f"📊 Google STT stream ended. Total responses: {response_count}, Empty: {empty_response_count}")
322
-
323
- except Exception as e:
324
- error_msg = str(e)
325
-
326
- # Detaylı hata mesajları
327
- if "Exceeded maximum allowed stream duration" in error_msg:
328
- log_warning("⚠️ Stream duration limit exceeded (5 minutes). This is expected for long sessions.")
329
- elif "Bad language code" in error_msg:
330
- log_error(f"❌ Invalid language code in STT config. Check locale settings.")
331
- elif "invalid_argument" in error_msg:
332
- log_error(f"❌ Invalid STT configuration. Check encoding and sample rate.")
333
- elif "Deadline Exceeded" in error_msg:
334
- log_error(f"❌ Google STT response timeout - possibly network issue or slow connection")
335
- elif "503" in error_msg or "Service Unavailable" in error_msg:
336
- log_error(f"❌ Google STT service temporarily unavailable. Will retry...")
337
- else:
338
- log_error(f"❌ Google STT stream error: {error_msg}")
339
-
340
- except Exception as e:
341
- log_error(f"❌ Fatal error in STT stream thread", error=str(e), traceback=traceback.format_exc())
342
- finally:
343
- log_info("🎤 Google STT stream thread ended")
344
- # Thread bittiğinde streaming flag'ini kapat
345
- self.is_streaming = False
346
-
347
- async def stream_audio(self, audio_chunk: bytes) -> AsyncIterator[TranscriptionResult]:
348
- """Stream audio chunk and get transcription results"""
349
- if not self.is_streaming:
350
- # Daha detaylı hata mesajı
351
- log_error(f"❌ STT not streaming - is_streaming: {self.is_streaming}, thread alive: {self.stream_thread and self.stream_thread.is_alive() if hasattr(self, 'stream_thread') else 'No thread'}")
352
- raise RuntimeError("Streaming not started. Call start_streaming() first.")
353
-
354
- try:
355
- # Put audio in queue for streaming thread
356
- self.audio_queue.put(audio_chunk)
357
-
358
- # Check for any results in queue
359
- while True:
360
- try:
361
- # Non-blocking get from normal queue
362
- result = self.responses_queue.get_nowait()
363
- yield result
364
- except queue.Empty:
365
- # No more results in queue
366
- break
367
-
368
- except Exception as e:
369
- log_error(f"❌ Google STT streaming error", error=str(e))
370
- # Stream'i tekrar başlatmayı tetikle
371
- self.is_streaming = False
372
- raise
373
-
374
- async def stop_streaming(self) -> Optional[TranscriptionResult]:
375
- """Stop streaming and clean up all resources"""
376
- if not self.is_streaming and not self.stream_thread:
377
- log_debug("Already stopped, nothing to do")
378
- return None
379
-
380
- try:
381
- log_info(f"🛑 Stopping Google STT streaming session #{self.session_id}")
382
-
383
- # Flag'i hemen kapat
384
- self.is_streaming = False
385
- self.stop_event.set()
386
-
387
- # Send poison pill to stop request generator
388
- if self.audio_queue:
389
- try:
390
- self.audio_queue.put(None)
391
- except:
392
- pass
393
-
394
- # Thread'i durdur
395
- if self.stream_thread and self.stream_thread.is_alive():
396
- log_info("⏳ Waiting for stream thread to finish...")
397
- self.stream_thread.join(timeout=5.0) # 5 saniye bekle
398
-
399
- if self.stream_thread.is_alive():
400
- log_warning("⚠️ STT thread did not stop gracefully after 5s")
401
- # Thread'i zorla sonlandıramayız Python'da, ama daemon olduğu için
402
- # ana program kapanınca otomatik kapanacak
403
- else:
404
- log_info("✅ Stream thread finished")
405
-
406
- # Final result'ı al
407
- final_result = None
408
- if self.responses_queue:
409
- while not self.responses_queue.empty():
410
- try:
411
- result = self.responses_queue.get_nowait()
412
- if result.is_final:
413
- final_result = result
414
- except:
415
- pass
416
-
417
- # Client'ı kapat
418
- if self.client:
419
- try:
420
- # Transport'u kapat
421
- if hasattr(self.client, 'transport') and hasattr(self.client.transport, 'close'):
422
- self.client.transport.close()
423
- log_debug("✅ Client transport closed")
424
-
425
- # gRPC channel'ı kapat
426
- if hasattr(self.client, '_transport') and hasattr(self.client._transport, '_grpc_channel'):
427
- self.client._transport._grpc_channel.close()
428
- log_debug("✅ gRPC channel closed")
429
- except Exception as e:
430
- log_warning(f"⚠️ Error closing Google client: {e}")
431
- finally:
432
- self.client = None
433
-
434
- # Queue'ları None yap (yeniden kullanım için fresh queue gerekecek)
435
- self.audio_queue = None
436
- self.responses_queue = None
437
-
438
- # Diğer değişkenleri resetle
439
- self.stream_thread = None
440
- self.streaming_config = None
441
- self.stop_event.clear()
442
-
443
- log_info(f"✅ Google STT streaming session #{self.session_id} stopped and cleaned")
444
- return final_result
445
-
446
- except Exception as e:
447
- log_error(f"❌ Error during stop_streaming", error=str(e))
448
- # Force cleanup on error
449
- self.is_streaming = False
450
- self.stream_thread = None
451
- self.client = None
452
- self.streaming_config = None
453
- self.stop_event.clear()
454
- self.audio_queue = None
455
- self.responses_queue = None
456
- return None
457
-
458
- def supports_realtime(self) -> bool:
459
- """Google Cloud STT supports real-time streaming"""
460
- return True
461
-
462
- def get_supported_languages(self) -> List[str]:
463
- """Get list of supported language codes"""
464
- return [
465
- "tr-TR", # Turkish
466
- "en-US", # English (US)
467
- "en-GB", # English (UK)
468
- "de-DE", # German
469
- "fr-FR", # French
470
- "es-ES", # Spanish
471
- "it-IT", # Italian
472
- "pt-BR", # Portuguese (Brazil)
473
- "ru-RU", # Russian
474
- "ja-JP", # Japanese
475
- "ko-KR", # Korean
476
- "zh-CN", # Chinese (Simplified)
477
- "ar-SA", # Arabic
478
- ]
479
-
480
- def get_provider_name(self) -> str:
481
- """Get provider name"""
482
- return "google"
483
-
484
- def _get_encoding(self, encoding_str: str):
485
- """Convert encoding string to Google Speech enum"""
486
- if not GOOGLE_SPEECH_AVAILABLE:
487
- return None
488
-
489
- encoding_map = {
490
- "WEBM_OPUS": speech.RecognitionConfig.AudioEncoding.WEBM_OPUS,
491
- "LINEAR16": speech.RecognitionConfig.AudioEncoding.LINEAR16,
492
- "FLAC": speech.RecognitionConfig.AudioEncoding.FLAC,
493
- "MP3": speech.RecognitionConfig.AudioEncoding.MP3,
494
- "OGG_OPUS": speech.RecognitionConfig.AudioEncoding.OGG_OPUS,
495
- }
496
  return encoding_map.get(encoding_str, speech.RecognitionConfig.AudioEncoding.WEBM_OPUS)
 
1
+ """
2
+ Google Cloud Speech-to-Text Implementation
3
+ """
4
+ import os
5
+ import asyncio
6
+ from typing import AsyncIterator, Optional, List, Any
7
+ from datetime import datetime
8
+ import sys
9
+ import queue
10
+ import threading
11
+ import time
12
+ import traceback
13
+ from utils.logger import log_info, log_error, log_debug, log_warning
14
+
15
+ # Import Google Cloud Speech only if available
16
+ try:
17
+ from google.cloud import speech
18
+ from google.api_core import exceptions
19
+ GOOGLE_SPEECH_AVAILABLE = True
20
+ except ImportError:
21
+ GOOGLE_SPEECH_AVAILABLE = False
22
+ log_info("⚠️ Google Cloud Speech library not installed")
23
+
24
+ from .stt_interface import STTInterface, STTConfig, TranscriptionResult
25
+
26
+ class GoogleCloudSTT(STTInterface):
27
+ """Google Cloud Speech-to-Text implementation"""
28
+
29
+ def __init__(self, credentials_path: str):
30
+ if not GOOGLE_SPEECH_AVAILABLE:
31
+ raise ImportError("google-cloud-speech library not installed. Run: pip install google-cloud-speech")
32
+
33
+ if credentials_path and os.path.exists(credentials_path):
34
+ os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = credentials_path
35
+ log_info(f"✅ Google credentials set from: {credentials_path}")
36
+
37
+ # Test credential'ları
38
+ try:
39
+ # Client'ı burada oluşturma, her seferinde yeni instance oluştur
40
+ test_client = speech.SpeechClient()
41
+ log_info("🔐 Testing Google credentials...")
42
+ log_info("✅ Google credentials valid")
43
+ # Test client'ı kapat
44
+ if hasattr(test_client, 'transport') and hasattr(test_client.transport, 'close'):
45
+ test_client.transport.close()
46
+ except Exception as e:
47
+ log_error(f"❌ Google credentials error", error=str(e))
48
+ raise
49
+ else:
50
+ log_error(f"❌ Google credentials path not found: {credentials_path}")
51
+ raise FileNotFoundError(f"Credentials file not found: {credentials_path}")
52
+
53
+ # Client'ı burada oluşturma, start_streaming'de oluştur
54
+ self.client = None
55
+ self.streaming_config = None
56
+ self.is_streaming = False
57
+ self.audio_queue = None # Queue'ları None olarak başlat
58
+ self.responses_queue = None
59
+ self.stream_thread = None
60
+ self.stop_event = threading.Event()
61
+ self.credentials_path = credentials_path
62
+
63
+ # Session tracking
64
+ self.session_id = 0
65
+ self.total_audio_bytes = 0
66
+ self.total_chunks = 0
67
+
68
+ def _reset_session_data(self):
69
+ """Reset all session-specific data"""
70
+ # Queue'ları temizle
71
+ if self.audio_queue:
72
+ while not self.audio_queue.empty():
73
+ try:
74
+ self.audio_queue.get_nowait()
75
+ except:
76
+ pass
77
+
78
+ if self.responses_queue:
79
+ while not self.responses_queue.empty():
80
+ try:
81
+ self.responses_queue.get_nowait()
82
+ except:
83
+ pass
84
+
85
+ # Counters'ı sıfırla
86
+ self.total_audio_bytes = 0
87
+ self.total_chunks = 0
88
+
89
+ # Yeni session ID
90
+ self.session_id += 1
91
+
92
+ log_info(f"🔄 Google STT session data reset. New session ID: {self.session_id}")
93
+
94
+ def _create_fresh_queues(self):
95
+ """Create fresh queue instances"""
96
+ # Eski queue'ları temizle
97
+ if self.audio_queue:
98
+ while not self.audio_queue.empty():
99
+ try:
100
+ self.audio_queue.get_nowait()
101
+ except:
102
+ pass
103
+
104
+ if self.responses_queue:
105
+ while not self.responses_queue.empty():
106
+ try:
107
+ self.responses_queue.get_nowait()
108
+ except:
109
+ pass
110
+
111
+ # Yeni queue'lar oluştur
112
+ self.audio_queue = queue.Queue(maxsize=1000) # Max size ekle
113
+ self.responses_queue = queue.Queue(maxsize=100)
114
+ log_debug("✅ Created fresh queues")
115
+
116
+ async def start_streaming(self, config: dict) -> None:
117
+ """Initialize streaming session with clean state"""
118
+ try:
119
+ # Önce mevcut stream'i temizle
120
+ if self.is_streaming or self.stream_thread:
121
+ log_warning("⚠️ Previous stream still active, stopping it first")
122
+ await self.stop_streaming()
123
+ # Temizlik için bekle
124
+ await asyncio.sleep(0.5)
125
+
126
+ # Session verilerini resetle ve ID'yi artır
127
+ self._reset_session_data()
128
+
129
+ log_info(f"🎤 Starting Google STT streaming session #{self.session_id} with config: {config}")
130
+
131
+ # Fresh queue'lar oluştur
132
+ self._create_fresh_queues()
133
+
134
+ # Stop event'i temizle
135
+ self.stop_event.clear()
136
+
137
+ # Yeni client oluştur (TEK SEFER)
138
+ self.client = speech.SpeechClient()
139
+ log_info("✅ Created new Google Speech client")
140
+
141
+ # Convert dict to STTConfig if needed
142
+ if isinstance(config, dict):
143
+ stt_config = STTConfig(
144
+ language=config.get("language", "tr-TR"),
145
+ sample_rate=config.get("sample_rate", 16000),
146
+ encoding=config.get("encoding", "WEBM_OPUS"),
147
+ enable_punctuation=config.get("enable_punctuation", True),
148
+ interim_results=config.get("interim_results", True),
149
+ single_utterance=config.get("single_utterance", False)
150
+ )
151
+ else:
152
+ stt_config = config
153
+
154
+ recognition_config = speech.RecognitionConfig(
155
+ encoding=self._get_encoding(stt_config.encoding),
156
+ sample_rate_hertz=stt_config.sample_rate,
157
+ language_code=stt_config.language,
158
+ enable_automatic_punctuation=stt_config.enable_punctuation,
159
+ model="latest_long",
160
+ use_enhanced=True
161
+ )
162
+
163
+ self.streaming_config = speech.StreamingRecognitionConfig(
164
+ config=recognition_config,
165
+ interim_results=stt_config.interim_results,
166
+ single_utterance=stt_config.single_utterance
167
+ )
168
+
169
+ self.is_streaming = True
170
+
171
+ # Start streaming thread with unique name
172
+ self.stream_thread = threading.Thread(
173
+ target=self._run_stream,
174
+ name=f"GoogleSTT-Session-{self.session_id}"
175
+ )
176
+ self.stream_thread.daemon = True # Daemon thread olarak işaretle
177
+ self.stream_thread.start()
178
+
179
+ log_info(f"✅ Google STT streaming session #{self.session_id} started successfully")
180
+
181
+ except Exception as e:
182
+ log_error(f"❌ Failed to start Google STT streaming", error=str(e))
183
+ self.is_streaming = False
184
+ self.client = None
185
+ self._create_fresh_queues() # Hata durumunda da queue'ları temizle
186
+ raise
187
+
188
+ def _put_result(self, result: TranscriptionResult):
189
+ """Helper to put result in queue"""
190
+ try:
191
+ self.responses_queue.put(result)
192
+ # Debug log'u kaldırdık
193
+ except Exception as e:
194
+ log_error(f"❌ Error queuing result: {e}")
195
+
196
+ def _run_stream(self):
197
+ """Run the streaming recognition in a separate thread"""
198
+ try:
199
+ log_info("🎤 Google STT stream thread started")
200
+
201
+ def request_generator():
202
+ """Generate streaming requests"""
203
+ chunk_count = 0
204
+ total_bytes = 0
205
+ first_chunk_processed = False
206
+
207
+ while not self.stop_event.is_set():
208
+ try:
209
+ chunk = self.audio_queue.get(timeout=0.1)
210
+ if chunk is None:
211
+ log_info("📛 Poison pill received, stopping request generator")
212
+ break
213
+
214
+ chunk_count += 1
215
+ total_bytes += len(chunk)
216
+
217
+ # İlk chunk'ta audio format kontrolü
218
+ if chunk_count == 1:
219
+ log_info(f"📤 First chunk - size: {len(chunk)} bytes")
220
+ # Audio header kontrolü (WEBM magic bytes)
221
+ if len(chunk) >= 4:
222
+ if chunk[:4] == b'\x1a\x45\xdf\xa3':
223
+ log_info("✅ Valid WEBM header detected")
224
+ first_chunk_processed = True
225
+ else:
226
+ log_error(f"❌ Invalid audio format, first 4 bytes: {chunk[:4].hex()}")
227
+ # Hatalı format, stream'i durdur
228
+ break
229
+
230
+ # İlk chunk geçerliyse devam et
231
+ if chunk_count == 1 and not first_chunk_processed:
232
+ break
233
+
234
+ # Her 100 chunk'ta durum raporu
235
+ if chunk_count % 100 == 0:
236
+ avg_chunk_size = total_bytes / chunk_count
237
+ log_info(f"📤 Progress: {chunk_count} chunks, {total_bytes/1024:.1f}KB total, avg {avg_chunk_size:.0f} bytes/chunk")
238
+
239
+ yield speech.StreamingRecognizeRequest(audio_content=chunk)
240
+
241
+ except queue.Empty:
242
+ continue
243
+ except Exception as e:
244
+ log_error(f"❌ Error in request generator: {e}")
245
+ break
246
+
247
+ # Create streaming client
248
+ requests = request_generator()
249
+
250
+ log_info("🎤 Creating Google STT streaming client...")
251
+
252
+ try:
253
+ responses = self.client.streaming_recognize(
254
+ self.streaming_config,
255
+ requests,
256
+ timeout=300 # 5 dakika timeout
257
+ )
258
+
259
+ log_info("✅ Google STT streaming client created")
260
+
261
+ # Response timeout kontrolü
262
+ last_response_time = time.time()
263
+ RESPONSE_TIMEOUT = 30 # 30 saniye içinde response gelmezse
264
+
265
+ # Process responses
266
+ response_count = 0
267
+ empty_response_count = 0
268
+
269
+ for response in responses:
270
+ last_response_time = time.time()
271
+ response_count += 1
272
+
273
+ # Response type'ı logla
274
+ if response_count == 1:
275
+ log_info(f"📨 First response received from Google STT")
276
+
277
+ if self.stop_event.is_set():
278
+ log_info("🛑 Stop event detected, breaking response loop")
279
+ break
280
+
281
+ # Response içeriğini kontrol et
282
+ if not response.results:
283
+ empty_response_count += 1
284
+ if empty_response_count == 1:
285
+ log_debug("📭 Received empty response (no results)")
286
+ continue
287
+
288
+ for i, result in enumerate(response.results):
289
+ log_debug(f"📋 Result {i}: is_final={result.is_final}, alternatives={len(result.alternatives)}")
290
+
291
+ if not result.alternatives:
292
+ log_debug(f"📋 Result {i} has no alternatives")
293
+ continue
294
+
295
+ # İlk alternatifi al
296
+ alternative = result.alternatives[0]
297
+
298
+ # Sadece anlamlı text'leri işle
299
+ if alternative.transcript.strip():
300
+ # Create transcription result
301
+ transcription = TranscriptionResult(
302
+ text=alternative.transcript,
303
+ is_final=result.is_final,
304
+ confidence=alternative.confidence if hasattr(alternative, 'confidence') and alternative.confidence else 0.0,
305
+ timestamp=datetime.now().timestamp()
306
+ )
307
+
308
+ # Put result in queue
309
+ self._put_result(transcription)
310
+
311
+ # SADECE final result'ları logla
312
+ if result.is_final:
313
+ log_info(f"🎯 GOOGLE STT FINAL: '{alternative.transcript}'")
314
+ else:
315
+ log_debug(f"📋 Result {i} has empty transcript")
316
+ continue
317
+
318
+ if time.time() - last_response_time > RESPONSE_TIMEOUT:
319
+ log_error(f"❌ No response from Google STT for {RESPONSE_TIMEOUT} seconds")
320
+
321
+ log_info(f"📊 Google STT stream ended. Total responses: {response_count}, Empty: {empty_response_count}")
322
+
323
+ except Exception as e:
324
+ error_msg = str(e)
325
+
326
+ # Detaylı hata mesajları
327
+ if "Exceeded maximum allowed stream duration" in error_msg:
328
+ log_warning("⚠️ Stream duration limit exceeded (5 minutes). This is expected for long sessions.")
329
+ elif "Bad language code" in error_msg:
330
+ log_error(f"❌ Invalid language code in STT config. Check locale settings.")
331
+ elif "invalid_argument" in error_msg:
332
+ log_error(f"❌ Invalid STT configuration. Check encoding and sample rate.")
333
+ elif "Deadline Exceeded" in error_msg:
334
+ log_error(f"❌ Google STT response timeout - possibly network issue or slow connection")
335
+ elif "503" in error_msg or "Service Unavailable" in error_msg:
336
+ log_error(f"❌ Google STT service temporarily unavailable. Will retry...")
337
+ else:
338
+ log_error(f"❌ Google STT stream error: {error_msg}")
339
+
340
+ except Exception as e:
341
+ log_error(f"❌ Fatal error in STT stream thread", error=str(e), traceback=traceback.format_exc())
342
+ finally:
343
+ log_info("🎤 Google STT stream thread ended")
344
+ # Thread bittiğinde streaming flag'ini kapat
345
+ self.is_streaming = False
346
+
347
+ async def stream_audio(self, audio_chunk: bytes) -> AsyncIterator[TranscriptionResult]:
348
+ """Stream audio chunk and get transcription results"""
349
+ if not self.is_streaming:
350
+ # Daha detaylı hata mesajı
351
+ log_error(f"❌ STT not streaming - is_streaming: {self.is_streaming}, thread alive: {self.stream_thread and self.stream_thread.is_alive() if hasattr(self, 'stream_thread') else 'No thread'}")
352
+ raise RuntimeError("Streaming not started. Call start_streaming() first.")
353
+
354
+ try:
355
+ # Put audio in queue for streaming thread
356
+ self.audio_queue.put(audio_chunk)
357
+
358
+ # Check for any results in queue
359
+ while True:
360
+ try:
361
+ # Non-blocking get from normal queue
362
+ result = self.responses_queue.get_nowait()
363
+ yield result
364
+ except queue.Empty:
365
+ # No more results in queue
366
+ break
367
+
368
+ except Exception as e:
369
+ log_error(f"❌ Google STT streaming error", error=str(e))
370
+ # Stream'i tekrar başlatmayı tetikle
371
+ self.is_streaming = False
372
+ raise
373
+
374
+ async def stop_streaming(self) -> Optional[TranscriptionResult]:
375
+ """Stop streaming and clean up all resources"""
376
+ if not self.is_streaming and not self.stream_thread:
377
+ log_debug("Already stopped, nothing to do")
378
+ return None
379
+
380
+ try:
381
+ log_info(f"🛑 Stopping Google STT streaming session #{self.session_id}")
382
+
383
+ # Flag'i hemen kapat
384
+ self.is_streaming = False
385
+ self.stop_event.set()
386
+
387
+ # Send poison pill to stop request generator
388
+ if self.audio_queue:
389
+ try:
390
+ self.audio_queue.put(None)
391
+ except:
392
+ pass
393
+
394
+ # Thread'i durdur
395
+ if self.stream_thread and self.stream_thread.is_alive():
396
+ log_info("⏳ Waiting for stream thread to finish...")
397
+ self.stream_thread.join(timeout=5.0) # 5 saniye bekle
398
+
399
+ if self.stream_thread.is_alive():
400
+ log_warning("⚠️ STT thread did not stop gracefully after 5s")
401
+ # Thread'i zorla sonlandıramayız Python'da, ama daemon olduğu için
402
+ # ana program kapanınca otomatik kapanacak
403
+ else:
404
+ log_info("✅ Stream thread finished")
405
+
406
+ # Final result'ı al
407
+ final_result = None
408
+ if self.responses_queue:
409
+ while not self.responses_queue.empty():
410
+ try:
411
+ result = self.responses_queue.get_nowait()
412
+ if result.is_final:
413
+ final_result = result
414
+ except:
415
+ pass
416
+
417
+ # Client'ı kapat
418
+ if self.client:
419
+ try:
420
+ # Transport'u kapat
421
+ if hasattr(self.client, 'transport') and hasattr(self.client.transport, 'close'):
422
+ self.client.transport.close()
423
+ log_debug("✅ Client transport closed")
424
+
425
+ # gRPC channel'ı kapat
426
+ if hasattr(self.client, '_transport') and hasattr(self.client._transport, '_grpc_channel'):
427
+ self.client._transport._grpc_channel.close()
428
+ log_debug("✅ gRPC channel closed")
429
+ except Exception as e:
430
+ log_warning(f"⚠️ Error closing Google client: {e}")
431
+ finally:
432
+ self.client = None
433
+
434
+ # Queue'ları None yap (yeniden kullanım için fresh queue gerekecek)
435
+ self.audio_queue = None
436
+ self.responses_queue = None
437
+
438
+ # Diğer değişkenleri resetle
439
+ self.stream_thread = None
440
+ self.streaming_config = None
441
+ self.stop_event.clear()
442
+
443
+ log_info(f"✅ Google STT streaming session #{self.session_id} stopped and cleaned")
444
+ return final_result
445
+
446
+ except Exception as e:
447
+ log_error(f"❌ Error during stop_streaming", error=str(e))
448
+ # Force cleanup on error
449
+ self.is_streaming = False
450
+ self.stream_thread = None
451
+ self.client = None
452
+ self.streaming_config = None
453
+ self.stop_event.clear()
454
+ self.audio_queue = None
455
+ self.responses_queue = None
456
+ return None
457
+
458
+ def supports_realtime(self) -> bool:
459
+ """Google Cloud STT supports real-time streaming"""
460
+ return True
461
+
462
+ def get_supported_languages(self) -> List[str]:
463
+ """Get list of supported language codes"""
464
+ return [
465
+ "tr-TR", # Turkish
466
+ "en-US", # English (US)
467
+ "en-GB", # English (UK)
468
+ "de-DE", # German
469
+ "fr-FR", # French
470
+ "es-ES", # Spanish
471
+ "it-IT", # Italian
472
+ "pt-BR", # Portuguese (Brazil)
473
+ "ru-RU", # Russian
474
+ "ja-JP", # Japanese
475
+ "ko-KR", # Korean
476
+ "zh-CN", # Chinese (Simplified)
477
+ "ar-SA", # Arabic
478
+ ]
479
+
480
+ def get_provider_name(self) -> str:
481
+ """Get provider name"""
482
+ return "google"
483
+
484
+ def _get_encoding(self, encoding_str: str):
485
+ """Convert encoding string to Google Speech enum"""
486
+ if not GOOGLE_SPEECH_AVAILABLE:
487
+ return None
488
+
489
+ encoding_map = {
490
+ "WEBM_OPUS": speech.RecognitionConfig.AudioEncoding.WEBM_OPUS,
491
+ "LINEAR16": speech.RecognitionConfig.AudioEncoding.LINEAR16,
492
+ "FLAC": speech.RecognitionConfig.AudioEncoding.FLAC,
493
+ "MP3": speech.RecognitionConfig.AudioEncoding.MP3,
494
+ "OGG_OPUS": speech.RecognitionConfig.AudioEncoding.OGG_OPUS,
495
+ }
496
  return encoding_map.get(encoding_str, speech.RecognitionConfig.AudioEncoding.WEBM_OPUS)