File size: 17,311 Bytes
5d50ed0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4e4dedc
 
 
5d50ed0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4c78db1
f0f6d01
5d50ed0
 
a4bca86
68eb4f3
26bf42c
c2548fb
 
5c0e40d
 
 
c2548fb
26bf42c
f0f6d01
 
5d50ed0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c3db99d
1340afa
 
 
 
5d50ed0
1340afa
 
 
 
 
 
c3db99d
5d50ed0
 
 
 
 
 
 
 
 
 
 
 
 
c3db99d
 
 
5d50ed0
c3db99d
5d50ed0
 
 
 
 
 
 
c3db99d
 
 
5d50ed0
 
c3db99d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5d50ed0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6472323
5d50ed0
4e4dedc
 
 
 
6472323
1340afa
 
 
 
 
 
 
 
4e4dedc
 
 
 
 
 
 
 
 
 
 
 
b459df5
 
 
5d50ed0
b459df5
 
 
 
 
5d50ed0
 
 
6472323
 
 
 
b459df5
6472323
5d50ed0
 
6472323
5d50ed0
 
 
 
b459df5
5d50ed0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4e4dedc
 
 
5d50ed0
 
 
 
 
 
 
 
 
fa1c68b
 
5d50ed0
fa1c68b
 
 
5d50ed0
 
 
 
e90d3a0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
"""
Deepgram Speech-to-Text Implementation using Deepgram SDK
"""
import asyncio
from typing import AsyncIterator, Optional, List, Any
from datetime import datetime
import queue
import threading
import traceback

from deepgram import (
    DeepgramClient,
    DeepgramClientOptions,
    LiveTranscriptionEvents,
    LiveOptions,
    Microphone,
)

from utils.logger import log_info, log_error, log_debug, log_warning
from .stt_interface import STTInterface, STTConfig, TranscriptionResult


class DeepgramSTT(STTInterface):
    """Deepgram streaming speech-to-text backend built on the official Deepgram SDK.

    Lifecycle:
      * ``start_streaming()`` opens a live connection and registers event handlers.
      * ``stream_audio()`` forwards audio chunks and yields whatever transcription
        results the SDK callbacks have queued so far.
      * ``stop_streaming()`` finishes the connection and returns the last final
        result still in the queue.

    The ``_on_*`` handlers are invoked by the SDK, presumably outside the caller's
    coroutine (NOTE(review): likely on an SDK receive thread - confirm for the SDK
    version in use). They hand results over through the thread-safe
    ``responses_queue`` and must never block.
    """

    def __init__(self, api_key: str):
        """Store the API key and initialise per-instance session state.

        Args:
            api_key: Deepgram API key; must be non-empty.

        Raises:
            ValueError: if ``api_key`` is empty or None.
        """
        if not api_key:
            raise ValueError("Deepgram API key is required")

        # Log only the key's tail and length: enough to tell which key was
        # resolved without writing a usable credential prefix into the logs.
        masked_key = f"...{api_key[-4:]}" if len(api_key) > 8 else "***"
        log_info(f"🔑 Deepgram API key resolved: {masked_key} (length: {len(api_key)})")

        self.api_key = api_key
        self.deepgram_client = None  # DeepgramClient, recreated for every session
        self.live_connection = None  # SDK live connection of the current session
        self.is_streaming = False
        # Bounded hand-off from the SDK callbacks to stream_audio()/stop_streaming().
        self.responses_queue = queue.Queue(maxsize=100)

        # Session tracking
        self.session_id = 0
        self.total_audio_bytes = 0
        self.total_chunks = 0

        # Final result tracking
        self.final_result_received = False
        # NOTE(review): not read anywhere in this class; kept in case external code uses it.
        self.stop_event = threading.Event()

        # The first chunks of a session are held back and sent as one frame
        # so Deepgram's VAD gets more context up front.
        self.initial_buffer = []

        log_info(f"✅ Deepgram STT initialized (SDK version)")

    def _map_language_code(self, language: str) -> str:
        """Map a locale code such as ``"tr-TR"`` to the code used for Deepgram.

        Codes not in the table are returned unchanged.
        NOTE(review): not currently called by start_streaming(), which hard-codes "tr".
        """
        language_map = {
            "tr-TR": "tr",
            "en-US": "en-US",
            "en-GB": "en-GB",
            "de-DE": "de",
            "fr-FR": "fr",
            "es-ES": "es",
            "it-IT": "it",
            "pt-BR": "pt-BR",
            "ru-RU": "ru",
            "ja-JP": "ja",
            "ko-KR": "ko",
            "zh-CN": "zh-CN",
            "ar-SA": "ar",
        }
        return language_map.get(language, language)

    async def start_streaming(self, config: STTConfig) -> None:
        """Open a new Deepgram live-transcription session.

        Any previous session is stopped first. That includes one that is still
        streaming, and one whose connection object was left behind after a
        server-side close or a send error.

        Args:
            config: Session configuration. NOTE(review): currently ignored - the
                language, model and audio format below are hard-coded.

        Raises:
            RuntimeError: if the SDK reports the connection could not be started.
            Exception: any SDK error raised while connecting is re-raised after
                the client/connection references have been cleared.
        """
        try:
            # Stop any existing stream, including one whose connection is still
            # held although is_streaming was already cleared (e.g. by _on_close).
            if self.is_streaming or self.live_connection is not None:
                log_warning("⚠️ Previous stream still active, stopping it first")
                await self.stop_streaming()
                await asyncio.sleep(0.5)

            # Reset session data
            self._reset_session_data()

            log_info(f"🎤 Starting Deepgram STT (SDK) - Session #{self.session_id}")

            # Create the Deepgram client (SDK verbose logging disabled, keepalive on)
            config_options = DeepgramClientOptions(
                verbose=False,
                options={"keepalive": "true"}
            )
            self.deepgram_client = DeepgramClient(self.api_key, config=config_options)

            # Live options: Turkish, 16 kHz mono linear16 audio.
            options = LiveOptions(
                language="tr",
                model="nova-2-general",
                encoding="linear16",
                sample_rate=16000,
                interim_results=True,   # Required: without it, final results were not delivered either.
                channels=1,
                #utterance_end_ms=2000,  # 2 second silence = end
                punctuate=True,
                smart_format=True,
                #numerals=True,
                #profanity_filter=False,
                #redact=False,
                no_delay=True,
                vad_events=True,         # Enable VAD events (SpeechStarted)
                #endpointing=1000
                utterance_end_ms=2000
            )

            log_info(f"🔧 Deepgram options: language=tr, model=nova-2, encoding=linear16, interim_results=True")

            # Create live connection
            self.live_connection = self.deepgram_client.listen.live.v("1")

            # Setup event handlers
            self._setup_event_handlers()

            try:
                log_info("🔌 Attempting to connect to Deepgram...")

                result = self.live_connection.start(options)
                log_info(f"🔌 Connection start result: {result}")

                if result:
                    self.is_streaming = True
                    log_info(f"✅ Deepgram SDK connected - Ready for speech")
                else:
                    # Surface whatever error detail the connection object exposes
                    if hasattr(self.live_connection, 'get_error') or hasattr(self.live_connection, 'error'):
                        error_detail = getattr(self.live_connection, 'error', 'No error details')
                        log_error(f"❌ Connection failed with details: {error_detail}")
                    raise RuntimeError("Failed to start Deepgram connection")

            except Exception as e:
                log_error(f"❌ Connection error: {str(e)}")
                # Log HTTP-level details when the exception carries them
                if hasattr(e, 'response'):
                    log_error(f"❌ Response: {e.response}")
                if hasattr(e, 'status_code'):
                    log_error(f"❌ Status code: {e.status_code}")
                raise

        except Exception as e:
            log_error(f"❌ Failed to start Deepgram STT", error=str(e))
            if hasattr(e, '__dict__'):
                log_error(f"❌ Error details: {e.__dict__}")
            self.is_streaming = False
            self.live_connection = None
            self.deepgram_client = None
            raise

    def _setup_event_handlers(self):
        """Register this instance's ``_on_*`` methods for the live connection's events."""
        self.live_connection.on(LiveTranscriptionEvents.Transcript, self._on_transcript)
        self.live_connection.on(LiveTranscriptionEvents.SpeechStarted, self._on_speech_started)
        self.live_connection.on(LiveTranscriptionEvents.UtteranceEnd, self._on_utterance_end)
        self.live_connection.on(LiveTranscriptionEvents.Metadata, self._on_metadata)
        self.live_connection.on(LiveTranscriptionEvents.Error, self._on_error)
        self.live_connection.on(LiveTranscriptionEvents.Close, self._on_close)

    def _enqueue_result(self, result: TranscriptionResult) -> bool:
        """Queue *result* for the coroutine side without ever blocking the caller.

        When the queue is full, a final result evicts the oldest queued item so
        it is not lost; an interim result is simply dropped.

        Returns:
            True if the result was queued, False if it was dropped.
        """
        try:
            self.responses_queue.put_nowait(result)
            return True
        except queue.Full:
            pass

        if result.is_final:
            try:
                self.responses_queue.get_nowait()  # make room by dropping the oldest item
            except queue.Empty:
                pass
            try:
                self.responses_queue.put_nowait(result)
                return True
            except queue.Full:
                pass

        log_warning("⚠️ Response queue full")
        return False

    def _on_transcript(self, *args, **kwargs):
        """Handle a Transcript event from the SDK.

        The result object is taken from the second positional argument, or from
        the ``result`` keyword. Final results are queued even when their text is
        empty, so consumers still see the end of an utterance. Interim results
        are queued only when they contain non-whitespace text. Errors are logged
        and never propagated back into the SDK.
        """
        try:
            result = args[1] if len(args) > 1 else kwargs.get("result")

            if not result:
                log_warning("⚠️ No result in transcript event")
                return

            # Dump the raw SDK object for the first few chunks to aid debugging
            if self.total_chunks < 5:
                log_debug(f"🔍 Result object type: {type(result)}")
                log_debug(f"🔍 Result dir: {[attr for attr in dir(result) if not attr.startswith('_')]}")
                try:
                    if hasattr(result, '__dict__'):
                        log_debug(f"🔍 Result dict: {result.__dict__}")
                except Exception:
                    pass  # best-effort debug output only

            is_final = getattr(result, 'is_final', False)

            # The transcript lives in the first alternative of the channel
            channel = getattr(result, 'channel', None)
            if not channel:
                return
            alternatives = channel.alternatives
            if not alternatives:
                return

            transcript = alternatives[0].transcript
            confidence = alternatives[0].confidence

            log_debug(f"📝 Raw transcript: '{transcript}' (is_final: {is_final}, confidence: {confidence})")

            has_text = bool(transcript and transcript.strip())
            if not is_final and not has_text:
                return  # empty interim results carry no information

            transcription_result = TranscriptionResult(
                text=transcript or "",  # final results may legitimately be empty
                is_final=is_final,
                confidence=confidence,
                timestamp=datetime.now().timestamp()
            )

            if not self._enqueue_result(transcription_result):
                return

            if not is_final:
                log_info(f"📝 Interim transcript: '{transcript}'")
                return

            self.final_result_received = True
            if has_text:
                # Guard the format spec: confidence is not guaranteed to be numeric.
                conf_text = f"{confidence:.2f}" if isinstance(confidence, (int, float)) else str(confidence)
                log_info(f"🎯 FINAL TRANSCRIPT: '{transcript}' (confidence: {conf_text})")
            else:
                log_warning(f"⚠️ Empty final transcript received - but queued for state change")

        except Exception as e:
            log_error(f"❌ Error processing transcript: {e}")
            log_error(f"❌ Args: {args}")
            log_error(f"❌ Kwargs: {kwargs}")
            log_error(f"❌ Traceback: {traceback.format_exc()}")

    def _on_speech_started(self, *args, **kwargs):
        """Handle the SpeechStarted (VAD) event by logging it."""
        log_info("🎤 Speech detected - User started speaking")

    def _on_utterance_end(self, *args, **kwargs):
        """Handle the UtteranceEnd event by logging it.

        NOTE: the final transcript, if any, is delivered separately through _on_transcript.
        """
        log_info("🔚 Speech ended - User stopped speaking")

    def _on_metadata(self, *args, **kwargs):
        """Log the request id from a Metadata event.

        The payload may be a plain dict or an SDK response object, so both shapes are handled.
        """
        metadata = args[1] if len(args) > 1 else kwargs.get("metadata", {})
        if isinstance(metadata, dict):
            request_id = metadata.get("request_id", "")
        else:
            request_id = getattr(metadata, "request_id", "")
        log_debug(f"📋 Deepgram metadata - Request ID: {request_id}")

    def _on_error(self, *args, **kwargs):
        """Log an Error event payload."""
        error = args[1] if len(args) > 1 else kwargs.get("error", {})
        log_error(f"❌ Deepgram error: {error}")

    def _on_close(self, *args, **kwargs):
        """Handle connection close by marking the session as no longer streaming.

        The connection object is kept so stop_streaming() can still finish it.
        """
        log_info("🔌 Deepgram connection closed")
        self.is_streaming = False

    def _flush_initial_buffer(self) -> int:
        """Send all buffered start-of-session chunks as one frame and clear the buffer.

        An empty payload is not sent.

        Returns:
            Number of bytes sent.
        """
        combined_audio = b''.join(self.initial_buffer)
        self.initial_buffer = []
        if combined_audio:
            self.live_connection.send(combined_audio)
        return len(combined_audio)

    async def stream_audio(self, audio_chunk: bytes) -> AsyncIterator[TranscriptionResult]:
        """Send one audio chunk to Deepgram and yield any results received so far.

        The first three chunks of a session are buffered and sent together, as a
        single frame, when the third chunk arrives. Later chunks are sent as they
        come. Yielded results may belong to earlier chunks: they are whatever the
        SDK callbacks have queued by now.

        Args:
            audio_chunk: Raw audio bytes. The session is configured for 16 kHz mono
                linear16 (see start_streaming).

        Raises:
            RuntimeError: if no session is active.
        """
        if not self.is_streaming or not self.live_connection:
            raise RuntimeError("Streaming not started. Call start_streaming() first.")

        try:
            if self.total_chunks < 3:
                # Sanity-check the first chunks: they should parse as little-endian int16
                if len(audio_chunk) >= 4:
                    import struct
                    try:
                        first_sample = struct.unpack('<h', audio_chunk[:2])[0]
                        log_info(f"🔊 Audio format check - Chunk #{self.total_chunks}: First sample={first_sample}, Size={len(audio_chunk)} bytes")
                    except struct.error:
                        log_warning("⚠️ Could not parse as Linear16")

                self.initial_buffer.append(audio_chunk)

                # On the third chunk, send everything buffered so far in one go
                if self.total_chunks == 2:
                    sent_bytes = self._flush_initial_buffer()
                    log_info(f"🎯 Sent initial audio buffer: {sent_bytes} bytes")
            elif audio_chunk:
                # Keep sending even after a final result; Deepgram segments the
                # stream itself. Zero-length chunks are skipped because Deepgram's
                # docs describe an empty message as a (legacy) close-stream signal.
                self.live_connection.send(audio_chunk)

            self.total_chunks += 1
            self.total_audio_bytes += len(audio_chunk)

            # Log progress
            if self.total_chunks % 50 == 0:
                log_debug(f"📊 Listening... {self.total_chunks} chunks, {self.total_audio_bytes/1024:.1f}KB")

            # Drain everything the SDK callbacks have queued so far
            while True:
                try:
                    result = self.responses_queue.get_nowait()
                except queue.Empty:
                    break
                log_debug(f"🎯 Yielding result: is_final={result.is_final}, text='{result.text}'")
                yield result

        except Exception as e:
            log_error(f"❌ Error streaming audio", error=str(e))
            self.is_streaming = False
            raise

    async def stop_streaming(self) -> Optional[TranscriptionResult]:
        """Finish the current session and return its last queued final result.

        The session is also cleaned up when the connection was already closed by
        the server or by a send error (``is_streaming`` False but a connection
        object still held). Audio still held in the start-of-session buffer is
        sent before finishing. Errors are logged rather than raised.

        Returns:
            The last final TranscriptionResult found in the queue, or None.
        """
        if not self.is_streaming and self.live_connection is None:
            log_debug("Already stopped, nothing to do")
            return None

        try:
            log_info(f"🛑 Stopping Deepgram STT session #{self.session_id}")

            was_streaming = self.is_streaming
            self.is_streaming = False

            if self.live_connection:
                # Fewer than three chunks were received: their audio is still buffered
                if was_streaming and self.initial_buffer:
                    try:
                        self._flush_initial_buffer()
                    except Exception as e:
                        log_warning(f"⚠️ Error sending buffered audio: {e}")
                        self.initial_buffer = []

                try:
                    # Finish the stream so any pending final transcript is emitted
                    self.live_connection.finish()

                    # Give the final result a moment to arrive
                    await asyncio.sleep(0.5)

                except Exception as e:
                    log_warning(f"⚠️ Error finishing stream: {e}")

            # Drain the queue, keeping the last final result
            final_result = None
            while True:
                try:
                    result = self.responses_queue.get_nowait()
                except queue.Empty:
                    break
                if result.is_final:
                    final_result = result

            # Clean up
            self.live_connection = None
            self.deepgram_client = None
            self.final_result_received = False
            self.initial_buffer = []

            log_info(f"✅ Deepgram STT session #{self.session_id} stopped")
            return final_result

        except Exception as e:
            log_error(f"❌ Error during stop_streaming", error=str(e))
            self.is_streaming = False
            self.live_connection = None
            self.deepgram_client = None
            self.initial_buffer = []
            return None

    def _reset_session_data(self):
        """Clear queued results, counters and the audio buffer; start a new session id."""
        # Discard any results left over from a previous session
        while True:
            try:
                self.responses_queue.get_nowait()
            except queue.Empty:
                break

        # Reset counters
        self.total_audio_bytes = 0
        self.total_chunks = 0
        self.session_id += 1
        self.final_result_received = False

        # Clear the start-of-session audio buffer
        self.initial_buffer = []

        log_debug(f"🔄 Session data reset. New session ID: {self.session_id}")

    def supports_realtime(self) -> bool:
        """Return True: this backend streams audio in real time."""
        return True

    def get_supported_languages(self) -> List[str]:
        """Return the language codes this backend advertises as supported."""
        # Language codes supported by Deepgram.
        # Source: https://developers.deepgram.com/docs/models-languages
        return [
            "en", "es", "fr", "de", "it", "pt", "ru", "tr", "pl", "nl",
            "sv", "no", "fi", "da", "ja", "ko", "zh", "hi", "id", "th",
            "uk", "cs", "el", "he", "ar", "fa", "ta", "tl"
        ]

    def get_provider_name(self) -> str:
        """Return the provider identifier ``"deepgram"``."""
        return "deepgram"