""" Google Cloud Speech-to-Text Implementation """ import asyncio from typing import AsyncIterator, Optional, List, Any from datetime import datetime import queue import threading import traceback import os from google.cloud import speech from google.cloud.speech import RecognitionConfig, StreamingRecognitionConfig import google.auth from utils.logger import log_info, log_error, log_debug, log_warning from .stt_interface import STTInterface, STTConfig, TranscriptionResult class GoogleSTT(STTInterface): def __init__(self, credentials_path: Optional[str] = None): """ Initialize Google STT Args: credentials_path: Path to service account JSON file (optional if using default credentials) """ try: TEST_CREDENTIALS = ''' { "type": "service_account", "project_id": "YOUR_PROJECT_ID", "private_key_id": "YOUR_KEY_ID", "private_key": "-----BEGIN PRIVATE KEY-----\\nYOUR_PRIVATE_KEY\\n-----END PRIVATE KEY-----\\n", "client_email": "YOUR_SERVICE_ACCOUNT@YOUR_PROJECT.iam.gserviceaccount.com", "client_id": "YOUR_CLIENT_ID", "auth_uri": "https://accounts.google.com/o/oauth2/auth", "token_uri": "https://oauth2.googleapis.com/token", "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs", "client_x509_cert_url": "YOUR_CERT_URL" } ''' ''' # ✅ Debug için path kontrolü if credentials_path: import os if not os.path.exists(credentials_path): log_error(f"❌ Credentials file not found at: {credentials_path}") raise FileNotFoundError(f"Credentials file not found: {credentials_path}") log_info(f"📁 Using credentials from: {credentials_path}") # Initialize client if credentials_path: self.client = speech.SpeechClient.from_service_account_file(credentials_path) log_info(f"✅ Google STT initialized with service account: {credentials_path}") else: # Use default credentials (ADC) self.client = speech.SpeechClient() log_info("✅ Google STT initialized with default credentials") ''' # String'den credentials oluştur import json from google.oauth2 import service_account credentials_dict = json.loads(TEST_CREDENTIALS) credentials = service_account.Credentials.from_service_account_info( credentials_dict, scopes=["https://www.googleapis.com/auth/cloud-platform"] ) self.client = speech.SpeechClient(credentials=credentials) log_info(f"✅ Google STT initialized with inline credentials") # Streaming state self.is_streaming = False self.audio_generator = None self.responses_stream = None self.audio_queue = queue.Queue() self.results_queue = queue.Queue(maxsize=100) # Session tracking self.session_id = 0 self.total_audio_bytes = 0 self.total_chunks = 0 # Threading self.stream_thread = None self.stop_event = threading.Event() except Exception as e: log_error(f"❌ Failed to initialize Google STT: {str(e)}") raise def _map_language_code(self, language: str) -> str: """Map language codes to Google format""" # Google uses BCP-47 language codes language_map = { "tr-TR": "tr-TR", "en-US": "en-US", "en-GB": "en-GB", "de-DE": "de-DE", "fr-FR": "fr-FR", "es-ES": "es-ES", "it-IT": "it-IT", "pt-BR": "pt-BR", "ru-RU": "ru-RU", "ja-JP": "ja-JP", "ko-KR": "ko-KR", "zh-CN": "zh-CN", "ar-SA": "ar-SA", } return language_map.get(language, language) async def start_streaming(self, config: STTConfig) -> None: """Initialize streaming session""" try: # Stop any existing stream if self.is_streaming: log_warning("⚠️ Previous stream still active, stopping it first") await self.stop_streaming() await asyncio.sleep(0.5) # Reset session data self._reset_session_data() log_info(f"🎤 Starting Google STT - Session #{self.session_id}") # Configure recognition settings language_code = self._map_language_code(config.language) """ # ✅ Google STT best practices for Turkish and single utterance recognition_config = RecognitionConfig( encoding=RecognitionConfig.AudioEncoding.LINEAR16, sample_rate_hertz=16000, language_code="tr-TR", # ✅ Single utterance için ideal ayarlar enable_automatic_punctuation=True, # Model selection - latest_long for better accuracy model="latest_long", # Use enhanced model if available (better for Turkish) use_enhanced=True, # Single channel audio audio_channel_count=1, # Alternative transcripts for debugging max_alternatives=1, # Profanity filter disabled for accuracy profanity_filter=False, # Word level confidence enable_word_confidence=False, enable_spoken_punctuation=False, enable_spoken_emojis=False, ) # ✅ Streaming config - optimized for final results only self.streaming_config = StreamingRecognitionConfig( config=recognition_config, single_utterance=False, interim_results=True ) """ # ✅ EN BASİT CONFIG - sadece zorunlu alanlar recognition_config = RecognitionConfig( encoding=RecognitionConfig.AudioEncoding.LINEAR16, sample_rate_hertz=16000, language_code="tr-TR" ) # ✅ Streaming config - en basit hali self.streaming_config = StreamingRecognitionConfig( config=recognition_config, interim_results=True ) log_info(f"🔧 Google STT config: language={language_code}, " f"model=latest_long, enhanced=True, " f"single_utterance=True, interim_results=False") # Start streaming in background thread self.stop_event.clear() self.stream_thread = threading.Thread( target=self._stream_recognition, daemon=True ) self.stream_thread.start() self.is_streaming = True log_info(f"✅ Google STT started - Ready for speech") except Exception as e: log_error(f"❌ Failed to start Google STT", error=str(e)) self.is_streaming = False raise def _stream_recognition(self): """Background thread for streaming recognition""" try: log_debug("🎙️ Starting recognition stream thread") # ✅ Config'i logla log_debug(f"Config details: {self.streaming_config}") # Create audio generator audio_generator = self._audio_generator() # ✅ Daha detaylı hata yakalama try: # Start streaming recognition responses = self.client.streaming_recognize( self.streaming_config, audio_generator ) except Exception as api_error: log_error(f"❌ Google API error: {str(api_error)}") log_error(f"❌ Error type: {type(api_error).__name__}") if hasattr(api_error, 'details'): log_error(f"❌ Error details: {api_error.details()}") if hasattr(api_error, '__dict__'): log_error(f"❌ Error attributes: {api_error.__dict__}") import traceback log_error(f"❌ Full traceback: {traceback.format_exc()}") raise # Process responses for response in responses: if self.stop_event.is_set(): break if not response.results: continue # Process each result for result in response.results: if not result.alternatives: continue # Get best alternative alternative = result.alternatives[0] # Only process if we have transcript if alternative.transcript: # ✅ Interim result'ları logla ama queue'ya koyma if not result.is_final: log_debug(f"📝 Interim transcript (ignored): '{alternative.transcript}'") continue # ✅ Sadece final result'ları işle transcription_result = TranscriptionResult( text=alternative.transcript, is_final=result.is_final, confidence=alternative.confidence, timestamp=datetime.now().timestamp() ) try: self.results_queue.put(transcription_result) if result.is_final: log_info(f"🎯 FINAL TRANSCRIPT: '{alternative.transcript}' " f"(confidence: {alternative.confidence:.2f})") # Single utterance mode will end stream after this break else: # This shouldn't happen with interim_results=False log_debug(f"📝 Transcript: '{alternative.transcript}'") except queue.Full: log_warning("⚠️ Results queue full") # Check if stream ended due to single_utterance if hasattr(response, 'speech_event_type'): if response.speech_event_type == speech.StreamingRecognizeResponse.SpeechEventType.END_OF_SINGLE_UTTERANCE: log_info("🔚 End of single utterance detected") break except Exception as e: if not self.stop_event.is_set(): log_error(f"❌ Recognition stream error: {str(e)}") # Put error in queue error_result = TranscriptionResult( text="", is_final=True, confidence=0.0, timestamp=datetime.now().timestamp() ) self.results_queue.put(error_result) finally: log_debug("🎙️ Recognition stream thread ended") self.is_streaming = False def _audio_generator(self): """Generator that yields audio chunks for streaming""" chunk_count = 0 try: while not self.stop_event.is_set(): try: # Get audio chunk with timeout chunk = self.audio_queue.get(timeout=0.1) if chunk is None: # Sentinel value log_debug("🔚 Audio generator received sentinel, stopping") break # ✅ Debug için chunk bilgisi chunk_count += 1 if chunk_count <= 5: # İlk 5 chunk için detaylı log log_debug(f"🎵 Audio generator yielding chunk #{chunk_count}, size: {len(chunk)} bytes") # Chunk'ın byte tipinde olduğundan emin ol if not isinstance(chunk, bytes): log_error(f"❌ Chunk is not bytes! Type: {type(chunk)}") continue # ✅ Google API'nin beklediği format yield chunk except queue.Empty: continue except Exception as e: log_error(f"❌ Audio generator error: {str(e)}") break finally: log_debug(f"🎙️ Audio generator stopped after {chunk_count} chunks") async def stream_audio(self, audio_chunk: bytes) -> AsyncIterator[TranscriptionResult]: """Stream audio chunk and get transcription results""" if not self.is_streaming: raise RuntimeError("Streaming not started. Call start_streaming() first.") try: # ✅ Audio chunk tipini kontrol et if not isinstance(audio_chunk, bytes): log_error(f"❌ Audio chunk is not bytes! Type: {type(audio_chunk)}") raise TypeError(f"Expected bytes, got {type(audio_chunk)}") # ✅ Chunk boyutunu logla if self.total_chunks < 5: log_debug(f"📦 Adding audio chunk #{self.total_chunks} to queue, size: {len(audio_chunk)} bytes") # Add audio to queue for background thread self.audio_queue.put(audio_chunk) self.total_chunks += 1 self.total_audio_bytes += len(audio_chunk) # Log progress if self.total_chunks % 50 == 0: log_debug(f"📊 Processing... {self.total_chunks} chunks, {self.total_audio_bytes/1024:.1f}KB") # Check for results while True: try: result = self.results_queue.get_nowait() # Log for debugging log_debug(f"🎯 Yielding result: is_final={result.is_final}, text='{result.text}'") yield result # If final result, stream will end if result.is_final: self.is_streaming = False except queue.Empty: break except Exception as e: log_error(f"❌ Error streaming audio", error=str(e)) self.is_streaming = False raise async def stop_streaming(self) -> Optional[TranscriptionResult]: """Stop streaming and clean up""" if not self.is_streaming: log_debug("Already stopped, nothing to do") return None try: log_info(f"🛑 Stopping Google STT session #{self.session_id}") self.is_streaming = False # Signal stop self.stop_event.set() # Send sentinel to audio queue self.audio_queue.put(None) # Wait for thread to finish if self.stream_thread and self.stream_thread.is_alive(): self.stream_thread.join(timeout=2.0) # Get final result if any final_result = None while not self.results_queue.empty(): try: result = self.results_queue.get_nowait() if result.is_final and result.text: final_result = result except queue.Empty: break log_info(f"✅ Google STT session #{self.session_id} stopped") return final_result except Exception as e: log_error(f"❌ Error during stop_streaming", error=str(e)) self.is_streaming = False return None def _reset_session_data(self): """Reset session-specific data""" # Clear queues while not self.audio_queue.empty(): try: self.audio_queue.get_nowait() except: pass while not self.results_queue.empty(): try: self.results_queue.get_nowait() except: pass # Reset counters self.total_audio_bytes = 0 self.total_chunks = 0 self.session_id += 1 log_debug(f"🔄 Session data reset. New session ID: {self.session_id}") def supports_realtime(self) -> bool: """Google STT supports real-time streaming""" return True def get_supported_languages(self) -> List[str]: """Get list of supported language codes""" # Google Cloud Speech-to-Text supported languages (partial list) # Full list: https://cloud.google.com/speech-to-text/docs/languages return [ "tr-TR", "en-US", "en-GB", "en-AU", "en-CA", "en-IN", "es-ES", "es-MX", "es-AR", "fr-FR", "fr-CA", "de-DE", "it-IT", "pt-BR", "pt-PT", "ru-RU", "ja-JP", "ko-KR", "zh-CN", "zh-TW", "ar-SA", "ar-EG", "hi-IN", "nl-NL", "pl-PL", "sv-SE", "da-DK", "no-NO", "fi-FI", "el-GR", "he-IL", "th-TH", "vi-VN", "id-ID", "ms-MY", "fil-PH" ] def get_provider_name(self) -> str: """Get provider name""" return "google"