File size: 15,863 Bytes
e90d3a0
709d8e0
e90d3a0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
709d8e0
e90d3a0
 
 
 
857ec80
 
 
 
e90d3a0
 
 
 
 
 
 
 
 
 
 
 
709d8e0
 
e90d3a0
709d8e0
e90d3a0
 
709d8e0
e90d3a0
 
709d8e0
e90d3a0
709d8e0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e90d3a0
 
 
 
 
 
709d8e0
e90d3a0
 
 
 
 
 
 
 
 
 
709d8e0
 
e90d3a0
 
 
709d8e0
e90d3a0
 
 
 
 
 
 
 
 
709d8e0
e90d3a0
 
 
 
709d8e0
e90d3a0
 
 
 
 
709d8e0
e90d3a0
 
709d8e0
e90d3a0
 
 
 
 
709d8e0
e90d3a0
 
 
709d8e0
 
 
 
 
e90d3a0
 
 
 
 
 
 
 
 
 
 
 
 
709d8e0
e90d3a0
709d8e0
e90d3a0
709d8e0
e90d3a0
 
 
709d8e0
 
 
e90d3a0
 
 
 
 
 
 
 
 
 
709d8e0
e90d3a0
 
 
 
 
 
709d8e0
e90d3a0
 
 
 
 
709d8e0
e90d3a0
 
 
 
 
709d8e0
e90d3a0
 
709d8e0
 
 
e90d3a0
 
 
 
 
 
709d8e0
 
e90d3a0
 
 
 
709d8e0
e90d3a0
709d8e0
e90d3a0
 
 
 
 
 
 
 
 
709d8e0
e90d3a0
 
 
709d8e0
e90d3a0
709d8e0
e90d3a0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
709d8e0
e90d3a0
709d8e0
e90d3a0
709d8e0
e90d3a0
709d8e0
e90d3a0
 
709d8e0
 
e90d3a0
 
709d8e0
e90d3a0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
709d8e0
e90d3a0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
709d8e0
e90d3a0
709d8e0
 
 
 
e90d3a0
709d8e0
 
 
 
 
 
e90d3a0
 
709d8e0
e90d3a0
 
 
 
 
 
 
709d8e0
 
 
 
e90d3a0
 
 
 
 
 
709d8e0
e90d3a0
 
709d8e0
 
 
 
e90d3a0
 
 
 
 
 
 
 
709d8e0
 
e90d3a0
 
 
 
 
 
 
 
 
 
 
 
 
 
709d8e0
e90d3a0
709d8e0
e90d3a0
 
 
 
 
 
 
709d8e0
e90d3a0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
"""
Deepgram Speech-to-Text Implementation - Optimized for Voice Agent
"""
import os
import asyncio
import websockets
import json
from typing import AsyncIterator, Optional, List, Any, Dict
from datetime import datetime
import queue
import threading
import time
import traceback
import base64
from urllib.parse import urlencode

from utils.logger import log_info, log_error, log_debug, log_warning
from .stt_interface import STTInterface, STTConfig, TranscriptionResult


class DeepgramSTT(STTInterface):
    """Deepgram STT - Single utterance mode with VAD"""

    def __init__(self, api_key: str):
        if not api_key:
            raise ValueError("Deepgram API key is required")

        # Debug için API key'in ilk 5 karakterini logla
        log_info(f"🔑 Deepgram API key resolved: {api_key[:5]}... (length: {len(api_key)})")

        self.api_key = api_key
        self.websocket = None
        self.is_streaming = False
        self.responses_queue = queue.Queue(maxsize=100)
        self.ws_thread = None
        self.stop_event = threading.Event()
        
        # Session tracking
        self.session_id = 0
        self.total_audio_bytes = 0
        self.total_chunks = 0
        
        # Final result tracking
        self.final_result_received = False
        
        log_info(f"✅ Deepgram STT initialized for single utterance mode")

    def _get_websocket_url(self, config: STTConfig) -> str:
        """Build Deepgram WebSocket URL with optimized parameters"""
        base_url = "wss://api.deepgram.com/v1/listen"
        
        # Manuel olarak optimize edilmiş parametreler
        params = {
            "language": config.language,  # Dil config'den alınır
            "model": "nova-2",           # En iyi model
            "punctuate": "true",         # Noktalama işaretleri açık
            "interim_results": "false",   # ❌ Interim results KAPALI
            "utterance_end_ms": "1000",   # 1 saniye sessizlik = konuşma sonu
            "vad_events": "true",         # VAD events AÇIK
            "smart_format": "true",       # Akıllı formatlama
            "no_delay": "true",          # Düşük gecikme modu
            "encoding": "webm-opus",      # WebM Opus encoding
            "sample_rate": "16000",       # 16kHz sample rate
            "endpointing": "1000",        # 1 saniye endpointing
            "diarize": "false",          # Speaker diarization kapalı
            "multichannel": "false",      # Tek kanal
            "alternatives": "1",          # Sadece en iyi alternatif
            "profanity_filter": "false",  # Küfür filtresi kapalı
            "redact": "false",           # Redaction kapalı
            "replace": "false",          # Replace kapalı
            "search": "false",           # Search kapalı
            "keywords": "false",         # Keywords kapalı
            "filler_words": "false",     # Filler words algılama kapalı
            "numerals": "true"           # Sayıları rakam olarak yaz
        }
        
        query_string = urlencode(params)
        return f"{base_url}?{query_string}"

    async def start_streaming(self, config: STTConfig) -> None:
        """Begin a new single-utterance session on a background thread.

        Any session that is still active is stopped first. The WebSocket is
        then run by ``_run_websocket`` in a daemon thread, and this coroutine
        waits half a second before checking that the thread has not already
        cleared ``is_streaming``.

        Args:
            config: STT configuration, stored on ``self.config`` and handed to
                the WebSocket thread.

        Raises:
            RuntimeError: If ``is_streaming`` is already false again after the
                half-second wait.
        """
        try:
            # A leftover session must be torn down before a new one starts.
            if self.is_streaming or self.ws_thread:
                log_warning("⚠️ Previous stream still active, stopping it first")
                await self.stop_streaming()
                await asyncio.sleep(0.5)

            # Fresh counters, empty queue, new session id.
            self._reset_session_data()

            log_info(f"🎤 Starting Deepgram STT - Single Utterance Mode #{self.session_id}")
            log_debug(f"Language: {config.language}, Sample Rate: 16kHz, Utterance End: 1000ms")

            self.stop_event.clear()
            self.final_result_received = False
            self.config = config

            # The flag is raised before the thread starts so that the thread
            # can lower it again if the connection fails.
            self.is_streaming = True
            worker = threading.Thread(
                target=self._run_websocket,
                args=(config,),
                name=f"DeepgramSTT-SingleUtterance-{self.session_id}",
                daemon=True,
            )
            self.ws_thread = worker
            worker.start()

            # Give the thread a moment to connect.
            await asyncio.sleep(0.5)

            if not self.is_streaming:
                raise RuntimeError("Failed to establish WebSocket connection")

            log_info("✅ Deepgram STT ready - Listening for single utterance")

        except Exception as exc:
            log_error("❌ Failed to start Deepgram STT", error=str(exc))
            self.is_streaming = False
            self.websocket = None
            raise

    async def stream_audio(self, audio_chunk: bytes) -> AsyncIterator[TranscriptionResult]:
        """Stream audio chunk - only returns final results"""
        if not self.is_streaming:
            raise RuntimeError("Streaming not started. Call start_streaming() first.")
            
        # Eğer final result alındıysa, daha fazla audio kabul etme
        if self.final_result_received:
            log_debug("Final result already received, ignoring audio chunk")
            return
            
        try:
            # Send audio to WebSocket
            if self.websocket and not self.websocket.closed:
                # Send as binary data
                await asyncio.get_event_loop().run_in_executor(
                    None, 
                    self._send_audio_sync,
                    audio_chunk
                )
                
                self.total_chunks += 1
                self.total_bytes += len(audio_chunk)
                
                # Log progress every 50 chunks
                if self.total_chunks % 50 == 0:
                    log_debug(f"📊 Listening... {self.total_chunks} chunks, {self.total_bytes/1024:.1f}KB")
            
            # Check for final results only
            while True:
                try:
                    result = self.responses_queue.get_nowait()
                    # Sadece final result'ları yield et
                    if result.is_final:
                        yield result
                except queue.Empty:
                    break
                    
        except Exception as e:
            log_error(f"❌ Deepgram STT streaming error", error=str(e))
            self.is_streaming = False
            raise

    def _send_audio_sync(self, audio_chunk: bytes):
        """Synchronous method to send audio"""
        if self.websocket and not self.websocket.closed and not self.final_result_received:
            try:
                asyncio.run(self.websocket.send(audio_chunk))
            except Exception as e:
                log_error(f"❌ Error sending audio chunk: {e}")

    async def stop_streaming(self) -> Optional[TranscriptionResult]:
        """Stop the active session and return the last queued final result.

        Signals the WebSocket thread to stop, asks Deepgram to close the
        stream, waits up to three seconds for the thread to exit, drains the
        response queue and resets the per-session state.

        Returns:
            The most recent final ``TranscriptionResult`` still in the queue,
            or ``None`` when there is none, when no session was active, or
            when shutdown itself raised.
        """
        if not self.is_streaming and not self.ws_thread:
            log_debug("Already stopped, nothing to do")
            return None

        try:
            log_info(f"🛑 Disposing Deepgram STT session #{self.session_id}")

            # Set stop flag
            self.is_streaming = False
            self.stop_event.set()

            # NOTE(review): the socket is created on the WebSocket thread's
            # event loop but is used here from the caller's loop - confirm
            # this is supported by the websockets version in use.
            ws = self.websocket
            if ws and not ws.closed:
                try:
                    # CloseStream asks Deepgram to flush its final response.
                    await ws.send(json.dumps({"type": "CloseStream"}))
                    await asyncio.sleep(0.2)  # Wait for final response
                    await ws.close()
                except Exception as e:
                    # Best effort: the socket may already be closing. Task
                    # cancellation is deliberately allowed to propagate.
                    log_debug(f"Ignoring error while closing WebSocket: {e}")

            # Wait for the thread without blocking the caller's event loop.
            worker = self.ws_thread
            if worker and worker.is_alive():
                log_debug("⏳ Waiting for WebSocket thread to finish...")
                await asyncio.get_running_loop().run_in_executor(None, worker.join, 3.0)

                if worker.is_alive():
                    log_warning("⚠️ WebSocket thread did not stop gracefully")
                else:
                    log_debug("✅ WebSocket thread finished")

            # Keep the last final result that is still queued.
            final_result = None
            while True:
                try:
                    result = self.responses_queue.get_nowait()
                except queue.Empty:
                    break
                if result.is_final:
                    final_result = result

            # Reset everything
            self.websocket = None
            self.ws_thread = None
            self.stop_event.clear()
            self.final_result_received = False

            log_info(f"✅ Deepgram STT session #{self.session_id} disposed")
            return final_result

        except Exception as e:
            log_error(f"❌ Error during stop_streaming", error=str(e))
            self.is_streaming = False
            self.websocket = None
            self.ws_thread = None
            return None

    def _run_websocket(self, config: STTConfig):
        """Run WebSocket connection in separate thread"""
        asyncio.set_event_loop(asyncio.new_event_loop())
        loop = asyncio.get_event_loop()
        
        try:
            loop.run_until_complete(self._websocket_handler(config))
        except Exception as e:
            log_error(f"❌ WebSocket thread error", error=str(e), traceback=traceback.format_exc())
        finally:
            loop.close()
            self.is_streaming = False

    async def _websocket_handler(self, config: STTConfig):
        """Open the Deepgram WebSocket and keep it alive for one session.

        The connection is stored on ``self.websocket`` and a background task
        running ``_receive_messages`` is started. The handler then polls every
        0.1 s until the stop event is set, the socket closes, or a final
        result has been received, after which the receive task is cancelled.
        Connection errors are logged and clear ``is_streaming``; they are not
        re-raised.

        Args:
            config: STT configuration used to build the connection URL.
        """
        endpoint = self._get_websocket_url(config)
        auth_headers = {"Authorization": f"Token {self.api_key}"}

        try:
            log_debug("🔌 Connecting to Deepgram WebSocket...")

            async with websockets.connect(
                endpoint, extra_headers=auth_headers, ping_interval=5
            ) as ws:
                self.websocket = ws
                log_info("✅ Connected to Deepgram - Ready for speech")

                # Only a receive task is needed; sessions are short.
                reader = asyncio.create_task(self._receive_messages())

                # Poll until asked to stop, the socket closes, or the final
                # result has arrived.
                while True:
                    if self.stop_event.is_set() or ws.closed or self.final_result_received:
                        break
                    await asyncio.sleep(0.1)

                # Shut the reader down and wait for it to unwind.
                reader.cancel()
                try:
                    await reader
                except asyncio.CancelledError:
                    pass

        except Exception as exc:
            log_error("❌ WebSocket connection error", error=str(exc))
            self.is_streaming = False

    async def _receive_messages(self):
        """Receive and process messages from WebSocket"""
        try:
            async for message in self.websocket:
                if self.stop_event.is_set() or self.final_result_received:
                    break
                    
                try:
                    data = json.loads(message)
                    self._process_deepgram_message(data)
                except json.JSONDecodeError as e:
                    log_error(f"❌ Failed to parse message: {e}")
                    
        except websockets.exceptions.ConnectionClosed:
            log_info("WebSocket connection closed")
        except Exception as e:
            log_error(f"❌ Error receiving messages: {e}")

    def _process_deepgram_message(self, data: Dict[str, Any]):
        """Process Deepgram response message"""
        msg_type = data.get("type", "")
        
        if msg_type == "Results":
            # Transcription result
            is_final = data.get("is_final", False)
            
            # Sadece final result'ları işle
            if is_final:
                channel = data.get("channel", {})
                alternatives = channel.get("alternatives", [])
                
                if alternatives:
                    alt = alternatives[0]
                    transcript = alt.get("transcript", "")
                    confidence = alt.get("confidence", 0.0)
                    
                    # Create final result
                    result = TranscriptionResult(
                        text=transcript,
                        is_final=True,
                        confidence=confidence,
                        timestamp=datetime.now().timestamp()
                    )
                    
                    # Queue result
                    try:
                        self.responses_queue.put(result)
                        self.final_result_received = True
                        
                        log_info(f"🎯 FINAL RESULT: '{transcript}' (confidence: {confidence:.2f})")
                        log_info(f"📊 Session stats: {self.total_chunks} chunks, {self.total_bytes/1024:.1f}KB")
                        
                    except queue.Full:
                        log_warning("⚠️ Response queue full")
                        
        elif msg_type == "SpeechStarted":
            # VAD: Speech started
            log_info("🎤 Speech detected - User started speaking")
            
        elif msg_type == "UtteranceEnd":
            # VAD: Utterance ended - kullanıcı konuşmayı bitirdi
            log_info("🔚 Speech ended - User stopped speaking")
            
            # Bu noktada Deepgram final result gönderecek
            
        elif msg_type == "Error":
            # Error message
            error = data.get("error", {})
            log_error(f"❌ Deepgram error: {error}")
            
        elif msg_type == "Metadata":
            # Connection metadata
            request_id = data.get("request_id", "")
            log_debug(f"📋 Connected with request_id: {request_id}")

    def _reset_session_data(self):
        """Prepare per-session state for a new session.

        Empties the response queue, zeroes the audio counters, increments
        ``session_id`` and clears ``final_result_received``.
        """
        # Drain the queue. Only queue.Empty ends the loop; any other error is
        # unexpected and is allowed to surface instead of being swallowed.
        while True:
            try:
                self.responses_queue.get_nowait()
            except queue.Empty:
                break

        # Reset counters
        self.total_audio_bytes = 0
        self.total_chunks = 0
        self.session_id += 1
        self.final_result_received = False

        log_debug(f"🔄 Session data reset. New session ID: {self.session_id}")

    def supports_realtime(self) -> bool:
        """Deepgram supports real-time streaming"""
        return True

    def get_supported_languages(self) -> List[str]:
        """Get list of supported language codes"""
        # Deepgram Nova-2 supported languages
        return [
            "tr",      # Turkish
            "en",      # English
            "en-US",   # English (US)
            "en-GB",   # English (UK)
            "de",      # German
            "fr",      # French
            "es",      # Spanish
            "it",      # Italian
            "pt",      # Portuguese
            "ru",      # Russian
            "ja",      # Japanese
            "ko",      # Korean
            "zh",      # Chinese
            "ar",      # Arabic
            "nl",      # Dutch
            "sv",      # Swedish
            "pl",      # Polish
            "hi",      # Hindi
        ]

    def get_provider_name(self) -> str:
        """Get provider name"""
        return "deepgram"