# stt/stt_deepgram.py
"""
Deepgram Speech-to-Text Implementation using Deepgram SDK
"""
import asyncio
from typing import AsyncIterator, Optional, List, Any
from datetime import datetime
import queue
import threading
import traceback
from deepgram import (
    DeepgramClient,
    DeepgramClientOptions,
    LiveTranscriptionEvents,
    LiveOptions,
    Microphone,
)
from utils.logger import log_info, log_error, log_debug, log_warning
from .stt_interface import STTInterface, STTConfig, TranscriptionResult
class DeepgramSTT(STTInterface):
    """Deepgram STT implementation using official SDK"""

    def __init__(self, api_key: str):
        if not api_key:
            raise ValueError("Deepgram API key is required")

        # Log the first 10 characters of the API key for debugging
        log_info(f"🔑 Deepgram API key resolved: {api_key[:10]}... (length: {len(api_key)})")

        self.api_key = api_key
        self.deepgram_client = None
        self.live_connection = None
        self.is_streaming = False
        self.responses_queue = queue.Queue(maxsize=100)

        # Session tracking
        self.session_id = 0
        self.total_audio_bytes = 0
        self.total_chunks = 0

        # Final result tracking
        self.final_result_received = False
        self.stop_event = threading.Event()

        log_info("✅ Deepgram STT initialized (SDK version)")
    def _map_language_code(self, language: str) -> str:
        """Map language codes to Deepgram format"""
        language_map = {
            "tr-TR": "tr",
            "en-US": "en-US",
            "en-GB": "en-GB",
            "de-DE": "de",
            "fr-FR": "fr",
            "es-ES": "es",
            "it-IT": "it",
            "pt-BR": "pt-BR",
            "ru-RU": "ru",
            "ja-JP": "ja",
            "ko-KR": "ko",
            "zh-CN": "zh-CN",
            "ar-SA": "ar",
        }
        return language_map.get(language, language)
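
    # Illustrative behavior (added note, not part of the original source):
    #   _map_language_code("tr-TR") -> "tr"
    #   _map_language_code("en-US") -> "en-US"
    #   Codes missing from the map (e.g. "xx-YY") pass through unchanged.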
    async def start_streaming(self, config: STTConfig) -> None:
        """Initialize streaming session using SDK"""
        try:
            # Stop any existing stream
            if self.is_streaming:
                log_warning("⚠️ Previous stream still active, stopping it first")
                await self.stop_streaming()
                await asyncio.sleep(0.5)

            # Reset session data
            self._reset_session_data()

            log_info(f"🎤 Starting Deepgram STT (SDK) - Session #{self.session_id}")

            # Create Deepgram client with keepalive enabled; verbose logging can be
            # switched on here when debugging
            config_options = DeepgramClientOptions(
                verbose=False,
                options={"keepalive": "true"}
            )
            self.deepgram_client = DeepgramClient(self.api_key, config=config_options)

            # Try a minimal configuration first
            options = LiveOptions(
                language="tr",
                model="nova-2",
                encoding="linear16",
                sample_rate=16000,
                interim_results=True,   # Required; without it Deepgram sends no final results either
                channels=1,
                utterance_end_ms=1000,  # 1 second of silence = end of utterance
                punctuate=True,
                smart_format=True,
                # numerals=True,
                # profanity_filter=False,
                # redact=False,
                no_delay=True,
                vad_events=True         # Enable VAD events
            )

            log_info("🔧 Deepgram options: language=tr, model=nova-2, encoding=linear16, interim_results=True")
            # Create live connection
            self.live_connection = self.deepgram_client.listen.live.v("1")

            # Setup event handlers
            self._setup_event_handlers()

            try:
                # Log before connection attempt
                log_info("🔌 Attempting to connect to Deepgram...")
                result = self.live_connection.start(options)
                log_info(f"🔌 Connection start result: {result}")

                if result:
                    self.is_streaming = True
                    log_info("✅ Deepgram SDK connected - Ready for speech")
                else:
                    # Try to get more error details
                    if hasattr(self.live_connection, 'get_error') or hasattr(self.live_connection, 'error'):
                        error_detail = getattr(self.live_connection, 'error', 'No error details')
                        log_error(f"❌ Connection failed with details: {error_detail}")
                    raise RuntimeError("Failed to start Deepgram connection")

            except Exception as e:
                log_error(f"❌ Connection error: {str(e)}")
                # Log more details about the exception
                if hasattr(e, 'response'):
                    log_error(f"❌ Response: {e.response}")
                if hasattr(e, 'status_code'):
                    log_error(f"❌ Status code: {e.status_code}")
                raise

        except Exception as e:
            log_error("❌ Failed to start Deepgram STT", error=str(e))
            if hasattr(e, '__dict__'):
                log_error(f"❌ Error details: {e.__dict__}")
            self.is_streaming = False
            self.live_connection = None
            self.deepgram_client = None
            raise
    def _setup_event_handlers(self):
        """Setup event handlers for Deepgram events"""
        # Transcript received - use the existing class method
        self.live_connection.on(LiveTranscriptionEvents.Transcript, self._on_transcript)

        # Speech started
        self.live_connection.on(LiveTranscriptionEvents.SpeechStarted, self._on_speech_started)

        # Utterance end
        self.live_connection.on(LiveTranscriptionEvents.UtteranceEnd, self._on_utterance_end)

        # Metadata
        self.live_connection.on(LiveTranscriptionEvents.Metadata, self._on_metadata)

        # Error
        self.live_connection.on(LiveTranscriptionEvents.Error, self._on_error)

        # Connection closed
        self.live_connection.on(LiveTranscriptionEvents.Close, self._on_close)
    def _on_transcript(self, *args, **kwargs):
        """Handle transcript event - SDK calls this method directly"""
        try:
            # SDK passes the result as the second positional argument
            result = args[1] if len(args) > 1 else kwargs.get("result")
            if not result:
                log_warning("⚠️ No result in transcript event")
                return

            # Inspect the result object in detail for debugging (first few events only)
            if self.total_chunks < 5:
                log_debug(f"🔍 Result object type: {type(result)}")
                log_debug(f"🔍 Result dir: {[attr for attr in dir(result) if not attr.startswith('_')]}")
                # Log all properties of the result
                try:
                    if hasattr(result, '__dict__'):
                        log_debug(f"🔍 Result dict: {result.__dict__}")
                except Exception:
                    pass

            # Access properties directly from the result object
            is_final = result.is_final if hasattr(result, 'is_final') else False

            # Get transcript from channel alternatives
            if hasattr(result, 'channel') and result.channel:
                alternatives = result.channel.alternatives
                if alternatives and len(alternatives) > 0:
                    transcript = alternatives[0].transcript
                    confidence = alternatives[0].confidence

                    # Log all transcripts for debugging
                    log_debug(f"📝 Raw transcript: '{transcript}' (is_final: {is_final}, confidence: {confidence})")

                    # IMPORTANT: accept final results even when the transcript is empty
                    if is_final:
                        # Final transcript - may be empty but is still processed
                        transcription_result = TranscriptionResult(
                            text=transcript or "",  # May be an empty string
                            is_final=is_final,
                            confidence=confidence,
                            timestamp=datetime.now().timestamp()
                        )
                        try:
                            self.responses_queue.put_nowait(transcription_result)
                            self.final_result_received = True
                            if transcript and transcript.strip():
                                log_info(f"🎯 FINAL TRANSCRIPT: '{transcript}' (confidence: {confidence:.2f})")
                            else:
                                log_warning("⚠️ Empty final transcript received - but queued for state change")
                        except queue.Full:
                            log_warning("⚠️ Response queue full")

                    elif transcript and transcript.strip():
                        # Interim result - only accept non-empty transcripts
                        transcription_result = TranscriptionResult(
                            text=transcript,
                            is_final=is_final,
                            confidence=confidence,
                            timestamp=datetime.now().timestamp()
                        )
                        try:
                            self.responses_queue.put_nowait(transcription_result)
                            log_info(f"📝 Interim transcript: '{transcript}'")
                        except queue.Full:
                            log_warning("⚠️ Response queue full")

        except Exception as e:
            log_error(f"❌ Error processing transcript: {e}")
            log_error(f"❌ Args: {args}")
            log_error(f"❌ Kwargs: {kwargs}")
            # `traceback` is already imported at module level
            log_error(f"❌ Traceback: {traceback.format_exc()}")
    def _on_speech_started(self, *args, **kwargs):
        """Handle speech started event"""
        log_info("🎤 Speech detected - User started speaking")

    def _on_utterance_end(self, *args, **kwargs):
        """Handle utterance end event"""
        log_info("🔚 Speech ended - User stopped speaking")
        # Deepgram will send the final transcript after this

    def _on_metadata(self, *args, **kwargs):
        """Handle metadata event"""
        metadata = args[1] if len(args) > 1 else kwargs.get("metadata", {})
        request_id = metadata.get("request_id", "")
        log_debug(f"📋 Deepgram metadata - Request ID: {request_id}")

    def _on_error(self, *args, **kwargs):
        """Handle error event"""
        error = args[1] if len(args) > 1 else kwargs.get("error", {})
        log_error(f"❌ Deepgram error: {error}")

    def _on_close(self, *args, **kwargs):
        """Handle connection close event"""
        log_info("🔌 Deepgram connection closed")
        self.is_streaming = False
    async def stream_audio(self, audio_chunk: bytes) -> AsyncIterator[TranscriptionResult]:
        """Stream audio chunk and get transcription results"""
        if not self.is_streaming or not self.live_connection:
            raise RuntimeError("Streaming not started. Call start_streaming() first.")

        try:
            # Analyze the audio format for the first few chunks
            if self.total_chunks < 3:
                if len(audio_chunk) >= 4:
                    import struct
                    try:
                        first_sample = struct.unpack('<h', audio_chunk[:2])[0]
                        log_info(f"🔊 Audio format check - Chunk #{self.total_chunks}: First sample={first_sample}, Size={len(audio_chunk)} bytes")
                    except Exception:
                        log_warning("⚠️ Could not parse as Linear16")

            # Once a final result has arrived, stop sending audio but keep draining the queue
            if not self.final_result_received:
                # Send audio to Deepgram
                self.live_connection.send(audio_chunk)
                self.total_chunks += 1
                self.total_audio_bytes += len(audio_chunk)

                # Log progress
                if self.total_chunks % 50 == 0:
                    log_debug(f"📊 Listening... {self.total_chunks} chunks, {self.total_audio_bytes/1024:.1f}KB")

            # Always drain the queue and yield any pending results
            while True:
                try:
                    result = self.responses_queue.get_nowait()

                    # Log for debugging
                    log_debug(f"🎯 Yielding result: is_final={result.is_final}, text='{result.text}'")

                    if result.is_final:
                        self.final_result_received = True
                        yield result  # Yield the final result; no new audio is accepted afterwards
                    elif not self.final_result_received:
                        # Yield interim results until the final one arrives
                        yield result

                except queue.Empty:
                    break

        except Exception as e:
            log_error("❌ Error streaming audio", error=str(e))
            self.is_streaming = False
            raise
    async def stop_streaming(self) -> Optional[TranscriptionResult]:
        """Stop streaming and clean up"""
        if not self.is_streaming:
            log_debug("Already stopped, nothing to do")
            return None

        try:
            log_info(f"🛑 Stopping Deepgram STT session #{self.session_id}")
            self.is_streaming = False

            # Finish the stream to get final results
            if self.live_connection:
                try:
                    # Finish the stream - this triggers the final transcript
                    self.live_connection.finish()

                    # Wait a bit for the final result
                    await asyncio.sleep(0.5)
                except Exception as e:
                    log_warning(f"⚠️ Error finishing stream: {e}")

            # Get the final result from the queue
            final_result = None
            while not self.responses_queue.empty():
                try:
                    result = self.responses_queue.get_nowait()
                    if result.is_final:
                        final_result = result
                except queue.Empty:
                    break

            # Clean up
            self.live_connection = None
            self.deepgram_client = None
            self.final_result_received = False

            log_info(f"✅ Deepgram STT session #{self.session_id} stopped")
            return final_result

        except Exception as e:
            log_error("❌ Error during stop_streaming", error=str(e))
            self.is_streaming = False
            self.live_connection = None
            self.deepgram_client = None
            return None
    def _reset_session_data(self):
        """Reset session-specific data"""
        # Clear queue
        while not self.responses_queue.empty():
            try:
                self.responses_queue.get_nowait()
            except queue.Empty:
                break

        # Reset counters
        self.total_audio_bytes = 0
        self.total_chunks = 0
        self.session_id += 1
        self.final_result_received = False

        log_debug(f"🔄 Session data reset. New session ID: {self.session_id}")
    def supports_realtime(self) -> bool:
        """Deepgram supports real-time streaming"""
        return True

    def get_supported_languages(self) -> List[str]:
        """Get list of supported language codes"""
        return [
            "tr-TR",  # Turkish
            "en-US",  # English (US)
            "en-GB",  # English (UK)
            "de-DE",  # German
            "fr-FR",  # French
            "es-ES",  # Spanish
            "it-IT",  # Italian
            "pt-BR",  # Portuguese (Brazil)
            "ru-RU",  # Russian
            "ja-JP",  # Japanese
            "ko-KR",  # Korean
            "zh-CN",  # Chinese (Simplified)
            "ar-SA",  # Arabic
            "nl-NL",  # Dutch
            "sv-SE",  # Swedish
            "pl-PL",  # Polish
            "hi-IN",  # Hindi
            "cs-CZ",  # Czech
            "da-DK",  # Danish
            "fi-FI",  # Finnish
            "el-GR",  # Greek
            "he-IL",  # Hebrew
            "hu-HU",  # Hungarian
            "id-ID",  # Indonesian
            "ms-MY",  # Malay
            "no-NO",  # Norwegian
            "ro-RO",  # Romanian
            "sk-SK",  # Slovak
            "th-TH",  # Thai
            "uk-UA",  # Ukrainian
            "vi-VN",  # Vietnamese
        ]

    def get_provider_name(self) -> str:
        """Get provider name"""
        return "deepgram"