File size: 15,226 Bytes
b0a4866
 
 
 
 
30f7ccb
c51c470
 
30f7ccb
 
8d8ad71
c51c470
 
 
30f7ccb
c51c470
 
 
 
8d8ad71
c51c470
b0a4866
 
 
 
 
 
c51c470
 
 
b0a4866
 
8d8ad71
5188a55
 
 
 
 
 
 
 
 
 
 
b0a4866
5188a55
 
b0a4866
30f7ccb
b0a4866
 
30f7ccb
4e2b388
30f7ccb
 
b0a4866
30f7ccb
b0a4866
 
30f7ccb
 
 
9681381
 
 
 
 
 
 
 
 
 
 
 
b0a4866
9681381
 
 
 
30f7ccb
 
b0a4866
 
 
 
9681381
 
b0a4866
 
 
30f7ccb
 
 
 
 
 
 
b0a4866
 
30f7ccb
 
b0a4866
1ba3535
 
 
 
 
 
 
 
c51c470
30f7ccb
93dcbd8
 
 
2955a8f
93dcbd8
 
 
 
 
 
 
 
 
 
 
 
c114a53
93dcbd8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c114a53
93dcbd8
 
5188a55
93dcbd8
 
 
 
 
30f7ccb
93dcbd8
 
 
c114a53
93dcbd8
 
bda5a73
93dcbd8
 
 
30f7ccb
93dcbd8
 
 
 
 
 
c114a53
93dcbd8
 
 
 
 
 
c114a53
93dcbd8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c114a53
93dcbd8
c114a53
93dcbd8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
486eb58
93dcbd8
 
 
 
 
 
 
2955a8f
b0a4866
 
 
c51c470
 
b0a4866
30f7ccb
 
b0a4866
4e2b388
 
 
 
30f7ccb
29c5a3f
30f7ccb
4e2b388
 
 
 
b0a4866
30f7ccb
b0a4866
c51c470
b0a4866
93dcbd8
 
 
30f7ccb
93dcbd8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c51c470
b0a4866
c51c470
b0a4866
c51c470
b0a4866
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c51c470
b0a4866
c51c470
 
 
 
 
 
 
 
 
 
b0a4866
c51c470
b0a4866
 
c51c470
b0a4866
 
c51c470
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
"""
Google Cloud Speech-to-Text Implementation
"""
import os
import asyncio
from typing import AsyncIterator, Optional, List, Any
from datetime import datetime
import sys
import queue
import threading
from logger import log_info, log_error, log_debug, log_warning

# Import Google Cloud Speech only if available
try:
    from google.cloud import speech
    from google.api_core import exceptions
    GOOGLE_SPEECH_AVAILABLE = True
except ImportError:
    GOOGLE_SPEECH_AVAILABLE = False
    log_info("⚠️ Google Cloud Speech library not installed")

from stt_interface import STTInterface, STTConfig, TranscriptionResult

class GoogleCloudSTT(STTInterface):
    """Google Cloud Speech-to-Text implementation"""
    
    def __init__(self, credentials_path: str):
        if not GOOGLE_SPEECH_AVAILABLE:
            raise ImportError("google-cloud-speech library not installed. Run: pip install google-cloud-speech")
            
        if credentials_path and os.path.exists(credentials_path):
            os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = credentials_path
            log_info(f"✅ Google credentials set from: {credentials_path}")
            
            # Test credential'ları
            try:
                self.client = speech.SpeechClient()
                # Basit bir test çağrısı
                log_info("🔐 Testing Google credentials...")
                # Bu sadece client'ın oluşturulabildiğini test eder
                log_info("✅ Google credentials valid")
            except Exception as e:
                log_error(f"❌ Google credentials error", error=str(e))
                raise
        else:
            log_error(f"❌ Google credentials path not found: {credentials_path}")
            raise FileNotFoundError(f"Credentials file not found: {credentials_path}")
            
        self.client = speech.SpeechClient()
        self.streaming_config = None
        self.is_streaming = False
        self.audio_queue = queue.Queue()
        self.responses_queue = queue.Queue()  # Normal Queue, asyncio.Queue değil!
        self.stream_thread = None
        self.stop_event = threading.Event()
        
    async def start_streaming(self, config: dict) -> None:
        """Initialize streaming session"""
        try:
            log_info(f"🎤 Starting Google STT streaming with config: {config}")
            
            # Convert dict to STTConfig if needed
            if isinstance(config, dict):
                stt_config = STTConfig(
                    language=config.get("language", "tr-TR"),
                    sample_rate=config.get("sample_rate", 16000),
                    encoding=config.get("encoding", "WEBM_OPUS"),
                    enable_punctuation=config.get("enable_punctuation", True),
                    interim_results=config.get("interim_results", True),
                    single_utterance=config.get("single_utterance", False)
                )
            else:
                stt_config = config
            
            recognition_config = speech.RecognitionConfig(
                encoding=self._get_encoding(stt_config.encoding),
                sample_rate_hertz=stt_config.sample_rate,
                language_code=stt_config.language,
                enable_automatic_punctuation=stt_config.enable_punctuation,
                model="latest_long",
                use_enhanced=True
            )
            
            self.streaming_config = speech.StreamingRecognitionConfig(
                config=recognition_config,
                interim_results=stt_config.interim_results,
                single_utterance=stt_config.single_utterance
            )
            
            self.is_streaming = True
            self.stop_event.clear()
            
            # Start streaming thread
            self.stream_thread = threading.Thread(target=self._run_stream)
            self.stream_thread.start()
            
            log_info("✅ Google STT streaming started successfully")
            
        except Exception as e:
            log_error(f"❌ Failed to start Google STT streaming", error=str(e))
            self.is_streaming = False
            raise

    def _put_result(self, result: TranscriptionResult):
        """Helper to put result in queue"""
        try:
            self.responses_queue.put(result)
            # Debug log'u kaldırdık
        except Exception as e:
            log_error(f"❌ Error queuing result: {e}")
    
    def _run_stream(self):
            """Run the streaming recognition in a separate thread"""
            try:
                log_info("🎤 Google STT stream thread started")
                
                def request_generator():
                    """Generate streaming requests"""
                    chunk_count = 0
                    start_time = datetime.now()
                    
                    while not self.stop_event.is_set():
                        try:
                            # 5 dakika sınırına yaklaşıyorsak stream'i sonlandır
                            elapsed = (datetime.now() - start_time).total_seconds()
                            if elapsed > 280:  # 4 dakika 40 saniye - güvenli margin
                                log_warning(f"⚠️ Approaching 5-minute limit ({elapsed:.1f}s), ending stream gracefully")
                                break
                            
                            # Get audio chunk with timeout
                            chunk = self.audio_queue.get(timeout=0.1)
                            if chunk is None:  # Poison pill
                                log_info("📛 Poison pill received, stopping request generator")
                                break
                            chunk_count += 1
                            
                            # Sadece önemli milestone'larda logla
                            if chunk_count == 1:
                                log_info(f"📤 First chunk sent to Google STT, size: {len(chunk)} bytes")
                            elif chunk_count % 100 == 0:
                                log_info(f"📤 Sent {chunk_count} chunks to Google STT (elapsed: {elapsed:.1f}s)")
                                
                            yield speech.StreamingRecognizeRequest(audio_content=chunk)
                        except queue.Empty:
                            continue
                        except Exception as e:
                            log_error(f"❌ Error in request generator: {e}")
                            break
                
                # Create streaming client
                requests = request_generator()
                
                log_info("🎤 Creating Google STT streaming client...")
                
                try:
                    responses = self.client.streaming_recognize(self.streaming_config, requests)
                    log_info("✅ Google STT streaming client created")
                    
                    # Process responses
                    response_count = 0
                    empty_response_count = 0
                    
                    for response in responses:
                        response_count += 1
                        
                        if self.stop_event.is_set():
                            log_info("🛑 Stop event detected, breaking response loop")
                            break
                        
                        # Boş response'ları say ama loglama
                        if not response.results:
                            empty_response_count += 1
                            if empty_response_count % 50 == 0:
                                log_warning(f"⚠️ Received {empty_response_count} empty responses from Google STT")
                            continue
                            
                        for result in response.results:
                            if not result.alternatives:
                                continue
                                
                            # İlk alternatifi al
                            alternative = result.alternatives[0]
                            
                            # Sadece anlamlı text'leri işle
                            if alternative.transcript.strip():
                                # Create transcription result
                                transcription = TranscriptionResult(
                                    text=alternative.transcript,
                                    is_final=result.is_final,
                                    confidence=alternative.confidence if hasattr(alternative, 'confidence') and alternative.confidence else 0.0,
                                    timestamp=datetime.now().timestamp()
                                )
                                
                                # Put result in queue
                                self._put_result(transcription)
                                
                                # SADECE final result'ları logla
                                if result.is_final:
                                    log_info(f"🎯 GOOGLE STT FINAL: '{alternative.transcript}'")
                            
                    log_info(f"📊 Google STT stream ended. Total responses: {response_count}, Empty: {empty_response_count}")
                            
                except Exception as e:
                    error_msg = str(e)
                    
                    # Detaylı hata mesajları
                    if "Exceeded maximum allowed stream duration" in error_msg:
                        log_warning("⚠️ Stream duration limit exceeded (5 minutes). This is expected for long sessions.")
                    elif "Bad language code" in error_msg:
                        log_error(f"❌ Invalid language code in STT config. Check locale settings.")
                    elif "invalid_argument" in error_msg:
                        log_error(f"❌ Invalid STT configuration. Check encoding and sample rate.")
                    elif "Deadline Exceeded" in error_msg:
                        log_error(f"❌ Google STT timeout - possibly network issue or slow connection")
                    elif "503" in error_msg or "Service Unavailable" in error_msg:
                        log_error(f"❌ Google STT service temporarily unavailable. Will retry...")
                    else:
                        log_error(f"❌ Google STT stream error: {error_msg}")
                        
            except Exception as e:
                import traceback
                log_error(f"❌ Fatal error in STT stream thread", error=str(e), traceback=traceback.format_exc())
            finally:
                log_info("🎤 Google STT stream thread ended")
                # Thread bittiğinde streaming flag'ini kapat
                self.is_streaming = False
            
    async def stream_audio(self, audio_chunk: bytes) -> AsyncIterator[TranscriptionResult]:
        """Stream audio chunk and get transcription results"""
        if not self.is_streaming:
            raise RuntimeError("Streaming not started. Call start_streaming() first.")
        
        try:
            # Put audio in queue for streaming thread
            self.audio_queue.put(audio_chunk)
            
            # Check for any results in queue
            while True:
                try:
                    # Non-blocking get from normal queue
                    result = self.responses_queue.get_nowait()
                    # Debug log'u kaldırdık
                    yield result
                except queue.Empty:
                    # No more results in queue
                    break
                    
        except Exception as e:
            log_error(f"❌ Google STT streaming error", error=str(e))
            raise
    
    async def stop_streaming(self) -> Optional[TranscriptionResult]:
            """Stop streaming and get final result"""
            if not self.is_streaming:
                return None
                
            try:
                log_info("🛑 Stopping Google STT streaming...")
                
                self.is_streaming = False
                self.stop_event.set()
                
                # Send poison pill to queue
                self.audio_queue.put(None)
                
                # Wait for thread to finish with longer timeout
                if self.stream_thread and self.stream_thread.is_alive():
                    self.stream_thread.join(timeout=10.0)
                    if self.stream_thread.is_alive():
                        log_warning("⚠️ STT thread did not stop gracefully")
                
                # Clear queues
                while not self.audio_queue.empty():
                    try:
                        self.audio_queue.get_nowait()
                    except:
                        pass
                        
                final_result = None
                while not self.responses_queue.empty():
                    try:
                        result = self.responses_queue.get_nowait()
                        if result.is_final:
                            final_result = result
                    except:
                        pass
                
                # Reset thread reference
                self.stream_thread = None
                
                log_info("✅ Google STT streaming stopped")
                return final_result
                
            except Exception as e:
                log_error(f"❌ Failed to stop Google STT streaming", error=str(e))
                self.stream_thread = None
                return None
    
    def supports_realtime(self) -> bool:
        """Google Cloud STT supports real-time streaming"""
        return True
    
    def get_supported_languages(self) -> List[str]:
        """Get list of supported language codes"""
        return [
            "tr-TR",  # Turkish
            "en-US",  # English (US)
            "en-GB",  # English (UK)
            "de-DE",  # German
            "fr-FR",  # French
            "es-ES",  # Spanish
            "it-IT",  # Italian
            "pt-BR",  # Portuguese (Brazil)
            "ru-RU",  # Russian
            "ja-JP",  # Japanese
            "ko-KR",  # Korean
            "zh-CN",  # Chinese (Simplified)
            "ar-SA",  # Arabic
        ]
    
    def get_provider_name(self) -> str:
        """Get provider name"""
        return "google"
    
    def _get_encoding(self, encoding_str: str):
        """Convert encoding string to Google Speech enum"""
        if not GOOGLE_SPEECH_AVAILABLE:
            return None
            
        encoding_map = {
            "WEBM_OPUS": speech.RecognitionConfig.AudioEncoding.WEBM_OPUS,
            "LINEAR16": speech.RecognitionConfig.AudioEncoding.LINEAR16,
            "FLAC": speech.RecognitionConfig.AudioEncoding.FLAC,
            "MP3": speech.RecognitionConfig.AudioEncoding.MP3,
            "OGG_OPUS": speech.RecognitionConfig.AudioEncoding.OGG_OPUS,
        }
        return encoding_map.get(encoding_str, speech.RecognitionConfig.AudioEncoding.WEBM_OPUS)