"""
Deepgram Speech-to-Text implementation using the Deepgram SDK.
"""
import asyncio
import queue
import struct
import threading
import traceback
from datetime import datetime
from typing import Any, AsyncIterator, List, Optional

from deepgram import (
    DeepgramClient,
    DeepgramClientOptions,
    LiveOptions,
    LiveTranscriptionEvents,
    Microphone,
)

from utils.logger import log_debug, log_error, log_info, log_warning

from .stt_interface import STTConfig, STTInterface, TranscriptionResult
class DeepgramSTT(STTInterface):
    """Deepgram STT implementation using the official Deepgram SDK.

    Streams linear16 PCM audio to Deepgram's live-transcription WebSocket.
    The SDK delivers events on its own background thread, so results are
    handed to the async consumer through a thread-safe ``queue.Queue``.
    """

    def __init__(self, api_key: str):
        """Create the client wrapper.

        Args:
            api_key: Deepgram API key; must be non-empty.

        Raises:
            ValueError: If ``api_key`` is falsy.
        """
        if not api_key:
            raise ValueError("Deepgram API key is required")

        # Log only a short prefix of the key for debugging, never the full key.
        log_info(f"🔑 Deepgram API key resolved: {api_key[:10]}... (length: {len(api_key)})")

        self.api_key = api_key
        self.deepgram_client = None      # DeepgramClient, created per session
        self.live_connection = None      # live websocket handle, created per session
        self.is_streaming = False
        # Bridge between the SDK callback thread (producer) and the async
        # stream_audio() consumer. Bounded so a stalled consumer cannot
        # grow memory without limit.
        self.responses_queue = queue.Queue(maxsize=100)

        # Session tracking
        self.session_id = 0
        self.total_audio_bytes = 0
        self.total_chunks = 0

        # Final result tracking
        self.final_result_received = False
        self.stop_event = threading.Event()

        # The first few audio chunks are batched and sent together so
        # Deepgram's VAD gets more context at stream start.
        self.initial_buffer = []

        log_info("✅ Deepgram STT initialized (SDK version)")

    def _map_language_code(self, language: str) -> str:
        """Map BCP-47 style language codes to Deepgram's expected format.

        Unknown codes pass through unchanged.
        """
        language_map = {
            "tr-TR": "tr",
            "en-US": "en-US",
            "en-GB": "en-GB",
            "de-DE": "de",
            "fr-FR": "fr",
            "es-ES": "es",
            "it-IT": "it",
            "pt-BR": "pt-BR",
            "ru-RU": "ru",
            "ja-JP": "ja",
            "ko-KR": "ko",
            "zh-CN": "zh-CN",
            "ar-SA": "ar",
        }
        return language_map.get(language, language)

    async def start_streaming(self, config: STTConfig) -> None:
        """Initialize a live streaming session using the SDK.

        Args:
            config: STT configuration; its ``language`` (if present) is
                mapped to Deepgram's code format, defaulting to Turkish.

        Raises:
            RuntimeError: If the connection cannot be established.
        """
        try:
            # Stop any stream left over from a previous session.
            if self.is_streaming:
                log_warning("⚠️ Previous stream still active, stopping it first")
                await self.stop_streaming()
                await asyncio.sleep(0.5)

            self._reset_session_data()
            log_info(f"🎤 Starting Deepgram STT (SDK) - Session #{self.session_id}")

            config_options = DeepgramClientOptions(
                verbose=False,
                options={"keepalive": "true"}
            )
            self.deepgram_client = DeepgramClient(self.api_key, config=config_options)

            # FIX: `config` used to be ignored entirely (language was
            # hard-coded to "tr" and _map_language_code was dead code).
            # Behavior is unchanged for the previous tr-TR default.
            language = self._map_language_code(getattr(config, "language", None) or "tr-TR")

            options = LiveOptions(
                language=language,
                model="nova-2-general",
                encoding="linear16",
                sample_rate=16000,
                interim_results=True,   # Required: Deepgram sends no final results without it
                channels=1,
                punctuate=True,
                smart_format=True,
                no_delay=True,
                vad_events=True,        # Enables SpeechStarted/UtteranceEnd events
                utterance_end_ms=2000   # 2 seconds of silence ends the utterance
            )
            log_info(f"🔧 Deepgram options: language={language}, model=nova-2, encoding=linear16, interim_results=True")

            # Create the live connection and wire up callbacks before starting.
            self.live_connection = self.deepgram_client.listen.live.v("1")
            self._setup_event_handlers()

            try:
                log_info("🔌 Attempting to connect to Deepgram...")
                result = self.live_connection.start(options)
                log_info(f"🔌 Connection start result: {result}")

                if result:
                    self.is_streaming = True
                    log_info("✅ Deepgram SDK connected - Ready for speech")
                else:
                    # Surface whatever error detail the SDK exposes, if any.
                    if hasattr(self.live_connection, 'get_error') or hasattr(self.live_connection, 'error'):
                        error_detail = getattr(self.live_connection, 'error', 'No error details')
                        log_error(f"❌ Connection failed with details: {error_detail}")
                    raise RuntimeError("Failed to start Deepgram connection")
            except Exception as e:
                log_error(f"❌ Connection error: {str(e)}")
                # HTTP-level details, when the exception carries them.
                if hasattr(e, 'response'):
                    log_error(f"❌ Response: {e.response}")
                if hasattr(e, 'status_code'):
                    log_error(f"❌ Status code: {e.status_code}")
                raise

        except Exception as e:
            log_error(f"❌ Failed to start Deepgram STT", error=str(e))
            if hasattr(e, '__dict__'):
                log_error(f"❌ Error details: {e.__dict__}")
            # Leave the instance in a clean, restartable state.
            self.is_streaming = False
            self.live_connection = None
            self.deepgram_client = None
            raise

    def _setup_event_handlers(self):
        """Register callbacks for all Deepgram live-transcription events."""
        self.live_connection.on(LiveTranscriptionEvents.Transcript, self._on_transcript)
        self.live_connection.on(LiveTranscriptionEvents.SpeechStarted, self._on_speech_started)
        self.live_connection.on(LiveTranscriptionEvents.UtteranceEnd, self._on_utterance_end)
        self.live_connection.on(LiveTranscriptionEvents.Metadata, self._on_metadata)
        self.live_connection.on(LiveTranscriptionEvents.Error, self._on_error)
        self.live_connection.on(LiveTranscriptionEvents.Close, self._on_close)

    def _on_transcript(self, *args, **kwargs):
        """Handle a transcript event (invoked by the SDK callback thread).

        Final results are queued even when the transcript text is empty so
        the consumer can observe the end-of-utterance state change; interim
        results are queued only when they carry text.
        """
        try:
            # The SDK passes the result object as the second positional arg.
            result = args[1] if len(args) > 1 else kwargs.get("result")
            if not result:
                log_warning("⚠️ No result in transcript event")
                return

            # Detailed introspection of the first few events, for debugging.
            if self.total_chunks < 5:
                log_debug(f"🔍 Result object type: {type(result)}")
                log_debug(f"🔍 Result dir: {[attr for attr in dir(result) if not attr.startswith('_')]}")
                try:
                    if hasattr(result, '__dict__'):
                        log_debug(f"🔍 Result dict: {result.__dict__}")
                except Exception:  # FIX: was a bare except
                    pass

            is_final = result.is_final if hasattr(result, 'is_final') else False

            # Transcript text lives in the first channel alternative.
            if hasattr(result, 'channel') and result.channel:
                alternatives = result.channel.alternatives
                if alternatives and len(alternatives) > 0:
                    transcript = alternatives[0].transcript
                    confidence = alternatives[0].confidence

                    log_debug(f"📝 Raw transcript: '{transcript}' (is_final: {is_final}, confidence: {confidence})")

                    if is_final:
                        # Accept final results even with empty text.
                        transcription_result = TranscriptionResult(
                            text=transcript or "",
                            is_final=is_final,
                            confidence=confidence,
                            timestamp=datetime.now().timestamp()
                        )
                        try:
                            # FIX: was a blocking put(); with a bounded queue it
                            # could hang the SDK callback thread forever and the
                            # queue.Full handler was unreachable.
                            self.responses_queue.put_nowait(transcription_result)
                            self.final_result_received = True

                            if transcript and transcript.strip():
                                log_info(f"🎯 FINAL TRANSCRIPT: '{transcript}' (confidence: {confidence:.2f})")
                            else:
                                log_warning(f"⚠️ Empty final transcript received - but queued for state change")
                        except queue.Full:
                            log_warning("⚠️ Response queue full")

                    elif transcript and transcript.strip():
                        # Interim result - only queue non-empty text.
                        transcription_result = TranscriptionResult(
                            text=transcript,
                            is_final=is_final,
                            confidence=confidence,
                            timestamp=datetime.now().timestamp()
                        )
                        try:
                            # FIX: put_nowait, same reason as above.
                            self.responses_queue.put_nowait(transcription_result)
                            log_info(f"📝 Interim transcript: '{transcript}'")
                        except queue.Full:
                            log_warning("⚠️ Response queue full")

        except Exception as e:
            log_error(f"❌ Error processing transcript: {e}")
            log_error(f"❌ Args: {args}")
            log_error(f"❌ Kwargs: {kwargs}")
            # traceback is imported at module level; no local import needed.
            log_error(f"❌ Traceback: {traceback.format_exc()}")

    def _on_speech_started(self, *args, **kwargs):
        """Handle the VAD speech-started event."""
        log_info("🎤 Speech detected - User started speaking")

    def _on_utterance_end(self, *args, **kwargs):
        """Handle the VAD utterance-end event.

        Deepgram sends the final transcript shortly after this event.
        """
        log_info("🔚 Speech ended - User stopped speaking")

    def _on_metadata(self, *args, **kwargs):
        """Handle a metadata event; logs the Deepgram request id."""
        metadata = args[1] if len(args) > 1 else kwargs.get("metadata", {})
        # FIX: the SDK may deliver metadata as a response object rather than
        # a dict; calling .get() on an object raised AttributeError.
        if isinstance(metadata, dict):
            request_id = metadata.get("request_id", "")
        else:
            request_id = getattr(metadata, "request_id", "")
        log_debug(f"📋 Deepgram metadata - Request ID: {request_id}")

    def _on_error(self, *args, **kwargs):
        """Handle an error event from the SDK."""
        error = args[1] if len(args) > 1 else kwargs.get("error", {})
        log_error(f"❌ Deepgram error: {error}")

    def _on_close(self, *args, **kwargs):
        """Handle the connection-closed event."""
        log_info("🔌 Deepgram connection closed")
        self.is_streaming = False

    async def stream_audio(self, audio_chunk: bytes) -> AsyncIterator[TranscriptionResult]:
        """Send one audio chunk and yield any transcription results queued so far.

        Args:
            audio_chunk: Raw linear16 PCM bytes.

        Yields:
            TranscriptionResult: Interim and final results, in arrival order.

        Raises:
            RuntimeError: If called before :meth:`start_streaming`.
        """
        if not self.is_streaming or not self.live_connection:
            raise RuntimeError("Streaming not started. Call start_streaming() first.")

        try:
            # Sanity-check the audio format on the first few chunks.
            if self.total_chunks < 3:
                if len(audio_chunk) >= 4:
                    try:
                        first_sample = struct.unpack('<h', audio_chunk[:2])[0]
                        log_info(f"🔊 Audio format check - Chunk #{self.total_chunks}: First sample={first_sample}, Size={len(audio_chunk)} bytes")
                    except struct.error:  # FIX: was a bare except
                        log_warning("⚠️ Could not parse as Linear16")

                # Batch the first three chunks and send them together so
                # Deepgram's VAD gets more initial context.
                self.initial_buffer.append(audio_chunk)
                if self.total_chunks == 2:
                    combined_audio = b''.join(self.initial_buffer)
                    self.live_connection.send(combined_audio)
                    self.initial_buffer = []
                    log_info(f"🎯 Sent initial audio buffer: {len(combined_audio)} bytes")
            else:
                # Send directly; Deepgram handles audio arriving after a
                # final result on its own.
                self.live_connection.send(audio_chunk)

            self.total_chunks += 1
            self.total_audio_bytes += len(audio_chunk)

            if self.total_chunks % 50 == 0:
                log_debug(f"📊 Listening... {self.total_chunks} chunks, {self.total_audio_bytes/1024:.1f}KB")

            # Drain whatever the callback thread has queued since last call.
            while True:
                try:
                    result = self.responses_queue.get_nowait()
                    log_debug(f"🎯 Yielding result: is_final={result.is_final}, text='{result.text}'")
                    yield result
                except queue.Empty:
                    break

        except Exception as e:
            log_error(f"❌ Error streaming audio", error=str(e))
            self.is_streaming = False
            raise

    async def stop_streaming(self) -> Optional[TranscriptionResult]:
        """Stop streaming, finish the connection, and return the last final result.

        Returns:
            The last final TranscriptionResult still queued, or ``None``.
        """
        if not self.is_streaming:
            log_debug("Already stopped, nothing to do")
            return None

        try:
            log_info(f"🛑 Stopping Deepgram STT session #{self.session_id}")
            self.is_streaming = False

            if self.live_connection:
                try:
                    # finish() flushes the stream and triggers the final transcript.
                    self.live_connection.finish()
                    # Give the callback thread a moment to deliver it.
                    await asyncio.sleep(0.5)
                except Exception as e:
                    log_warning(f"⚠️ Error finishing stream: {e}")

            # Keep only the last final result found in the queue.
            final_result = None
            while not self.responses_queue.empty():
                try:
                    result = self.responses_queue.get_nowait()
                    if result.is_final:
                        final_result = result
                except queue.Empty:
                    break

            self.live_connection = None
            self.deepgram_client = None
            self.final_result_received = False

            log_info(f"✅ Deepgram STT session #{self.session_id} stopped")
            return final_result

        except Exception as e:
            log_error(f"❌ Error during stop_streaming", error=str(e))
            self.is_streaming = False
            self.live_connection = None
            self.deepgram_client = None
            return None

    def _reset_session_data(self):
        """Reset per-session counters, buffers, and the result queue."""
        # Drain any stale results from the previous session.
        while not self.responses_queue.empty():
            try:
                self.responses_queue.get_nowait()
            except queue.Empty:  # FIX: was a bare except
                break

        self.total_audio_bytes = 0
        self.total_chunks = 0
        self.session_id += 1
        self.final_result_received = False
        self.initial_buffer = []

        log_debug(f"🔄 Session data reset. New session ID: {self.session_id}")

    def supports_realtime(self) -> bool:
        """Deepgram supports real-time streaming."""
        return True

    def get_supported_languages(self) -> List[str]:
        """Return Deepgram's supported language codes.

        Source: https://developers.deepgram.com/docs/models-languages
        """
        return [
            "en", "es", "fr", "de", "it", "pt", "ru", "tr", "pl", "nl",
            "sv", "no", "fi", "da", "ja", "ko", "zh", "hi", "id", "th",
            "uk", "cs", "el", "he", "ar", "fa", "ta", "tl"
        ]

    def get_provider_name(self) -> str:
        """Return the provider identifier."""
        return "deepgram"