# NOTE(review): removed web-capture artifact lines ("Spaces:" / "Paused" UI status text)
""" | |
Google Cloud Speech-to-Text Implementation | |
""" | |
import asyncio | |
from typing import AsyncIterator, Optional, List, Any | |
from datetime import datetime | |
import queue | |
import threading | |
import traceback | |
import os | |
from google.cloud import speech | |
from google.cloud.speech import RecognitionConfig, StreamingRecognitionConfig | |
import google.auth | |
from utils.logger import log_info, log_error, log_debug, log_warning | |
from .stt_interface import STTInterface, STTConfig, TranscriptionResult | |
class GoogleSTT(STTInterface):
    """Google Cloud Speech-to-Text implementation of STTInterface.

    Audio chunks are pushed onto a queue from the async side and consumed by a
    background thread that drives the blocking Google streaming API; final
    transcripts flow back through a bounded results queue.
    """

    def __init__(self, credentials_path: Optional[str] = None):
        """
        Initialize Google STT.

        Args:
            credentials_path: Path to a service account JSON file. When omitted,
                Application Default Credentials (ADC) are used.

        Raises:
            FileNotFoundError: If credentials_path is given but does not exist.
        """
        try:
            # Single branch on credentials_path (previously duplicated) with an
            # explicit existence check so a bad path fails fast and clearly.
            if credentials_path:
                if not os.path.exists(credentials_path):
                    log_error(f"β Credentials file not found at: {credentials_path}")
                    raise FileNotFoundError(f"Credentials file not found: {credentials_path}")
                log_info(f"π Using credentials from: {credentials_path}")
                self.client = speech.SpeechClient.from_service_account_file(credentials_path)
                log_info(f"β Google STT initialized with service account: {credentials_path}")
            else:
                # Use default credentials (ADC)
                self.client = speech.SpeechClient()
                log_info("β Google STT initialized with default credentials")

            # Streaming state
            self.is_streaming = False
            self.audio_generator = None
            self.responses_stream = None
            self.audio_queue = queue.Queue()
            self.results_queue = queue.Queue(maxsize=100)

            # Session tracking
            self.session_id = 0
            self.total_audio_bytes = 0
            self.total_chunks = 0

            # Threading
            self.stream_thread = None
            self.stop_event = threading.Event()
        except Exception as e:
            log_error(f"β Failed to initialize Google STT: {str(e)}")
            raise
def _map_language_code(self, language: str) -> str: | |
"""Map language codes to Google format""" | |
# Google uses BCP-47 language codes | |
language_map = { | |
"tr-TR": "tr-TR", | |
"en-US": "en-US", | |
"en-GB": "en-GB", | |
"de-DE": "de-DE", | |
"fr-FR": "fr-FR", | |
"es-ES": "es-ES", | |
"it-IT": "it-IT", | |
"pt-BR": "pt-BR", | |
"ru-RU": "ru-RU", | |
"ja-JP": "ja-JP", | |
"ko-KR": "ko-KR", | |
"zh-CN": "zh-CN", | |
"ar-SA": "ar-SA", | |
} | |
return language_map.get(language, language) | |
async def start_streaming(self, config: STTConfig) -> None: | |
"""Initialize streaming session""" | |
try: | |
# Stop any existing stream | |
if self.is_streaming: | |
log_warning("β οΈ Previous stream still active, stopping it first") | |
await self.stop_streaming() | |
await asyncio.sleep(0.5) | |
# Reset session data | |
self._reset_session_data() | |
log_info(f"π€ Starting Google STT - Session #{self.session_id}") | |
# Configure recognition settings | |
language_code = self._map_language_code(config.language) | |
""" | |
# β Google STT best practices for Turkish and single utterance | |
recognition_config = RecognitionConfig( | |
encoding=RecognitionConfig.AudioEncoding.LINEAR16, | |
sample_rate_hertz=16000, | |
language_code="tr-TR", | |
# β Single utterance iΓ§in ideal ayarlar | |
enable_automatic_punctuation=True, | |
# Model selection - latest_long for better accuracy | |
model="latest_long", | |
# Use enhanced model if available (better for Turkish) | |
use_enhanced=True, | |
# Single channel audio | |
audio_channel_count=1, | |
# Alternative transcripts for debugging | |
max_alternatives=1, | |
# Profanity filter disabled for accuracy | |
profanity_filter=False, | |
# Word level confidence | |
enable_word_confidence=False, | |
enable_spoken_punctuation=False, | |
enable_spoken_emojis=False, | |
) | |
# β Streaming config - optimized for final results only | |
self.streaming_config = StreamingRecognitionConfig( | |
config=recognition_config, | |
single_utterance=False, | |
interim_results=True | |
) | |
""" | |
# β EN BASΔ°T CONFIG - sadece zorunlu alanlar | |
recognition_config = RecognitionConfig( | |
encoding=RecognitionConfig.AudioEncoding.LINEAR16, | |
sample_rate_hertz=16000, | |
language_code="tr-TR" | |
) | |
# β Streaming config - en basit hali | |
self.streaming_config = StreamingRecognitionConfig( | |
config=recognition_config, | |
interim_results=True | |
) | |
log_info(f"π§ Google STT config: language={language_code}, " | |
f"model=latest_long, enhanced=True, " | |
f"single_utterance=True, interim_results=False") | |
# Start streaming in background thread | |
self.stop_event.clear() | |
self.stream_thread = threading.Thread( | |
target=self._stream_recognition, | |
daemon=True | |
) | |
self.stream_thread.start() | |
self.is_streaming = True | |
log_info(f"β Google STT started - Ready for speech") | |
except Exception as e: | |
log_error(f"β Failed to start Google STT", error=str(e)) | |
self.is_streaming = False | |
raise | |
def _stream_recognition(self): | |
"""Background thread for streaming recognition""" | |
try: | |
log_debug("ποΈ Starting recognition stream thread") | |
# β Config'i logla | |
log_debug(f"Config details: {self.streaming_config}") | |
# Create audio generator | |
audio_generator = self._audio_generator() | |
# β Daha detaylΔ± hata yakalama | |
try: | |
# Start streaming recognition | |
responses = self.client.streaming_recognize( | |
self.streaming_config, | |
audio_generator | |
) | |
except Exception as api_error: | |
log_error(f"β Google API error: {str(api_error)}") | |
log_error(f"β Error type: {type(api_error).__name__}") | |
if hasattr(api_error, 'details'): | |
log_error(f"β Error details: {api_error.details()}") | |
if hasattr(api_error, '__dict__'): | |
log_error(f"β Error attributes: {api_error.__dict__}") | |
import traceback | |
log_error(f"β Full traceback: {traceback.format_exc()}") | |
raise | |
# Process responses | |
for response in responses: | |
if self.stop_event.is_set(): | |
break | |
if not response.results: | |
continue | |
# Process each result | |
for result in response.results: | |
if not result.alternatives: | |
continue | |
# Get best alternative | |
alternative = result.alternatives[0] | |
# Only process if we have transcript | |
if alternative.transcript: | |
# β Interim result'larΔ± logla ama queue'ya koyma | |
if not result.is_final: | |
log_debug(f"π Interim transcript (ignored): '{alternative.transcript}'") | |
continue | |
# β Sadece final result'larΔ± iΕle | |
transcription_result = TranscriptionResult( | |
text=alternative.transcript, | |
is_final=result.is_final, | |
confidence=alternative.confidence, | |
timestamp=datetime.now().timestamp() | |
) | |
try: | |
self.results_queue.put(transcription_result) | |
if result.is_final: | |
log_info(f"π― FINAL TRANSCRIPT: '{alternative.transcript}' " | |
f"(confidence: {alternative.confidence:.2f})") | |
# Single utterance mode will end stream after this | |
break | |
else: | |
# This shouldn't happen with interim_results=False | |
log_debug(f"π Transcript: '{alternative.transcript}'") | |
except queue.Full: | |
log_warning("β οΈ Results queue full") | |
# Check if stream ended due to single_utterance | |
if hasattr(response, 'speech_event_type'): | |
if response.speech_event_type == speech.StreamingRecognizeResponse.SpeechEventType.END_OF_SINGLE_UTTERANCE: | |
log_info("π End of single utterance detected") | |
break | |
except Exception as e: | |
if not self.stop_event.is_set(): | |
log_error(f"β Recognition stream error: {str(e)}") | |
# Put error in queue | |
error_result = TranscriptionResult( | |
text="", | |
is_final=True, | |
confidence=0.0, | |
timestamp=datetime.now().timestamp() | |
) | |
self.results_queue.put(error_result) | |
finally: | |
log_debug("ποΈ Recognition stream thread ended") | |
self.is_streaming = False | |
def _audio_generator(self): | |
"""Generator that yields audio chunks for streaming""" | |
chunk_count = 0 | |
try: | |
while not self.stop_event.is_set(): | |
try: | |
# Get audio chunk with timeout | |
chunk = self.audio_queue.get(timeout=0.1) | |
if chunk is None: # Sentinel value | |
log_debug("π Audio generator received sentinel, stopping") | |
break | |
# β Debug iΓ§in chunk bilgisi | |
chunk_count += 1 | |
if chunk_count <= 5: # Δ°lk 5 chunk iΓ§in detaylΔ± log | |
log_debug(f"π΅ Audio generator yielding chunk #{chunk_count}, size: {len(chunk)} bytes") | |
# Chunk'Δ±n byte tipinde olduΔundan emin ol | |
if not isinstance(chunk, bytes): | |
log_error(f"β Chunk is not bytes! Type: {type(chunk)}") | |
continue | |
# β Google API'nin beklediΔi format | |
yield chunk | |
except queue.Empty: | |
continue | |
except Exception as e: | |
log_error(f"β Audio generator error: {str(e)}") | |
break | |
finally: | |
log_debug(f"ποΈ Audio generator stopped after {chunk_count} chunks") | |
async def stream_audio(self, audio_chunk: bytes) -> AsyncIterator[TranscriptionResult]: | |
"""Stream audio chunk and get transcription results""" | |
if not self.is_streaming: | |
raise RuntimeError("Streaming not started. Call start_streaming() first.") | |
try: | |
# β Audio chunk tipini kontrol et | |
if not isinstance(audio_chunk, bytes): | |
log_error(f"β Audio chunk is not bytes! Type: {type(audio_chunk)}") | |
raise TypeError(f"Expected bytes, got {type(audio_chunk)}") | |
# β Chunk boyutunu logla | |
if self.total_chunks < 5: | |
log_debug(f"π¦ Adding audio chunk #{self.total_chunks} to queue, size: {len(audio_chunk)} bytes") | |
# Add audio to queue for background thread | |
self.audio_queue.put(audio_chunk) | |
self.total_chunks += 1 | |
self.total_audio_bytes += len(audio_chunk) | |
# Log progress | |
if self.total_chunks % 50 == 0: | |
log_debug(f"π Processing... {self.total_chunks} chunks, {self.total_audio_bytes/1024:.1f}KB") | |
# Check for results | |
while True: | |
try: | |
result = self.results_queue.get_nowait() | |
# Log for debugging | |
log_debug(f"π― Yielding result: is_final={result.is_final}, text='{result.text}'") | |
yield result | |
# If final result, stream will end | |
if result.is_final: | |
self.is_streaming = False | |
except queue.Empty: | |
break | |
except Exception as e: | |
log_error(f"β Error streaming audio", error=str(e)) | |
self.is_streaming = False | |
raise | |
async def stop_streaming(self) -> Optional[TranscriptionResult]: | |
"""Stop streaming and clean up""" | |
if not self.is_streaming: | |
log_debug("Already stopped, nothing to do") | |
return None | |
try: | |
log_info(f"π Stopping Google STT session #{self.session_id}") | |
self.is_streaming = False | |
# Signal stop | |
self.stop_event.set() | |
# Send sentinel to audio queue | |
self.audio_queue.put(None) | |
# Wait for thread to finish | |
if self.stream_thread and self.stream_thread.is_alive(): | |
self.stream_thread.join(timeout=2.0) | |
# Get final result if any | |
final_result = None | |
while not self.results_queue.empty(): | |
try: | |
result = self.results_queue.get_nowait() | |
if result.is_final and result.text: | |
final_result = result | |
except queue.Empty: | |
break | |
log_info(f"β Google STT session #{self.session_id} stopped") | |
return final_result | |
except Exception as e: | |
log_error(f"β Error during stop_streaming", error=str(e)) | |
self.is_streaming = False | |
return None | |
def _reset_session_data(self): | |
"""Reset session-specific data""" | |
# Clear queues | |
while not self.audio_queue.empty(): | |
try: | |
self.audio_queue.get_nowait() | |
except: | |
pass | |
while not self.results_queue.empty(): | |
try: | |
self.results_queue.get_nowait() | |
except: | |
pass | |
# Reset counters | |
self.total_audio_bytes = 0 | |
self.total_chunks = 0 | |
self.session_id += 1 | |
log_debug(f"π Session data reset. New session ID: {self.session_id}") | |
def supports_realtime(self) -> bool: | |
"""Google STT supports real-time streaming""" | |
return True | |
def get_supported_languages(self) -> List[str]: | |
"""Get list of supported language codes""" | |
# Google Cloud Speech-to-Text supported languages (partial list) | |
# Full list: https://cloud.google.com/speech-to-text/docs/languages | |
return [ | |
"tr-TR", "en-US", "en-GB", "en-AU", "en-CA", "en-IN", | |
"es-ES", "es-MX", "es-AR", "fr-FR", "fr-CA", "de-DE", | |
"it-IT", "pt-BR", "pt-PT", "ru-RU", "ja-JP", "ko-KR", | |
"zh-CN", "zh-TW", "ar-SA", "ar-EG", "hi-IN", "nl-NL", | |
"pl-PL", "sv-SE", "da-DK", "no-NO", "fi-FI", "el-GR", | |
"he-IL", "th-TH", "vi-VN", "id-ID", "ms-MY", "fil-PH" | |
] | |
def get_provider_name(self) -> str: | |
"""Get provider name""" | |
return "google" |