""" Deepgram Speech-to-Text Implementation using Deepgram SDK """ import asyncio from typing import AsyncIterator, Optional, List, Any from datetime import datetime import queue import threading import traceback from deepgram import ( DeepgramClient, DeepgramClientOptions, LiveTranscriptionEvents, LiveOptions, Microphone, ) from utils.logger import log_info, log_error, log_debug, log_warning from .stt_interface import STTInterface, STTConfig, TranscriptionResult class DeepgramSTT(STTInterface): """Deepgram STT implementation using official SDK""" def __init__(self, api_key: str): if not api_key: raise ValueError("Deepgram API key is required") # Debug için API key'in ilk 10 karakterini logla log_info(f"🔑 Deepgram API key resolved: {api_key[:10]}... (length: {len(api_key)})") self.api_key = api_key self.deepgram_client = None self.live_connection = None self.is_streaming = False self.responses_queue = queue.Queue(maxsize=100) # Session tracking self.session_id = 0 self.total_audio_bytes = 0 self.total_chunks = 0 # Final result tracking self.final_result_received = False self.stop_event = threading.Event() log_info(f"✅ Deepgram STT initialized (SDK version)") def _map_language_code(self, language: str) -> str: """Map language codes to Deepgram format""" language_map = { "tr-TR": "tr", "en-US": "en-US", "en-GB": "en-GB", "de-DE": "de", "fr-FR": "fr", "es-ES": "es", "it-IT": "it", "pt-BR": "pt-BR", "ru-RU": "ru", "ja-JP": "ja", "ko-KR": "ko", "zh-CN": "zh-CN", "ar-SA": "ar", } return language_map.get(language, language) async def start_streaming(self, config: STTConfig) -> None: """Initialize streaming session using SDK""" try: # Stop any existing stream if self.is_streaming: log_warning("⚠️ Previous stream still active, stopping it first") await self.stop_streaming() await asyncio.sleep(0.5) # Reset session data self._reset_session_data() log_info(f"🎤 Starting Deepgram STT (SDK) - Session #{self.session_id}") # Create Deepgram client with more verbose logging for debugging config_options = DeepgramClientOptions( verbose=False, options={"keepalive": "true"} ) self.deepgram_client = DeepgramClient(self.api_key, config=config_options) # Try minimal configuration first options = LiveOptions( language="tr", model="nova-2", encoding="linear16", sample_rate=16000, interim_results=True, # Bu zorunlu, yoksa final result da gelmiyor... channels=1, utterance_end_ms=2000, # 2 second silence = end punctuate=True, smart_format=True, #numerals=True, #profanity_filter=False, #redact=False, no_delay=True, vad_events=True # Enable VAD events ) log_info(f"🔧 Deepgram options: language=tr, model=nova-2, encoding=linear16, interim_results=True") # Create live connection self.live_connection = self.deepgram_client.listen.live.v("1") # Setup event handlers self._setup_event_handlers() try: # Log before connection attempt log_info("🔌 Attempting to connect to Deepgram...") result = self.live_connection.start(options) log_info(f"🔌 Connection start result: {result}") if result: self.is_streaming = True log_info(f"✅ Deepgram SDK connected - Ready for speech") else: # Try to get more error details if hasattr(self.live_connection, 'get_error') or hasattr(self.live_connection, 'error'): error_detail = getattr(self.live_connection, 'error', 'No error details') log_error(f"❌ Connection failed with details: {error_detail}") raise RuntimeError("Failed to start Deepgram connection") except Exception as e: log_error(f"❌ Connection error: {str(e)}") # Log more details about the exception if hasattr(e, 'response'): log_error(f"❌ Response: {e.response}") if hasattr(e, 'status_code'): log_error(f"❌ Status code: {e.status_code}") raise except Exception as e: log_error(f"❌ Failed to start Deepgram STT", error=str(e)) if hasattr(e, '__dict__'): log_error(f"❌ Error details: {e.__dict__}") self.is_streaming = False self.live_connection = None self.deepgram_client = None raise def _setup_event_handlers(self): """Setup event handlers for Deepgram events""" # Transcript received - use the existing class method self.live_connection.on(LiveTranscriptionEvents.Transcript, self._on_transcript) # Speech started self.live_connection.on(LiveTranscriptionEvents.SpeechStarted, self._on_speech_started) # Utterance end self.live_connection.on(LiveTranscriptionEvents.UtteranceEnd, self._on_utterance_end) # Metadata self.live_connection.on(LiveTranscriptionEvents.Metadata, self._on_metadata) # Error self.live_connection.on(LiveTranscriptionEvents.Error, self._on_error) # Connection closed self.live_connection.on(LiveTranscriptionEvents.Close, self._on_close) def _on_transcript(self, *args, **kwargs): """Handle transcript event - SDK calls this method directly""" try: # SDK passes the result as second argument result = args[1] if len(args) > 1 else kwargs.get("result") if not result: log_warning("⚠️ No result in transcript event") return # ✅ Debug için result objesini detaylı inceleyin if self.total_chunks < 5: # İlk birkaç event için log_debug(f"🔍 Result object type: {type(result)}") log_debug(f"🔍 Result dir: {[attr for attr in dir(result) if not attr.startswith('_')]}") # Result'un tüm property'lerini logla try: if hasattr(result, '__dict__'): log_debug(f"🔍 Result dict: {result.__dict__}") except: pass # Access properties directly from the result object is_final = result.is_final if hasattr(result, 'is_final') else False # Get transcript from channel alternatives if hasattr(result, 'channel') and result.channel: alternatives = result.channel.alternatives if alternatives and len(alternatives) > 0: transcript = alternatives[0].transcript confidence = alternatives[0].confidence # Log all transcripts for debugging log_debug(f"📝 Raw transcript: '{transcript}' (is_final: {is_final}, confidence: {confidence})") # ✅ ÖNEMLİ DEĞİŞİKLİK: Final result'ları boş olsa bile kabul et if is_final: # Final transcript - boş olabilir ama yine de işle transcription_result = TranscriptionResult( text=transcript or "", # Boş string olabilir is_final=is_final, confidence=confidence, timestamp=datetime.now().timestamp() ) try: self.responses_queue.put(transcription_result) self.final_result_received = True if transcript and transcript.strip(): log_info(f"🎯 FINAL TRANSCRIPT: '{transcript}' (confidence: {confidence:.2f})") else: log_warning(f"⚠️ Empty final transcript received - but queued for state change") except queue.Full: log_warning("⚠️ Response queue full") elif transcript and transcript.strip(): # Interim result - sadece dolu olanları kabul et transcription_result = TranscriptionResult( text=transcript, is_final=is_final, confidence=confidence, timestamp=datetime.now().timestamp() ) try: self.responses_queue.put(transcription_result) log_info(f"📝 Interim transcript: '{transcript}'") except queue.Full: log_warning("⚠️ Response queue full") except Exception as e: log_error(f"❌ Error processing transcript: {e}") log_error(f"❌ Args: {args}") log_error(f"❌ Kwargs: {kwargs}") import traceback log_error(f"❌ Traceback: {traceback.format_exc()}") def _on_speech_started(self, *args, **kwargs): """Handle speech started event""" log_info("🎤 Speech detected - User started speaking") def _on_utterance_end(self, *args, **kwargs): """Handle utterance end event""" log_info("🔚 Speech ended - User stopped speaking") # Deepgram will send final transcript after this def _on_metadata(self, *args, **kwargs): """Handle metadata event""" metadata = args[1] if len(args) > 1 else kwargs.get("metadata", {}) request_id = metadata.get("request_id", "") log_debug(f"📋 Deepgram metadata - Request ID: {request_id}") def _on_error(self, *args, **kwargs): """Handle error event""" error = args[1] if len(args) > 1 else kwargs.get("error", {}) log_error(f"❌ Deepgram error: {error}") def _on_close(self, *args, **kwargs): """Handle connection close event""" log_info("🔌 Deepgram connection closed") self.is_streaming = False async def stream_audio(self, audio_chunk: bytes) -> AsyncIterator[TranscriptionResult]: """Stream audio chunk and get transcription results""" if not self.is_streaming or not self.live_connection: raise RuntimeError("Streaming not started. Call start_streaming() first.") try: # İlk birkaç chunk için audio formatını analiz et if self.total_chunks < 3: if len(audio_chunk) >= 4: import struct try: first_sample = struct.unpack(' Optional[TranscriptionResult]: """Stop streaming and clean up""" if not self.is_streaming: log_debug("Already stopped, nothing to do") return None try: log_info(f"🛑 Stopping Deepgram STT session #{self.session_id}") self.is_streaming = False # Finish the stream to get final results if self.live_connection: try: # Finish the stream - this triggers final transcript self.live_connection.finish() # Wait a bit for final result await asyncio.sleep(0.5) except Exception as e: log_warning(f"⚠️ Error finishing stream: {e}") # Get final result from queue final_result = None while not self.responses_queue.empty(): try: result = self.responses_queue.get_nowait() if result.is_final: final_result = result except queue.Empty: break # Clean up self.live_connection = None self.deepgram_client = None self.final_result_received = False log_info(f"✅ Deepgram STT session #{self.session_id} stopped") return final_result except Exception as e: log_error(f"❌ Error during stop_streaming", error=str(e)) self.is_streaming = False self.live_connection = None self.deepgram_client = None return None def _reset_session_data(self): """Reset session-specific data""" # Clear queue while not self.responses_queue.empty(): try: self.responses_queue.get_nowait() except: pass # Reset counters self.total_audio_bytes = 0 self.total_chunks = 0 self.session_id += 1 self.final_result_received = False log_debug(f"🔄 Session data reset. New session ID: {self.session_id}") def supports_realtime(self) -> bool: """Deepgram supports real-time streaming""" return True def get_supported_languages(self) -> List[str]: """Get list of supported language codes""" return [ "tr-TR", # Turkish "en-US", # English (US) "en-GB", # English (UK) "de-DE", # German "fr-FR", # French "es-ES", # Spanish "it-IT", # Italian "pt-BR", # Portuguese (Brazil) "ru-RU", # Russian "ja-JP", # Japanese "ko-KR", # Korean "zh-CN", # Chinese (Simplified) "ar-SA", # Arabic "nl-NL", # Dutch "sv-SE", # Swedish "pl-PL", # Polish "hi-IN", # Hindi "cs-CZ", # Czech "da-DK", # Danish "fi-FI", # Finnish "el-GR", # Greek "he-IL", # Hebrew "hu-HU", # Hungarian "id-ID", # Indonesian "ms-MY", # Malay "no-NO", # Norwegian "ro-RO", # Romanian "sk-SK", # Slovak "th-TH", # Thai "uk-UA", # Ukrainian "vi-VN", # Vietnamese ] def get_provider_name(self) -> str: """Get provider name""" return "deepgram"