File size: 20,844 Bytes
90bb830
 
 
 
 
9070a82
cc4dbca
90bb830
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ec49027
975aa9d
 
 
 
90bb830
 
 
975aa9d
 
 
 
 
90bb830
975aa9d
 
cc4dbca
 
 
 
 
 
975aa9d
ec49027
 
 
 
 
975aa9d
ec49027
 
 
 
 
 
 
 
 
 
 
 
975aa9d
 
 
 
 
 
 
 
 
90bb830
6476752
 
 
 
 
 
 
 
 
 
 
 
 
fd94fce
 
 
 
57b160d
 
e1f0fed
57b160d
 
e1f0fed
57b160d
 
 
 
 
 
 
 
 
 
 
 
 
cc4dbca
 
 
 
 
 
 
57b160d
 
 
 
e1f0fed
57b160d
 
 
 
 
 
 
cc4dbca
 
57b160d
 
 
e1f0fed
 
57b160d
e1f0fed
 
57b160d
 
 
e1f0fed
57b160d
e1f0fed
57b160d
e1f0fed
 
 
9f5a0c1
 
 
e1f0fed
9f5a0c1
e1f0fed
9f5a0c1
 
cc4dbca
e1f0fed
cc4dbca
e1f0fed
9f5a0c1
cc4dbca
9f5a0c1
ac36ecd
9f5a0c1
 
 
 
cc4dbca
9f5a0c1
 
 
 
 
 
 
 
 
cc4dbca
e1f0fed
9f5a0c1
 
 
cc4dbca
9f5a0c1
 
cc4dbca
9f5a0c1
 
cc4dbca
9f5a0c1
e1f0fed
9f5a0c1
 
 
 
 
 
 
 
 
 
 
 
cc4dbca
9f5a0c1
 
 
 
 
 
 
e1f0fed
9f5a0c1
e1f0fed
9f5a0c1
 
 
 
 
 
 
 
e1f0fed
 
 
 
 
 
 
 
 
cc4dbca
 
e1f0fed
 
 
 
 
 
975aa9d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
90bb830
975aa9d
 
 
 
90bb830
975aa9d
cc4dbca
975aa9d
 
 
90bb830
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
cc4dbca
90bb830
 
40f81e2
 
 
cc4dbca
40f81e2
 
cc4dbca
40f81e2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
cc4dbca
90bb830
 
bdc222a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f327841
bdc222a
 
 
 
 
 
 
 
 
 
edc0485
bdc222a
 
 
 
 
 
 
 
 
 
 
 
 
 
48cbaf2
bdc222a
 
 
 
 
 
 
 
 
 
 
edc0485
bdc222a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f327841
90bb830
 
 
 
cc4dbca
90bb830
 
 
57b160d
90bb830
bdc222a
 
57b160d
 
 
 
90bb830
bdc222a
57b160d
cc4dbca
c5bf788
cc4dbca
 
57b160d
 
 
cc4dbca
57b160d
cc4dbca
 
 
 
57b160d
 
 
 
 
 
 
cc4dbca
 
 
 
57b160d
cc4dbca
 
7022f3d
cc4dbca
 
fd94fce
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
"""
Google Cloud Speech-to-Text Implementation
"""
import os
import asyncio
from typing import AsyncIterator, AsyncGenerator, Optional, List, Any
import numpy as np
from datetime import datetime
import sys
import queue
import threading
import time
import traceback
from utils.logger import log_info, log_error, log_debug, log_warning

# Import Google Cloud Speech only if available
try:
    from google.cloud import speech
    from google.api_core import exceptions
    GOOGLE_SPEECH_AVAILABLE = True
except ImportError:
    GOOGLE_SPEECH_AVAILABLE = False
    log_info("⚠️ Google Cloud Speech library not installed")

from .stt_interface import STTInterface, STTConfig, TranscriptionResult

class GoogleCloudSTT(STTInterface):
    """Google Cloud Speech-to-Text implementation"""

    def __init__(self, credentials_path: Optional[str] = None):
        """Set up streaming state, resolve Google credentials and verify them.

        Credential resolution order:
          1. ``credentials_path`` when given (the file must exist),
          2. an already-set ``GOOGLE_APPLICATION_CREDENTIALS`` environment
             variable (used as-is, its target is not checked here),
          3. the default file ``./credentials/google-service-account.json``.

        Args:
            credentials_path: Optional path to a service-account JSON file.

        Raises:
            ValueError: If ``credentials_path`` does not exist, or if no
                credentials could be located at all.
            ImportError: If the google-cloud-speech package is not installed.
            Exception: Whatever ``speech.SpeechClient()`` raises while the
                credentials are being tested; it is logged and re-raised.
        """
        log_info("🎀 Creating STT provider: google")

        # Streaming session state
        self.client = None
        self.streaming_config = None
        self.stream_thread = None
        self.audio_queue = queue.Queue()
        self.responses_queue = queue.Queue()
        self.is_streaming = False
        self.should_stop = False
        self.error_message = None
        self.session_id = 0
        self.stream_start_time = None

        # Bookkeeping and synchronisation helpers
        self.lock = threading.Lock()
        self.single_utterance = False
        # Overwritten by start_streaming(); defined here so the attribute
        # always exists, even before the first session.
        self.current_encoding = None
        self.chunk_count = 0
        self.total_bytes = 0
        self.stop_event = threading.Event()

        # Resolve Google credentials
        if credentials_path:
            if not os.path.exists(credentials_path):
                log_error(f"❌ Credentials file not found: {credentials_path}")
                raise ValueError(f"Google credentials file not found: {credentials_path}")
            os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = credentials_path
            log_info(f"βœ… Google credentials set from: {credentials_path}")
        elif not os.environ.get("GOOGLE_APPLICATION_CREDENTIALS"):
            # Nothing passed in and nothing in the environment: try the default file.
            creds_path = "./credentials/google-service-account.json"
            if not os.path.exists(creds_path):
                raise ValueError("Google credentials not found. Please provide credentials_path")
            os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = creds_path
            log_info(f"βœ… Google credentials set from default: {creds_path}")

        # Without the library the name `speech` does not exist; fail with a
        # clear message instead of a NameError reported as bad credentials.
        if not GOOGLE_SPEECH_AVAILABLE:
            raise ImportError(
                "Google Cloud Speech library not installed; "
                "install google-cloud-speech to use the Google STT provider"
            )

        # Test credentials by constructing a throw-away client
        try:
            log_info("πŸ” Testing Google credentials...")
            probe_client = speech.SpeechClient()
            log_info("βœ… Google credentials valid")
        except Exception as e:
            log_error(f"❌ Invalid Google credentials: {e}")
            raise

        # The probe client is never used again; close it (best effort) so its
        # transport is not left open for the lifetime of the process.
        try:
            transport = getattr(probe_client, "transport", None)
            if transport is not None and hasattr(transport, "close"):
                transport.close()
        except Exception as e:
            log_warning(f"Could not close credential probe client: {e}")

    def _get_encoding(self, encoding_str: str):
        """Map an encoding name to the matching Google Speech AudioEncoding member.

        Returns None when the google-cloud-speech package is unavailable.
        Names that are not in the table fall back to WEBM_OPUS.
        """
        if GOOGLE_SPEECH_AVAILABLE:
            audio_encoding = speech.RecognitionConfig.AudioEncoding
            known_encodings = {
                "WEBM_OPUS": audio_encoding.WEBM_OPUS,
                "LINEAR16": audio_encoding.LINEAR16,
                "FLAC": audio_encoding.FLAC,
                "MP3": audio_encoding.MP3,
                "OGG_OPUS": audio_encoding.OGG_OPUS,
            }
            fallback = audio_encoding.WEBM_OPUS
            if encoding_str in known_encodings:
                return known_encodings[encoding_str]
            return fallback

        return None
    
    # Alias for compatibility
    _get_google_encoding = _get_encoding
    
    async def stream_audio(self, audio_chunk: bytes) -> AsyncGenerator[TranscriptionResult, None]:
        """Queue one audio chunk for recognition and yield any ready results.

        The chunk is put on ``self.audio_queue``; afterwards the response
        queue is polled for roughly 0.1 seconds and every result found in
        that window is yielded.

        Args:
            audio_chunk: Raw audio bytes for the active streaming session.

        Yields:
            TranscriptionResult objects taken from ``self.responses_queue``.

        Raises:
            Exception: If no streaming session is active, or if queuing the
                chunk fails (the error is logged and re-raised).
        """
        if not self.is_streaming:
            raise Exception("Streaming not started")

        try:
            chunk_size = len(audio_chunk)

            # Log first chunk details
            if self.chunk_count == 0:
                log_info(f"πŸ“€ First chunk - size: {chunk_size} bytes")
                if audio_chunk.startswith(b'\x1a\x45\xdf\xa3'):
                    log_info("βœ… Valid WEBM header detected")
                else:
                    hex_preview = audio_chunk[:20].hex()
                    log_warning(f"⚠️ Unexpected audio format. First 20 bytes: {hex_preview}")

            # Measure the audio level, which only makes sense for raw PCM.
            # The encoding is the one recorded by start_streaming(); getattr
            # keeps this safe if the attribute was never set.
            if getattr(self, "current_encoding", None) == "LINEAR16":
                try:
                    # int16 samples need an even byte count; ignore a trailing odd byte.
                    usable = chunk_size - (chunk_size % 2)
                    samples = np.frombuffer(audio_chunk[:usable], dtype=np.int16)
                    if samples.size > 0 and self.chunk_count % 50 == 0:
                        rms = np.sqrt(np.mean(samples.astype(float) ** 2))
                        # max(rms, 1) avoids log10(0) on pure silence
                        db = 20 * np.log10(max(rms, 1) / 32768.0)
                        log_info(f"πŸ”Š Audio level: {db:.1f} dB")
                except Exception as level_err:
                    # Metering is diagnostic only; never let it break streaming.
                    log_debug(f"Audio level measurement skipped: {level_err}")

            # Put chunk in queue
            self.audio_queue.put(audio_chunk)
            self.chunk_count += 1
            self.total_bytes += chunk_size

            # Log progress
            if self.chunk_count % 50 == 0:
                log_info(f"πŸ“€ Progress: {self.chunk_count} chunks, {self.total_bytes/1024:.1f}KB total")

            # Poll for responses for a short, fixed window
            timeout = 0.1
            end_time = time.time() + timeout

            while time.time() < end_time:
                try:
                    result = self.responses_queue.get_nowait()
                    log_info(f"🎯 Got result from queue: is_final={result.is_final}, text='{result.text[:30]}...'")
                    yield result
                except queue.Empty:
                    # Nothing yet: give the event loop a turn before polling again
                    await asyncio.sleep(0.01)
                except Exception as e:
                    log_error(f"Error getting result from queue: {e}")
                    break

        except Exception as e:
            log_error(f"❌ Error in stream_audio: {e}")
            raise

    async def stop_streaming(self) -> Optional[TranscriptionResult]:
        """Stop the active streaming session and release its resources.

        Signals the worker thread to stop, waits up to 5 seconds for it,
        drains the response queue, closes the Google client and clears the
        per-session attributes (queues, thread, client, streaming config).

        Returns:
            The last final TranscriptionResult still waiting in the response
            queue, or None if there was none, if nothing was running, or if
            an error occurred while stopping.
        """
        if not self.is_streaming and not self.stream_thread:
            log_debug("Already stopped, nothing to do")
            return None

        final_result = None
        stopped_cleanly = False

        try:
            log_info(f"πŸ›‘ Stopping Google STT streaming session #{self.session_id}")

            # Set flags
            self.is_streaming = False
            self.should_stop = True
            self.stop_event.set()

            # Send poison pill. Non-blocking on purpose: the audio queue may be
            # bounded and full, and a blocking put() would hang this coroutine.
            # The request generator also watches should_stop, so the pill is
            # only a faster wake-up.
            if self.audio_queue is not None:
                try:
                    self.audio_queue.put_nowait(None)
                except queue.Full:
                    log_debug("Audio queue full; relying on stop flag instead of poison pill")

            # Wait for thread
            # NOTE(review): join() blocks the event loop for up to 5 seconds.
            if self.stream_thread and self.stream_thread.is_alive():
                log_info("⏳ Waiting for stream thread to finish...")
                self.stream_thread.join(timeout=5.0)

                if self.stream_thread.is_alive():
                    log_warning("⚠️ STT thread did not stop gracefully after 5s")
                else:
                    log_info("βœ… Stream thread finished")

            # Get final result: keep the most recent final one left in the queue
            if self.responses_queue is not None:
                while True:
                    try:
                        result = self.responses_queue.get_nowait()
                    except queue.Empty:
                        break
                    if result.is_final:
                        final_result = result

            # Close client
            if self.client:
                try:
                    if hasattr(self.client, 'transport') and hasattr(self.client.transport, 'close'):
                        self.client.transport.close()
                        log_debug("βœ… Client transport closed")

                    if hasattr(self.client, '_transport') and hasattr(self.client._transport, '_grpc_channel'):
                        self.client._transport._grpc_channel.close()
                        log_debug("βœ… gRPC channel closed")
                except Exception as e:
                    log_warning(f"⚠️ Error closing Google client: {e}")

            stopped_cleanly = True

        except Exception as e:
            log_error("❌ Error during stop_streaming", error=str(e))
            final_result = None

        finally:
            # Reset state on every path so a failed stop cannot leave a
            # half-torn-down session behind.
            self.is_streaming = False
            self.client = None
            self.audio_queue = None
            self.responses_queue = None
            self.stream_thread = None
            self.streaming_config = None
            self.stop_event.clear()

        if stopped_cleanly:
            log_info(f"βœ… Google STT streaming session #{self.session_id} stopped and cleaned")
        return final_result
    
    def supports_realtime(self) -> bool:
        """Report that this provider can transcribe audio while it streams."""
        realtime_capable = True
        return realtime_capable

    def get_supported_languages(self) -> List[str]:
        """Return the language codes this provider advertises, as a new list."""
        language_codes = (
            "tr-TR",
            "en-US",
            "en-GB",
            "de-DE",
            "fr-FR",
            "es-ES",
            "it-IT",
            "pt-BR",
            "ru-RU",
            "ja-JP",
            "ko-KR",
            "zh-CN",
            "ar-SA",
        )
        return list(language_codes)

    def get_provider_name(self) -> str:
        """Return the identifier string of this STT provider."""
        provider_name = "google"
        return provider_name
        
    def _reset_session(self):
        """Reset per-session state and start a new session ID.

        Empties whatever queues currently exist, clears the stop flag and
        error message, increments ``session_id``, restarts the stream clock
        and the chunk/byte counters, then installs two fresh unbounded queues.
        """
        # Clear queues. Either attribute may be None here because
        # stop_streaming() sets both to None when a session ends, so a None
        # queue is simply skipped.
        for stale_queue in (self.audio_queue, self.responses_queue):
            if stale_queue is None:
                continue
            try:
                while True:
                    stale_queue.get_nowait()
            except queue.Empty:
                pass

        # Reset state
        self.should_stop = False
        self.error_message = None
        self.session_id += 1
        self.stream_start_time = time.time()
        self.chunk_count = 0
        self.total_bytes = 0

        log_info(f"πŸ”„ Google STT session data reset. New session ID: {self.session_id}")

        # Create fresh queues
        self.audio_queue = queue.Queue()
        self.responses_queue = queue.Queue()
        log_debug("βœ… Created fresh queues")

    def _create_fresh_queues(self):
        """Replace both queues with new, bounded instances.

        Any existing queue is emptied first. The new audio queue holds at
        most 1000 items and the new response queue at most 100.
        """
        for old_queue in (self.audio_queue, self.responses_queue):
            if old_queue is None:
                continue
            # Drain until the queue reports Empty. Catching only queue.Empty
            # (rather than everything) keeps KeyboardInterrupt/SystemExit
            # working and cannot spin forever on an unexpected error.
            try:
                while True:
                    old_queue.get_nowait()
            except queue.Empty:
                pass

        self.audio_queue = queue.Queue(maxsize=1000)
        self.responses_queue = queue.Queue(maxsize=100)
        log_debug("βœ… Created fresh queues")
    
    def _request_generator(self):
        """Yield the request stream consumed by ``streaming_recognize``.

        The first request carries ``self.streaming_config``; every later one
        carries one audio chunk taken from ``self.audio_queue``. The generator
        ends when ``self.should_stop`` is set, when a ``None`` item (poison
        pill) is dequeued, or when an unexpected error occurs.
        """
        # The configuration has to be the very first request
        yield speech.StreamingRecognizeRequest(streaming_config=self.streaming_config)

        # Then forward queued audio until told to stop
        while True:
            if self.should_stop:
                break

            try:
                # Short timeout so the stop flag is re-checked regularly
                next_chunk = self.audio_queue.get(timeout=0.1)

                if next_chunk is not None:
                    yield speech.StreamingRecognizeRequest(audio_content=next_chunk)
                else:
                    log_info("πŸ“› Poison pill received, stopping request generator")
                    break

            except queue.Empty:
                continue
            except Exception as e:
                log_error(f"Error in request generator: {e}")
                break

        log_info(f"πŸ“Š Request generator finished. Total chunks: {self.chunk_count}, Total bytes: {self.total_bytes}")
    
    async def start_streaming(self, config: STTConfig) -> None:
        """Start a new streaming recognition session from a clean state.

        Stops any session that is still active, resets the per-session state,
        creates a new Google Speech client and streaming configuration, and
        launches the worker thread that runs ``_run_stream``.

        Args:
            config: Session settings. Only ``config.single_utterance`` and
                ``config.encoding`` are read here, and both are merely stored
                on the instance.

        Raises:
            Exception: If a previous stream thread is still alive after
                cleanup, or if any setup step fails (logged and re-raised).
        """
        # A lock only serialises callers when they all share the same object,
        # so it is created once and cached on the instance. It is created
        # lazily here rather than in __init__, which may run outside any
        # event loop.
        start_lock = getattr(self, "_start_lock", None)
        if start_lock is None:
            start_lock = self._start_lock = asyncio.Lock()

        try:
            async with start_lock:
                # Clean up any existing stream
                if self.is_streaming or self.stream_thread:
                    log_warning("⚠️ Previous stream still active, stopping it first")
                    await self.stop_streaming()
                    await asyncio.sleep(0.5)

                # Double-check after cleanup
                if self.stream_thread and self.stream_thread.is_alive():
                    log_error(f"❌ Stream thread STILL running after cleanup! Thread: {self.stream_thread.name}")
                    raise Exception("Failed to stop previous stream thread")

                # stop_streaming() leaves both queues set to None, and the
                # session reset below drains the queues, so make sure real
                # queue objects exist first.
                if self.audio_queue is None or self.responses_queue is None:
                    self._create_fresh_queues()

                # Reset session
                self._reset_session()
                self.single_utterance = config.single_utterance
                self.current_encoding = config.encoding

                log_info(f"🎀 Starting Google STT streaming session #{self.session_id} with config: {config}")

                # Create fresh (bounded) queues and clear the stop signals
                self._create_fresh_queues()
                self.stop_event.clear()
                self.should_stop = False

                # Create new client
                self.client = speech.SpeechClient()
                log_info("βœ… Created new Google Speech client")

                # Create recognition config
                # NOTE(review): encoding, sample rate, language and
                # single_utterance are fixed values here and do not come from
                # `config` - confirm that this is intended.
                recognition_config = speech.RecognitionConfig(
                    encoding=speech.RecognitionConfig.AudioEncoding.WEBM_OPUS,
                    sample_rate_hertz=16000,
                    language_code="tr-TR",
                    enable_automatic_punctuation=True,
                    model="latest_long",
                    use_enhanced=True,
                    max_alternatives=1,
                    metadata=speech.RecognitionMetadata(
                        interaction_type=speech.RecognitionMetadata.InteractionType.VOICE_SEARCH,
                        microphone_distance=speech.RecognitionMetadata.MicrophoneDistance.NEARFIELD,
                        recording_device_type=speech.RecognitionMetadata.RecordingDeviceType.PC,
                    )
                )

                # Create streaming config with VAD
                self.streaming_config = speech.StreamingRecognitionConfig(
                    config=recognition_config,
                    interim_results=True,
                    single_utterance=False,
                    enable_voice_activity_events=True  # VAD events enabled
                )

                # Final check before starting the thread
                if self.stream_thread is not None:
                    log_error("❌ stream_thread should be None at this point!")
                    self.stream_thread = None

                self.is_streaming = True

                # Start streaming thread with unique ID
                thread_id = f"GoogleSTT-Session-{self.session_id}-{int(time.time()*1000)}"
                self.stream_thread = threading.Thread(
                    target=self._run_stream,
                    name=thread_id
                )
                self.stream_thread.daemon = True

                log_info(f"πŸš€ Starting thread: {thread_id}")
                self.stream_thread.start()

                log_info(f"βœ… Google STT streaming session #{self.session_id} started successfully")

        except Exception as e:
            log_error("❌ Failed to start Google STT streaming", error=str(e))
            self.is_streaming = False
            self.client = None
            self._create_fresh_queues()
            raise

    def _run_stream(self) -> None:
        """Run the streaming recognition loop; used as the worker thread target.

        Feeds ``_request_generator()`` into ``self.client.streaming_recognize``
        and iterates over the responses. For every result that has at least
        one alternative, a TranscriptionResult built from the first
        alternative is put on ``self.responses_queue``.

        Any exception is logged and its text stored in ``self.error_message``;
        nothing is re-raised. On exit ``self.is_streaming`` is set to False
        while holding ``self.lock``.
        """
        try:
            thread_id = threading.current_thread().ident
            log_info(f"🎀 Google STT stream thread started - Thread ID: {thread_id}, Session: {self.session_id}")
            
            # Create request generator
            requests = self._request_generator()
            
            # Create streaming client
            log_info(f"🎀 Creating Google STT streaming client... Thread ID: {thread_id}")
            
            # Get responses (no timeout parameter!)
            responses = self.client.streaming_recognize(requests)
            
            # Track responses
            first_response_time = None
            response_count = 0
            
            # Process responses
            for response in responses:
                # The stop flag is checked once per received response
                if self.should_stop:
                    log_info("πŸ›‘ Stop flag detected, ending stream")
                    break
                    
                response_count += 1
                
                # Log the latency of the first response, measured from stream_start_time
                if first_response_time is None:
                    first_response_time = time.time()
                    elapsed = first_response_time - self.stream_start_time
                    log_info(f"πŸŽ‰ FIRST RESPONSE from Google STT after {elapsed:.2f}s")
                
                # Check for VAD events (these are only logged, not acted upon)
                if hasattr(response, 'speech_event_type') and response.speech_event_type:
                    event_type = response.speech_event_type
                    log_info(f"πŸŽ™οΈ VAD Event: {event_type}")
                    
                    if event_type == speech.StreamingRecognizeResponse.SpeechEventType.END_OF_SINGLE_UTTERANCE:
                        log_info("πŸ”š End of utterance detected by VAD")
                
                # Log response
                has_results = len(response.results) > 0 if hasattr(response, 'results') else False
                log_info(f"πŸ“¨ Google STT Response #{response_count}: has_results={has_results}")
                
                # Responses without results carry nothing to queue
                if not response.results:
                    continue
                
                # Process results
                for result_idx, result in enumerate(response.results):
                    # Check result type
                    result_type = "πŸ”„ INTERIM" if not result.is_final else "βœ… FINAL"
                    stability = getattr(result, 'stability', 0.0)
                    
                    log_info(f"{result_type} Result #{result_idx}: "
                            f"alternatives={len(result.alternatives)}, "
                            f"stability={stability:.3f}")
                    
                    if result.alternatives:
                        # Only the first alternative is used
                        best_alternative = result.alternatives[0]
                        transcript = best_alternative.transcript
                        # Final results report the alternative's confidence;
                        # interim results report the result's stability instead
                        confidence = best_alternative.confidence if result.is_final else stability
                        
                        # Log transcript
                        if result.is_final:
                            log_info(f"βœ… FINAL TRANSCRIPT: '{transcript}' "
                                    f"(confidence: {confidence:.3f})")
                        else:
                            log_info(f"πŸ”„ INTERIM TRANSCRIPT: '{transcript[:100]}...' "
                                    f"(stability: {stability:.3f})")
                        
                        # Queue result
                        result_obj = TranscriptionResult(
                            text=transcript,
                            is_final=result.is_final,
                            confidence=confidence,
                            timestamp=datetime.utcnow()
                        )
                        
                        self.responses_queue.put(result_obj)
                        log_info(f"πŸ“₯ {'FINAL' if result.is_final else 'INTERIM'} result queued")
            
            # Log completion
            if response_count == 0:
                log_error("❌ Google STT stream ended without ANY responses!")
            else:
                log_info(f"βœ… Google STT stream ended normally after {response_count} responses")
                
        except Exception as e:
            # Errors are recorded on the instance rather than propagated out of the thread
            log_error(f"❌ Google STT error: {e}")
            if hasattr(e, 'details'):
                log_error(f"Error details: {e.details}")
            self.error_message = str(e)
        finally:
            log_info("🎀 Google STT stream thread ended")
            with self.lock:
                self.is_streaming = False