| """ | |
| Deepgram Speech-to-Text Implementation using Deepgram SDK | |
| """ | |
| import asyncio | |
| from typing import AsyncIterator, Optional, List, Any | |
| from datetime import datetime | |
| import queue | |
| import threading | |
| import traceback | |
| from deepgram import ( | |
| DeepgramClient, | |
| DeepgramClientOptions, | |
| LiveTranscriptionEvents, | |
| LiveOptions, | |
| Microphone, | |
| ) | |
| from utils.logger import log_info, log_error, log_debug, log_warning | |
| from .stt_interface import STTInterface, STTConfig, TranscriptionResult | |
class DeepgramSTT(STTInterface):
    """Deepgram STT implementation using official SDK"""

    def __init__(self, api_key: str):
        if not api_key:
            raise ValueError("Deepgram API key is required")

        # Log the first 10 characters of the API key for debugging
        log_info(f"Deepgram API key resolved: {api_key[:10]}... (length: {len(api_key)})")

        self.api_key = api_key
        self.deepgram_client = None
        self.live_connection = None
        self.is_streaming = False
        self.responses_queue = queue.Queue(maxsize=100)

        # Session tracking
        self.session_id = 0
        self.total_audio_bytes = 0
        self.total_chunks = 0

        # Final result tracking
        self.final_result_received = False
        self.stop_event = threading.Event()

        # Initial buffer for better VAD context
        self.initial_buffer = []

        log_info("Deepgram STT initialized (SDK version)")

    def _map_language_code(self, language: str) -> str:
        """Map language codes to Deepgram format"""
        language_map = {
            "tr-TR": "tr",
            "en-US": "en-US",
            "en-GB": "en-GB",
            "de-DE": "de",
            "fr-FR": "fr",
            "es-ES": "es",
            "it-IT": "it",
            "pt-BR": "pt-BR",
            "ru-RU": "ru",
            "ja-JP": "ja",
            "ko-KR": "ko",
            "zh-CN": "zh-CN",
            "ar-SA": "ar",
        }
        return language_map.get(language, language)

    async def start_streaming(self, config: STTConfig) -> None:
        """Initialize streaming session using SDK"""
        try:
            # Stop any existing stream first
            if self.is_streaming:
                log_warning("Previous stream still active, stopping it first")
                await self.stop_streaming()
                await asyncio.sleep(0.5)

            # Reset session data
            self._reset_session_data()

            log_info(f"Starting Deepgram STT (SDK) - Session #{self.session_id}")

            # Create Deepgram client (keepalive enabled; set verbose=True when debugging)
            config_options = DeepgramClientOptions(
                verbose=False,
                options={"keepalive": "true"}
            )
            self.deepgram_client = DeepgramClient(self.api_key, config=config_options)

            # Try a minimal configuration first (language is hardcoded here; config is not yet used)
            options = LiveOptions(
                language="tr",
                model="nova-2",
                encoding="linear16",
                sample_rate=16000,
                interim_results=True,   # Required; without it, final results are not delivered either
                channels=1,
                utterance_end_ms=2000,  # 2 seconds of silence ends the utterance
                punctuate=True,
                smart_format=True,
                # numerals=True,
                # profanity_filter=False,
                # redact=False,
                no_delay=True,
                vad_events=True         # Enable VAD events
            )

            log_info("Deepgram options: language=tr, model=nova-2, encoding=linear16, interim_results=True")

            # Create live connection
            self.live_connection = self.deepgram_client.listen.live.v("1")

            # Setup event handlers
            self._setup_event_handlers()

            try:
                # Log before connection attempt
                log_info("Attempting to connect to Deepgram...")
                result = self.live_connection.start(options)
                log_info(f"Connection start result: {result}")

                if result:
                    self.is_streaming = True
                    log_info("Deepgram SDK connected - Ready for speech")
                else:
                    # Try to get more error details
                    if hasattr(self.live_connection, 'get_error') or hasattr(self.live_connection, 'error'):
                        error_detail = getattr(self.live_connection, 'error', 'No error details')
                        log_error(f"Connection failed with details: {error_detail}")
                    raise RuntimeError("Failed to start Deepgram connection")

            except Exception as e:
                log_error(f"Connection error: {str(e)}")
                # Log more details about the exception
                if hasattr(e, 'response'):
                    log_error(f"Response: {e.response}")
                if hasattr(e, 'status_code'):
                    log_error(f"Status code: {e.status_code}")
                raise

        except Exception as e:
            log_error("Failed to start Deepgram STT", error=str(e))
            if hasattr(e, '__dict__'):
                log_error(f"Error details: {e.__dict__}")
            self.is_streaming = False
            self.live_connection = None
            self.deepgram_client = None
            raise

    def _setup_event_handlers(self):
        """Setup event handlers for Deepgram events"""
        # Transcript received - use the existing class method
        self.live_connection.on(LiveTranscriptionEvents.Transcript, self._on_transcript)
        # Speech started
        self.live_connection.on(LiveTranscriptionEvents.SpeechStarted, self._on_speech_started)
        # Utterance end
        self.live_connection.on(LiveTranscriptionEvents.UtteranceEnd, self._on_utterance_end)
        # Metadata
        self.live_connection.on(LiveTranscriptionEvents.Metadata, self._on_metadata)
        # Error
        self.live_connection.on(LiveTranscriptionEvents.Error, self._on_error)
        # Connection closed
        self.live_connection.on(LiveTranscriptionEvents.Close, self._on_close)

    def _on_transcript(self, *args, **kwargs):
        """Handle transcript event - SDK calls this method directly"""
        try:
            # SDK passes the result as the second argument
            result = args[1] if len(args) > 1 else kwargs.get("result")
            if not result:
                log_warning("No result in transcript event")
                return

            # Inspect the result object in detail for the first few events (debugging)
            if self.total_chunks < 5:
                log_debug(f"Result object type: {type(result)}")
                log_debug(f"Result dir: {[attr for attr in dir(result) if not attr.startswith('_')]}")
                # Log all properties of the result
                try:
                    if hasattr(result, '__dict__'):
                        log_debug(f"Result dict: {result.__dict__}")
                except Exception:
                    pass

            # Access properties directly from the result object
            is_final = result.is_final if hasattr(result, 'is_final') else False

            # Get transcript from channel alternatives
            if hasattr(result, 'channel') and result.channel:
                alternatives = result.channel.alternatives
                if alternatives and len(alternatives) > 0:
                    transcript = alternatives[0].transcript
                    confidence = alternatives[0].confidence

                    # Log all transcripts for debugging
                    log_debug(f"Raw transcript: '{transcript}' (is_final: {is_final}, confidence: {confidence})")

                    # IMPORTANT: accept final results even when the transcript is empty
                    if is_final:
                        # Final transcript - may be empty, but process it anyway
                        transcription_result = TranscriptionResult(
                            text=transcript or "",  # may be an empty string
                            is_final=is_final,
                            confidence=confidence,
                            timestamp=datetime.now().timestamp()
                        )
                        try:
                            self.responses_queue.put(transcription_result)
                            self.final_result_received = True
                            if transcript and transcript.strip():
                                log_info(f"FINAL TRANSCRIPT: '{transcript}' (confidence: {confidence:.2f})")
                            else:
                                log_warning("Empty final transcript received - but queued for state change")
                        except queue.Full:
                            log_warning("Response queue full")
                    elif transcript and transcript.strip():
                        # Interim result - only accept non-empty transcripts
                        transcription_result = TranscriptionResult(
                            text=transcript,
                            is_final=is_final,
                            confidence=confidence,
                            timestamp=datetime.now().timestamp()
                        )
                        try:
                            self.responses_queue.put(transcription_result)
                            log_info(f"Interim transcript: '{transcript}'")
                        except queue.Full:
                            log_warning("Response queue full")

        except Exception as e:
            log_error(f"Error processing transcript: {e}")
            log_error(f"Args: {args}")
            log_error(f"Kwargs: {kwargs}")
            log_error(f"Traceback: {traceback.format_exc()}")

    def _on_speech_started(self, *args, **kwargs):
        """Handle speech started event"""
        log_info("Speech detected - User started speaking")

    def _on_utterance_end(self, *args, **kwargs):
        """Handle utterance end event"""
        log_info("Speech ended - User stopped speaking")
        # Deepgram will send the final transcript after this

    def _on_metadata(self, *args, **kwargs):
        """Handle metadata event"""
        metadata = args[1] if len(args) > 1 else kwargs.get("metadata", {})
        # The SDK may pass a response object rather than a dict; handle both
        if isinstance(metadata, dict):
            request_id = metadata.get("request_id", "")
        else:
            request_id = getattr(metadata, "request_id", "")
        log_debug(f"Deepgram metadata - Request ID: {request_id}")

    def _on_error(self, *args, **kwargs):
        """Handle error event"""
        error = args[1] if len(args) > 1 else kwargs.get("error", {})
        log_error(f"Deepgram error: {error}")

    def _on_close(self, *args, **kwargs):
        """Handle connection close event"""
        log_info("Deepgram connection closed")
        self.is_streaming = False

    async def stream_audio(self, audio_chunk: bytes) -> AsyncIterator[TranscriptionResult]:
        """Stream audio chunk and get transcription results"""
        if not self.is_streaming or not self.live_connection:
            raise RuntimeError("Streaming not started. Call start_streaming() first.")

        try:
            # Accumulate the first few chunks and send them together (better VAD context)
            if not hasattr(self, 'initial_buffer'):
                self.initial_buffer = []

            # Analyze the audio format for the first few chunks
            if self.total_chunks < 3:
                if len(audio_chunk) >= 4:
                    import struct
                    try:
                        first_sample = struct.unpack('<h', audio_chunk[:2])[0]
                        log_info(f"Audio format check - Chunk #{self.total_chunks}: First sample={first_sample}, Size={len(audio_chunk)} bytes")
                    except struct.error:
                        log_warning("Could not parse as Linear16")

                self.initial_buffer.append(audio_chunk)

                # On the third chunk, send the whole buffer at once
                if self.total_chunks == 2:
                    combined_audio = b''.join(self.initial_buffer)
                    self.live_connection.send(combined_audio)
                    self.initial_buffer = []
                    log_info(f"Sent initial audio buffer: {len(combined_audio)} bytes")
            else:
                # Send audio to Deepgram (even after a final result; Deepgram handles it)
                self.live_connection.send(audio_chunk)

            self.total_chunks += 1
            self.total_audio_bytes += len(audio_chunk)

            # Log progress
            if self.total_chunks % 50 == 0:
                log_debug(f"Listening... {self.total_chunks} chunks, {self.total_audio_bytes/1024:.1f}KB")

            # Check queue for results
            while True:
                try:
                    result = self.responses_queue.get_nowait()
                    # Log for debugging
                    log_debug(f"Yielding result: is_final={result.is_final}, text='{result.text}'")
                    yield result
                except queue.Empty:
                    break

        except Exception as e:
            log_error("Error streaming audio", error=str(e))
            self.is_streaming = False
            raise

    async def stop_streaming(self) -> Optional[TranscriptionResult]:
        """Stop streaming and clean up"""
        if not self.is_streaming:
            log_debug("Already stopped, nothing to do")
            return None

        try:
            log_info(f"Stopping Deepgram STT session #{self.session_id}")
            self.is_streaming = False

            # Finish the stream to get final results
            if self.live_connection:
                try:
                    # Finish the stream - this triggers the final transcript
                    self.live_connection.finish()
                    # Wait a bit for the final result
                    await asyncio.sleep(0.5)
                except Exception as e:
                    log_warning(f"Error finishing stream: {e}")

            # Get the last final result from the queue
            final_result = None
            while not self.responses_queue.empty():
                try:
                    result = self.responses_queue.get_nowait()
                    if result.is_final:
                        final_result = result
                except queue.Empty:
                    break

            # Clean up
            self.live_connection = None
            self.deepgram_client = None
            self.final_result_received = False

            log_info(f"Deepgram STT session #{self.session_id} stopped")
            return final_result

        except Exception as e:
            log_error("Error during stop_streaming", error=str(e))
            self.is_streaming = False
            self.live_connection = None
            self.deepgram_client = None
            return None

    def _reset_session_data(self):
        """Reset session-specific data"""
        # Clear queue
        while not self.responses_queue.empty():
            try:
                self.responses_queue.get_nowait()
            except queue.Empty:
                break

        # Reset counters
        self.total_audio_bytes = 0
        self.total_chunks = 0
        self.session_id += 1
        self.final_result_received = False

        # Clear initial buffer
        self.initial_buffer = []

        log_debug(f"Session data reset. New session ID: {self.session_id}")

    def supports_realtime(self) -> bool:
        """Deepgram supports real-time streaming"""
        return True

    def get_supported_languages(self) -> List[str]:
        """Get list of supported language codes"""
        return [
            "tr-TR",  # Turkish
            "en-US",  # English (US)
            "en-GB",  # English (UK)
            "de-DE",  # German
            "fr-FR",  # French
            "es-ES",  # Spanish
            "it-IT",  # Italian
            "pt-BR",  # Portuguese (Brazil)
            "ru-RU",  # Russian
            "ja-JP",  # Japanese
            "ko-KR",  # Korean
            "zh-CN",  # Chinese (Simplified)
            "ar-SA",  # Arabic
            "nl-NL",  # Dutch
            "sv-SE",  # Swedish
            "pl-PL",  # Polish
            "hi-IN",  # Hindi
            "cs-CZ",  # Czech
            "da-DK",  # Danish
            "fi-FI",  # Finnish
            "el-GR",  # Greek
            "he-IL",  # Hebrew
            "hu-HU",  # Hungarian
            "id-ID",  # Indonesian
            "ms-MY",  # Malay
            "no-NO",  # Norwegian
            "ro-RO",  # Romanian
            "sk-SK",  # Slovak
            "th-TH",  # Thai
            "uk-UA",  # Ukrainian
            "vi-VN",  # Vietnamese
        ]

    def get_provider_name(self) -> str:
        """Get provider name"""
        return "deepgram"