"""
Deepgram Speech-to-Text Implementation using Deepgram SDK
"""
import asyncio
from typing import AsyncIterator, Optional, List, Any
from datetime import datetime
import queue
import threading
import traceback
from deepgram import (
DeepgramClient,
DeepgramClientOptions,
LiveTranscriptionEvents,
LiveOptions,
Microphone,
)
from utils.logger import log_info, log_error, log_debug, log_warning
from .stt_interface import STTInterface, STTConfig, TranscriptionResult
class DeepgramSTT(STTInterface):
"""Deepgram STT implementation using official SDK"""
def __init__(self, api_key: str):
if not api_key:
raise ValueError("Deepgram API key is required")
        # Log the first 10 characters of the API key for debugging
log_info(f"🔑 Deepgram API key resolved: {api_key[:10]}... (length: {len(api_key)})")
self.api_key = api_key
self.deepgram_client = None
self.live_connection = None
self.is_streaming = False
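        # Transcripts are pushed onto this queue by the SDK event callbacks and drained by stream_audio()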
self.responses_queue = queue.Queue(maxsize=100)
# Session tracking
self.session_id = 0
self.total_audio_bytes = 0
self.total_chunks = 0
# Final result tracking
self.final_result_received = False
self.stop_event = threading.Event()
# ✅ Initial buffer for better VAD context
self.initial_buffer = []
log_info(f"✅ Deepgram STT initialized (SDK version)")
def _map_language_code(self, language: str) -> str:
"""Map language codes to Deepgram format"""
language_map = {
"tr-TR": "tr",
"en-US": "en-US",
"en-GB": "en-GB",
"de-DE": "de",
"fr-FR": "fr",
"es-ES": "es",
"it-IT": "it",
"pt-BR": "pt-BR",
"ru-RU": "ru",
"ja-JP": "ja",
"ko-KR": "ko",
"zh-CN": "zh-CN",
"ar-SA": "ar",
}
return language_map.get(language, language)
async def start_streaming(self, config: STTConfig) -> None:
"""Initialize streaming session using SDK"""
try:
# Stop any existing stream
if self.is_streaming:
log_warning("⚠️ Previous stream still active, stopping it first")
await self.stop_streaming()
await asyncio.sleep(0.5)
# Reset session data
self._reset_session_data()
log_info(f"🎤 Starting Deepgram STT (SDK) - Session #{self.session_id}")
            # Create Deepgram client with keepalive enabled (set verbose=True for extra debug logging)
config_options = DeepgramClientOptions(
verbose=False,
options={"keepalive": "true"}
)
self.deepgram_client = DeepgramClient(self.api_key, config=config_options)
# Try minimal configuration first
options = LiveOptions(
language="tr",
model="nova-2-general",
encoding="linear16",
sample_rate=16000,
                interim_results=True,  # Required; otherwise no final results arrive either
channels=1,
#utterance_end_ms=2000, # 2 second silence = end
punctuate=True,
smart_format=True,
#numerals=True,
#profanity_filter=False,
#redact=False,
no_delay=True,
vad_events=True, # Enable VAD events
#endpointing=1000
utterance_end_ms=2000
)
log_info(f"🔧 Deepgram options: language=tr, model=nova-2, encoding=linear16, interim_results=True")
# Create live connection
self.live_connection = self.deepgram_client.listen.live.v("1")
# Setup event handlers
self._setup_event_handlers()
try:
# Log before connection attempt
log_info("🔌 Attempting to connect to Deepgram...")
result = self.live_connection.start(options)
log_info(f"🔌 Connection start result: {result}")
if result:
self.is_streaming = True
log_info(f"✅ Deepgram SDK connected - Ready for speech")
else:
# Try to get more error details
if hasattr(self.live_connection, 'get_error') or hasattr(self.live_connection, 'error'):
error_detail = getattr(self.live_connection, 'error', 'No error details')
log_error(f"❌ Connection failed with details: {error_detail}")
raise RuntimeError("Failed to start Deepgram connection")
except Exception as e:
log_error(f"❌ Connection error: {str(e)}")
# Log more details about the exception
if hasattr(e, 'response'):
log_error(f"❌ Response: {e.response}")
if hasattr(e, 'status_code'):
log_error(f"❌ Status code: {e.status_code}")
raise
except Exception as e:
log_error(f"❌ Failed to start Deepgram STT", error=str(e))
if hasattr(e, '__dict__'):
log_error(f"❌ Error details: {e.__dict__}")
self.is_streaming = False
self.live_connection = None
self.deepgram_client = None
raise
def _setup_event_handlers(self):
"""Setup event handlers for Deepgram events"""
# Transcript received - use the existing class method
self.live_connection.on(LiveTranscriptionEvents.Transcript, self._on_transcript)
# Speech started
self.live_connection.on(LiveTranscriptionEvents.SpeechStarted, self._on_speech_started)
# Utterance end
self.live_connection.on(LiveTranscriptionEvents.UtteranceEnd, self._on_utterance_end)
# Metadata
self.live_connection.on(LiveTranscriptionEvents.Metadata, self._on_metadata)
# Error
self.live_connection.on(LiveTranscriptionEvents.Error, self._on_error)
# Connection closed
self.live_connection.on(LiveTranscriptionEvents.Close, self._on_close)
def _on_transcript(self, *args, **kwargs):
"""Handle transcript event - SDK calls this method directly"""
try:
# SDK passes the result as second argument
result = args[1] if len(args) > 1 else kwargs.get("result")
if not result:
log_warning("⚠️ No result in transcript event")
return
            # ✅ Inspect the result object in detail for debugging
            if self.total_chunks < 5:  # Only during the first few chunks
                log_debug(f"🔍 Result object type: {type(result)}")
                log_debug(f"🔍 Result dir: {[attr for attr in dir(result) if not attr.startswith('_')]}")
                # Log all of the result's properties
                try:
                    if hasattr(result, '__dict__'):
                        log_debug(f"🔍 Result dict: {result.__dict__}")
                except Exception:
                    pass
# Access properties directly from the result object
is_final = result.is_final if hasattr(result, 'is_final') else False
# Get transcript from channel alternatives
if hasattr(result, 'channel') and result.channel:
alternatives = result.channel.alternatives
if alternatives and len(alternatives) > 0:
transcript = alternatives[0].transcript
confidence = alternatives[0].confidence
# Log all transcripts for debugging
log_debug(f"📝 Raw transcript: '{transcript}' (is_final: {is_final}, confidence: {confidence})")
                    # ✅ IMPORTANT CHANGE: accept final results even if they are empty
                    if is_final:
                        # Final transcript - may be empty, but process it anyway
                        transcription_result = TranscriptionResult(
                            text=transcript or "",  # May be an empty string
is_final=is_final,
confidence=confidence,
timestamp=datetime.now().timestamp()
)
try:
self.responses_queue.put(transcription_result)
self.final_result_received = True
if transcript and transcript.strip():
log_info(f"🎯 FINAL TRANSCRIPT: '{transcript}' (confidence: {confidence:.2f})")
else:
log_warning(f"⚠️ Empty final transcript received - but queued for state change")
except queue.Full:
log_warning("⚠️ Response queue full")
elif transcript and transcript.strip():
                        # Interim result - only accept non-empty ones
transcription_result = TranscriptionResult(
text=transcript,
is_final=is_final,
confidence=confidence,
timestamp=datetime.now().timestamp()
)
try:
self.responses_queue.put(transcription_result)
log_info(f"📝 Interim transcript: '{transcript}'")
except queue.Full:
log_warning("⚠️ Response queue full")
except Exception as e:
log_error(f"❌ Error processing transcript: {e}")
log_error(f"❌ Args: {args}")
log_error(f"❌ Kwargs: {kwargs}")
log_error(f"❌ Traceback: {traceback.format_exc()}")
def _on_speech_started(self, *args, **kwargs):
"""Handle speech started event"""
log_info("🎤 Speech detected - User started speaking")
def _on_utterance_end(self, *args, **kwargs):
"""Handle utterance end event"""
log_info("🔚 Speech ended - User stopped speaking")
# Deepgram will send final transcript after this
def _on_metadata(self, *args, **kwargs):
"""Handle metadata event"""
metadata = args[1] if len(args) > 1 else kwargs.get("metadata", {})
request_id = metadata.get("request_id", "")
log_debug(f"📋 Deepgram metadata - Request ID: {request_id}")
def _on_error(self, *args, **kwargs):
"""Handle error event"""
error = args[1] if len(args) > 1 else kwargs.get("error", {})
log_error(f"❌ Deepgram error: {error}")
def _on_close(self, *args, **kwargs):
"""Handle connection close event"""
log_info("🔌 Deepgram connection closed")
self.is_streaming = False
async def stream_audio(self, audio_chunk: bytes) -> AsyncIterator[TranscriptionResult]:
"""Stream audio chunk and get transcription results"""
if not self.is_streaming or not self.live_connection:
raise RuntimeError("Streaming not started. Call start_streaming() first.")
try:
            # ✅ Buffer the first few chunks and send them together (better VAD context)
            if not hasattr(self, 'initial_buffer'):
                self.initial_buffer = []
            # Analyze the audio format of the first few chunks
if self.total_chunks < 3:
if len(audio_chunk) >= 4:
import struct
try:
first_sample = struct.unpack('<h', audio_chunk[:2])[0]
log_info(f"🔊 Audio format check - Chunk #{self.total_chunks}: First sample={first_sample}, Size={len(audio_chunk)} bytes")
                    except Exception:
log_warning("⚠️ Could not parse as Linear16")
self.initial_buffer.append(audio_chunk)
                # Send the whole buffer at once on the 3rd chunk
if self.total_chunks == 2:
combined_audio = b''.join(self.initial_buffer)
self.live_connection.send(combined_audio)
self.initial_buffer = []
log_info(f"🎯 Sent initial audio buffer: {len(combined_audio)} bytes")
else:
                # Send audio to Deepgram (keep sending even if a final result has arrived; Deepgram handles it)
self.live_connection.send(audio_chunk)
self.total_chunks += 1
self.total_audio_bytes += len(audio_chunk)
# Log progress
if self.total_chunks % 50 == 0:
log_debug(f"📊 Listening... {self.total_chunks} chunks, {self.total_audio_bytes/1024:.1f}KB")
# Check queue for results
while True:
try:
result = self.responses_queue.get_nowait()
# Log for debugging
log_debug(f"🎯 Yielding result: is_final={result.is_final}, text='{result.text}'")
yield result
except queue.Empty:
break
except Exception as e:
log_error(f"❌ Error streaming audio", error=str(e))
self.is_streaming = False
raise
async def stop_streaming(self) -> Optional[TranscriptionResult]:
"""Stop streaming and clean up"""
if not self.is_streaming:
log_debug("Already stopped, nothing to do")
return None
try:
log_info(f"🛑 Stopping Deepgram STT session #{self.session_id}")
self.is_streaming = False
# Finish the stream to get final results
if self.live_connection:
try:
# Finish the stream - this triggers final transcript
self.live_connection.finish()
# Wait a bit for final result
await asyncio.sleep(0.5)
except Exception as e:
log_warning(f"⚠️ Error finishing stream: {e}")
# Get final result from queue
final_result = None
while not self.responses_queue.empty():
try:
result = self.responses_queue.get_nowait()
if result.is_final:
final_result = result
except queue.Empty:
break
# Clean up
self.live_connection = None
self.deepgram_client = None
self.final_result_received = False
log_info(f"✅ Deepgram STT session #{self.session_id} stopped")
return final_result
except Exception as e:
log_error(f"❌ Error during stop_streaming", error=str(e))
self.is_streaming = False
self.live_connection = None
self.deepgram_client = None
return None
def _reset_session_data(self):
"""Reset session-specific data"""
# Clear queue
while not self.responses_queue.empty():
try:
self.responses_queue.get_nowait()
            except queue.Empty:
pass
# Reset counters
self.total_audio_bytes = 0
self.total_chunks = 0
self.session_id += 1
self.final_result_received = False
# ✅ Clear initial buffer
self.initial_buffer = []
log_debug(f"🔄 Session data reset. New session ID: {self.session_id}")
def supports_realtime(self) -> bool:
"""Deepgram supports real-time streaming"""
return True
def get_supported_languages(self) -> List[str]:
"""Get list of supported language codes"""
        # Language codes supported by Deepgram
        # Source: https://developers.deepgram.com/docs/models-languages
return [
"en", "es", "fr", "de", "it", "pt", "ru", "tr", "pl", "nl",
"sv", "no", "fi", "da", "ja", "ko", "zh", "hi", "id", "th",
"uk", "cs", "el", "he", "ar", "fa", "ta", "tl"
]
def get_provider_name(self) -> str:
"""Get provider name"""
return "deepgram"