Spaces:
Running
Running
""" | |
Deepgram Speech-to-Text Implementation using Deepgram SDK | |
""" | |
import asyncio | |
from typing import AsyncIterator, Optional, List, Any | |
from datetime import datetime | |
import queue | |
import threading | |
import traceback | |
from deepgram import ( | |
DeepgramClient, | |
DeepgramClientOptions, | |
LiveTranscriptionEvents, | |
LiveOptions, | |
Microphone, | |
) | |
from utils.logger import log_info, log_error, log_debug, log_warning | |
from .stt_interface import STTInterface, STTConfig, TranscriptionResult | |
class DeepgramSTT(STTInterface): | |
"""Deepgram STT implementation using official SDK""" | |
def __init__(self, api_key: str): | |
if not api_key: | |
raise ValueError("Deepgram API key is required") | |
# Debug iΓ§in API key'in ilk 10 karakterini logla | |
log_info(f"π Deepgram API key resolved: {api_key[:10]}... (length: {len(api_key)})") | |
self.api_key = api_key | |
self.deepgram_client = None | |
self.live_connection = None | |
self.is_streaming = False | |
self.responses_queue = queue.Queue(maxsize=100) | |
# Session tracking | |
self.session_id = 0 | |
self.total_audio_bytes = 0 | |
self.total_chunks = 0 | |
# Final result tracking | |
self.final_result_received = False | |
self.stop_event = threading.Event() | |
log_info(f"β Deepgram STT initialized (SDK version)") | |
def _map_language_code(self, language: str) -> str: | |
"""Map language codes to Deepgram format""" | |
language_map = { | |
"tr-TR": "tr", | |
"en-US": "en-US", | |
"en-GB": "en-GB", | |
"de-DE": "de", | |
"fr-FR": "fr", | |
"es-ES": "es", | |
"it-IT": "it", | |
"pt-BR": "pt-BR", | |
"ru-RU": "ru", | |
"ja-JP": "ja", | |
"ko-KR": "ko", | |
"zh-CN": "zh-CN", | |
"ar-SA": "ar", | |
} | |
return language_map.get(language, language) | |
async def start_streaming(self, config: STTConfig) -> None: | |
"""Initialize streaming session using SDK""" | |
try: | |
# Stop any existing stream | |
if self.is_streaming: | |
log_warning("β οΈ Previous stream still active, stopping it first") | |
await self.stop_streaming() | |
await asyncio.sleep(0.5) | |
# Reset session data | |
self._reset_session_data() | |
log_info(f"π€ Starting Deepgram STT (SDK) - Session #{self.session_id}") | |
# Create Deepgram client with more verbose logging for debugging | |
config_options = DeepgramClientOptions( | |
verbose=False, | |
options={"keepalive": "true"} | |
) | |
self.deepgram_client = DeepgramClient(self.api_key, config=config_options) | |
# Try minimal configuration first | |
options = LiveOptions( | |
language="tr", | |
model="nova-2", | |
encoding="linear16", | |
sample_rate=16000, | |
interim_results=True, # Bu zorunlu, yoksa final result da gelmiyor... | |
channels=1, | |
utterance_end_ms=1000, # 1 second silence = end | |
punctuate=True, | |
smart_format=True, | |
#numerals=True, | |
#profanity_filter=False, | |
#redact=False, | |
no_delay=True, | |
vad_events=True # Enable VAD events | |
) | |
log_info(f"π§ Deepgram options: language=tr, model=nova-2, encoding=linear16, interim_results=True") | |
# Create live connection | |
self.live_connection = self.deepgram_client.listen.live.v("1") | |
# Setup event handlers | |
self._setup_event_handlers() | |
try: | |
# Log before connection attempt | |
log_info("π Attempting to connect to Deepgram...") | |
result = self.live_connection.start(options) | |
log_info(f"π Connection start result: {result}") | |
if result: | |
self.is_streaming = True | |
log_info(f"β Deepgram SDK connected - Ready for speech") | |
else: | |
# Try to get more error details | |
if hasattr(self.live_connection, 'get_error') or hasattr(self.live_connection, 'error'): | |
error_detail = getattr(self.live_connection, 'error', 'No error details') | |
log_error(f"β Connection failed with details: {error_detail}") | |
raise RuntimeError("Failed to start Deepgram connection") | |
except Exception as e: | |
log_error(f"β Connection error: {str(e)}") | |
# Log more details about the exception | |
if hasattr(e, 'response'): | |
log_error(f"β Response: {e.response}") | |
if hasattr(e, 'status_code'): | |
log_error(f"β Status code: {e.status_code}") | |
raise | |
except Exception as e: | |
log_error(f"β Failed to start Deepgram STT", error=str(e)) | |
if hasattr(e, '__dict__'): | |
log_error(f"β Error details: {e.__dict__}") | |
self.is_streaming = False | |
self.live_connection = None | |
self.deepgram_client = None | |
raise | |
def _setup_event_handlers(self): | |
"""Setup event handlers for Deepgram events""" | |
# Transcript received - use the existing class method | |
self.live_connection.on(LiveTranscriptionEvents.Transcript, self._on_transcript) | |
# Speech started | |
self.live_connection.on(LiveTranscriptionEvents.SpeechStarted, self._on_speech_started) | |
# Utterance end | |
self.live_connection.on(LiveTranscriptionEvents.UtteranceEnd, self._on_utterance_end) | |
# Metadata | |
self.live_connection.on(LiveTranscriptionEvents.Metadata, self._on_metadata) | |
# Error | |
self.live_connection.on(LiveTranscriptionEvents.Error, self._on_error) | |
# Connection closed | |
self.live_connection.on(LiveTranscriptionEvents.Close, self._on_close) | |
def _on_transcript(self, *args, **kwargs): | |
"""Handle transcript event - SDK calls this method directly""" | |
try: | |
# SDK passes the result as second argument | |
result = args[1] if len(args) > 1 else kwargs.get("result") | |
if not result: | |
log_warning("β οΈ No result in transcript event") | |
return | |
# β Debug iΓ§in result objesini detaylΔ± inceleyin | |
if self.total_chunks < 5: # Δ°lk birkaΓ§ event iΓ§in | |
log_debug(f"π Result object type: {type(result)}") | |
log_debug(f"π Result dir: {[attr for attr in dir(result) if not attr.startswith('_')]}") | |
# Result'un tΓΌm property'lerini logla | |
try: | |
if hasattr(result, '__dict__'): | |
log_debug(f"π Result dict: {result.__dict__}") | |
except: | |
pass | |
# Access properties directly from the result object | |
is_final = result.is_final if hasattr(result, 'is_final') else False | |
# Get transcript from channel alternatives | |
if hasattr(result, 'channel') and result.channel: | |
alternatives = result.channel.alternatives | |
if alternatives and len(alternatives) > 0: | |
transcript = alternatives[0].transcript | |
confidence = alternatives[0].confidence | |
# Log all transcripts for debugging | |
log_debug(f"π Raw transcript: '{transcript}' (is_final: {is_final}, confidence: {confidence})") | |
# β ΓNEMLΔ° DEΔΔ°ΕΔ°KLΔ°K: Final result'larΔ± boΕ olsa bile kabul et | |
if is_final: | |
# Final transcript - boΕ olabilir ama yine de iΕle | |
transcription_result = TranscriptionResult( | |
text=transcript or "", # BoΕ string olabilir | |
is_final=is_final, | |
confidence=confidence, | |
timestamp=datetime.now().timestamp() | |
) | |
try: | |
self.responses_queue.put(transcription_result) | |
self.final_result_received = True | |
if transcript and transcript.strip(): | |
log_info(f"π― FINAL TRANSCRIPT: '{transcript}' (confidence: {confidence:.2f})") | |
else: | |
log_warning(f"β οΈ Empty final transcript received - but queued for state change") | |
except queue.Full: | |
log_warning("β οΈ Response queue full") | |
elif transcript and transcript.strip(): | |
# Interim result - sadece dolu olanlarΔ± kabul et | |
transcription_result = TranscriptionResult( | |
text=transcript, | |
is_final=is_final, | |
confidence=confidence, | |
timestamp=datetime.now().timestamp() | |
) | |
try: | |
self.responses_queue.put(transcription_result) | |
log_info(f"π Interim transcript: '{transcript}'") | |
except queue.Full: | |
log_warning("β οΈ Response queue full") | |
except Exception as e: | |
log_error(f"β Error processing transcript: {e}") | |
log_error(f"β Args: {args}") | |
log_error(f"β Kwargs: {kwargs}") | |
import traceback | |
log_error(f"β Traceback: {traceback.format_exc()}") | |
def _on_speech_started(self, *args, **kwargs): | |
"""Handle speech started event""" | |
log_info("π€ Speech detected - User started speaking") | |
def _on_utterance_end(self, *args, **kwargs): | |
"""Handle utterance end event""" | |
log_info("π Speech ended - User stopped speaking") | |
# Deepgram will send final transcript after this | |
def _on_metadata(self, *args, **kwargs): | |
"""Handle metadata event""" | |
metadata = args[1] if len(args) > 1 else kwargs.get("metadata", {}) | |
request_id = metadata.get("request_id", "") | |
log_debug(f"π Deepgram metadata - Request ID: {request_id}") | |
def _on_error(self, *args, **kwargs): | |
"""Handle error event""" | |
error = args[1] if len(args) > 1 else kwargs.get("error", {}) | |
log_error(f"β Deepgram error: {error}") | |
def _on_close(self, *args, **kwargs): | |
"""Handle connection close event""" | |
log_info("π Deepgram connection closed") | |
self.is_streaming = False | |
async def stream_audio(self, audio_chunk: bytes) -> AsyncIterator[TranscriptionResult]: | |
"""Stream audio chunk and get transcription results""" | |
if not self.is_streaming or not self.live_connection: | |
raise RuntimeError("Streaming not started. Call start_streaming() first.") | |
try: | |
# Δ°lk birkaΓ§ chunk iΓ§in audio formatΔ±nΔ± analiz et | |
if self.total_chunks < 3: | |
if len(audio_chunk) >= 4: | |
import struct | |
try: | |
first_sample = struct.unpack('<h', audio_chunk[:2])[0] | |
log_info(f"π Audio format check - Chunk #{self.total_chunks}: First sample={first_sample}, Size={len(audio_chunk)} bytes") | |
except: | |
log_warning("β οΈ Could not parse as Linear16") | |
# β Final result geldiyse audio gΓΆnderme ama queue'yu kontrol et | |
if not self.final_result_received: | |
# Send audio to Deepgram | |
self.live_connection.send(audio_chunk) | |
self.total_chunks += 1 | |
self.total_audio_bytes += len(audio_chunk) | |
# Log progress | |
if self.total_chunks % 50 == 0: | |
log_debug(f"π Listening... {self.total_chunks} chunks, {self.total_audio_bytes/1024:.1f}KB") | |
# β Her zaman queue'yu kontrol et ve result'larΔ± yield et | |
while True: | |
try: | |
result = self.responses_queue.get_nowait() | |
# Log for debugging | |
log_debug(f"π― Yielding result: is_final={result.is_final}, text='{result.text}'") | |
if result.is_final: | |
self.final_result_received = True | |
yield result # β Final result'Δ± yield et | |
# ArtΔ±k yeni audio kabul etme | |
elif not self.final_result_received: | |
# Final gelmeden ΓΆnce interim'leri yield et | |
yield result | |
except queue.Empty: | |
break | |
except Exception as e: | |
log_error(f"β Error streaming audio", error=str(e)) | |
self.is_streaming = False | |
raise | |
async def stop_streaming(self) -> Optional[TranscriptionResult]: | |
"""Stop streaming and clean up""" | |
if not self.is_streaming: | |
log_debug("Already stopped, nothing to do") | |
return None | |
try: | |
log_info(f"π Stopping Deepgram STT session #{self.session_id}") | |
self.is_streaming = False | |
# Finish the stream to get final results | |
if self.live_connection: | |
try: | |
# Finish the stream - this triggers final transcript | |
self.live_connection.finish() | |
# Wait a bit for final result | |
await asyncio.sleep(0.5) | |
except Exception as e: | |
log_warning(f"β οΈ Error finishing stream: {e}") | |
# Get final result from queue | |
final_result = None | |
while not self.responses_queue.empty(): | |
try: | |
result = self.responses_queue.get_nowait() | |
if result.is_final: | |
final_result = result | |
except queue.Empty: | |
break | |
# Clean up | |
self.live_connection = None | |
self.deepgram_client = None | |
self.final_result_received = False | |
log_info(f"β Deepgram STT session #{self.session_id} stopped") | |
return final_result | |
except Exception as e: | |
log_error(f"β Error during stop_streaming", error=str(e)) | |
self.is_streaming = False | |
self.live_connection = None | |
self.deepgram_client = None | |
return None | |
def _reset_session_data(self): | |
"""Reset session-specific data""" | |
# Clear queue | |
while not self.responses_queue.empty(): | |
try: | |
self.responses_queue.get_nowait() | |
except: | |
pass | |
# Reset counters | |
self.total_audio_bytes = 0 | |
self.total_chunks = 0 | |
self.session_id += 1 | |
self.final_result_received = False | |
log_debug(f"π Session data reset. New session ID: {self.session_id}") | |
def supports_realtime(self) -> bool: | |
"""Deepgram supports real-time streaming""" | |
return True | |
def get_supported_languages(self) -> List[str]: | |
"""Get list of supported language codes""" | |
return [ | |
"tr-TR", # Turkish | |
"en-US", # English (US) | |
"en-GB", # English (UK) | |
"de-DE", # German | |
"fr-FR", # French | |
"es-ES", # Spanish | |
"it-IT", # Italian | |
"pt-BR", # Portuguese (Brazil) | |
"ru-RU", # Russian | |
"ja-JP", # Japanese | |
"ko-KR", # Korean | |
"zh-CN", # Chinese (Simplified) | |
"ar-SA", # Arabic | |
"nl-NL", # Dutch | |
"sv-SE", # Swedish | |
"pl-PL", # Polish | |
"hi-IN", # Hindi | |
"cs-CZ", # Czech | |
"da-DK", # Danish | |
"fi-FI", # Finnish | |
"el-GR", # Greek | |
"he-IL", # Hebrew | |
"hu-HU", # Hungarian | |
"id-ID", # Indonesian | |
"ms-MY", # Malay | |
"no-NO", # Norwegian | |
"ro-RO", # Romanian | |
"sk-SK", # Slovak | |
"th-TH", # Thai | |
"uk-UA", # Ukrainian | |
"vi-VN", # Vietnamese | |
] | |
def get_provider_name(self) -> str: | |
"""Get provider name""" | |
return "deepgram" |