"""
Google Cloud Speech-to-Text Implementation
"""
import os
import asyncio
from typing import AsyncIterator, Optional, List, Any
from datetime import datetime
import sys
import queue
import threading
import traceback

from logger import log_info, log_error, log_debug, log_warning

# Import Google Cloud Speech only if available
try:
    from google.cloud import speech
    from google.api_core import exceptions
    GOOGLE_SPEECH_AVAILABLE = True
except ImportError:
    GOOGLE_SPEECH_AVAILABLE = False
    log_info("⚠️ Google Cloud Speech library not installed")

from stt_interface import STTInterface, STTConfig, TranscriptionResult


class GoogleCloudSTT(STTInterface):
    """Google Cloud Speech-to-Text implementation.

    Audio chunks are handed to a background thread through a thread-safe
    ``queue.Queue``; that thread drives the blocking gRPC streaming call and
    publishes each ``TranscriptionResult`` back onto the caller's event loop,
    where it lands in ``responses_queue`` (an ``asyncio.Queue``).
    """

    def __init__(self, credentials_path: str):
        """Create the Speech client from a service-account credentials file.

        Args:
            credentials_path: Path to the Google credentials JSON file. It is
                exported through ``GOOGLE_APPLICATION_CREDENTIALS``.

        Raises:
            ImportError: If ``google-cloud-speech`` is not installed.
            FileNotFoundError: If ``credentials_path`` is empty or missing.
            Exception: Whatever ``speech.SpeechClient()`` raises is logged
                and re-raised.
        """
        if not GOOGLE_SPEECH_AVAILABLE:
            raise ImportError("google-cloud-speech library not installed. Run: pip install google-cloud-speech")

        if not credentials_path or not os.path.exists(credentials_path):
            log_error(f"❌ Google credentials path not found: {credentials_path}")
            raise FileNotFoundError(f"Credentials file not found: {credentials_path}")

        os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = credentials_path
        log_info(f"✅ Google credentials set from: {credentials_path}")

        # Build the client exactly once. This only proves that a client can
        # be constructed from the credentials; no API request is issued here.
        try:
            self.client = speech.SpeechClient()
            log_info("🔐 Testing Google credentials...")
            log_info("✅ Google credentials valid")
        except Exception as e:
            log_error("❌ Google credentials error", error=str(e))
            raise

        self.streaming_config = None
        self.is_streaming = False
        self.audio_queue = queue.Queue()          # caller -> worker thread
        self.responses_queue = asyncio.Queue()    # worker thread -> caller
        self.stream_thread = None
        self.stop_event = threading.Event()
        # Event loop of the coroutine that called start_streaming(); the
        # worker thread uses it to publish results thread-safely.
        self._loop = None

    async def start_streaming(self, config: dict) -> None:
        """Initialize a streaming session and start the worker thread.

        Args:
            config: Either a dict (keys: ``language``, ``sample_rate``,
                ``encoding``, ``enable_punctuation``, ``interim_results``,
                ``single_utterance``) or an ``STTConfig`` instance.

        Raises:
            Exception: Any failure while building the configuration or
                starting the thread is logged and re-raised;
                ``is_streaming`` is reset to ``False`` first.
        """
        try:
            log_info(f"🎤 Starting Google STT streaming with config: {config}")

            # A second worker would compete for the same audio queue, so an
            # already-running session is shut down before starting a new one.
            if self.is_streaming:
                log_warning("⚠️ Google STT streaming already active, restarting session")
                await self.stop_streaming()

            # Convert dict to STTConfig if needed
            if isinstance(config, dict):
                stt_config = STTConfig(
                    language=config.get("language", "tr-TR"),
                    sample_rate=config.get("sample_rate", 16000),
                    encoding=config.get("encoding", "WEBM_OPUS"),
                    enable_punctuation=config.get("enable_punctuation", True),
                    interim_results=config.get("interim_results", True),
                    single_utterance=config.get("single_utterance", False)
                )
            else:
                stt_config = config

            recognition_config = speech.RecognitionConfig(
                encoding=self._get_encoding(stt_config.encoding),
                sample_rate_hertz=stt_config.sample_rate,
                language_code=stt_config.language,
                enable_automatic_punctuation=stt_config.enable_punctuation,
                model="latest_long",
                use_enhanced=True
            )

            self.streaming_config = speech.StreamingRecognitionConfig(
                config=recognition_config,
                interim_results=stt_config.interim_results,
                single_utterance=stt_config.single_utterance
            )

            # Remember the caller's loop so the worker thread can hand
            # results back with call_soon_threadsafe().
            self._loop = asyncio.get_running_loop()

            self.is_streaming = True
            self.stop_event.clear()

            # Start streaming thread. Daemon so a stuck gRPC stream cannot
            # keep the interpreter alive at shutdown.
            self.stream_thread = threading.Thread(
                target=self._run_stream,
                name="google-stt-stream",
                daemon=True
            )
            self.stream_thread.start()

            log_info("✅ Google STT streaming started successfully")

        except Exception as e:
            log_error("❌ Failed to start Google STT streaming", error=str(e))
            self.is_streaming = False
            raise

    def _request_generator(self):
        """Yield streaming requests built from queued audio chunks.

        Stops when ``stop_event`` is set or when the ``None`` poison pill is
        read from ``audio_queue``.
        """
        chunk_count = 0
        while not self.stop_event.is_set():
            try:
                # Short timeout so the stop flag is re-checked regularly
                chunk = self.audio_queue.get(timeout=0.1)
            except queue.Empty:
                continue

            if chunk is None:
                # Poison pill
                break

            chunk_count += 1
            log_debug(f"📤 Sending chunk {chunk_count} to Google STT, size: {len(chunk)} bytes")
            yield speech.StreamingRecognizeRequest(audio_content=chunk)

    def _publish_result(self, result: TranscriptionResult) -> None:
        """Hand a result from the worker thread to the caller's event loop.

        ``asyncio.Queue`` is not thread-safe, so the ``put_nowait`` call is
        scheduled on the owning loop instead of being invoked directly.
        """
        loop = self._loop
        if loop is None or loop.is_closed():
            log_warning("⚠️ No active event loop, dropping STT result")
            return

        try:
            loop.call_soon_threadsafe(self.responses_queue.put_nowait, result)
        except RuntimeError as e:
            # The loop was closed between the check above and the call.
            log_warning(f"⚠️ Could not deliver STT result: {e}")

    def _run_stream(self):
        """Run the streaming recognition in a separate thread.

        Iterates the blocking response stream returned by
        ``client.streaming_recognize`` and publishes one
        ``TranscriptionResult`` per result that has alternatives. Errors are
        logged (with traceback) and never propagate out of the thread.
        """
        try:
            log_info("🎤 Google STT stream thread started")

            # Create streaming client
            requests = self._request_generator()

            log_info("🎤 Creating Google STT streaming client...")
            responses = self.client.streaming_recognize(self.streaming_config, requests)
            log_info("✅ Google STT streaming client created")

            # Process responses
            for response_count, response in enumerate(responses, start=1):
                log_debug(f"📥 Google STT response {response_count} received")

                if self.stop_event.is_set():
                    break

                for result in response.results:
                    if not result.alternatives:
                        continue

                    best = result.alternatives[0]

                    # Create transcription result
                    transcription = TranscriptionResult(
                        text=best.transcript,
                        is_final=result.is_final,
                        confidence=best.confidence if best.confidence else 0.0,
                        timestamp=datetime.now().timestamp()
                    )

                    # Deliver immediately so interim results reach the
                    # consumer while the stream is still running.
                    self._publish_result(transcription)

                    log_debug(f"📝 STT result: {best.transcript}, final: {result.is_final}")

                    # IMPORTANT: log final results
                    if result.is_final:
                        log_info(f"🎯 FINAL STT RESULT: {best.transcript}")

        except Exception as e:
            error_msg = str(e)
            log_error("❌ Google STT stream error", error=error_msg, traceback=traceback.format_exc())

            # Add a targeted hint for well-known failure causes
            if "Bad language code" in error_msg:
                log_error("❌ Invalid language code in STT config. Check locale settings.")
            elif "invalid_argument" in error_msg:
                log_error("❌ Invalid STT configuration. Check encoding and sample rate.")
            elif "Deadline Exceeded" in error_msg:
                log_error("❌ Google STT timeout - possibly network issue or slow connection")

        finally:
            log_info("🎤 Google STT stream thread ended")

    async def _put_result(self, result: TranscriptionResult):
        """Helper to put result in queue"""
        await self.responses_queue.put(result)

    async def stream_audio(self, audio_chunk: bytes) -> AsyncIterator[TranscriptionResult]:
        """Queue an audio chunk and yield any results that are already available.

        Args:
            audio_chunk: Raw audio bytes in the encoding configured at start.

        Yields:
            Every ``TranscriptionResult`` currently waiting in
            ``responses_queue`` (non-blocking; may yield nothing).

        Raises:
            RuntimeError: If ``start_streaming()`` has not been called.
        """
        if not self.is_streaming:
            raise RuntimeError("Streaming not started. Call start_streaming() first.")

        try:
            # Put audio in queue for streaming thread
            self.audio_queue.put(audio_chunk)

            # Check for any results (non-blocking)
            try:
                while True:
                    result = self.responses_queue.get_nowait()
                    yield result
            except asyncio.QueueEmpty:
                pass

        except Exception as e:
            log_error("❌ Google STT streaming error", error=str(e))
            raise

    async def stop_streaming(self) -> Optional[TranscriptionResult]:
        """Stop streaming and return the last final result, if any.

        Returns:
            The last queued result whose ``is_final`` is true, or ``None``
            when streaming was not active, no final result was pending, or
            stopping failed (the failure is logged).
        """
        if not self.is_streaming:
            return None

        try:
            log_info("🛑 Stopping Google STT streaming...")

            self.is_streaming = False
            self.stop_event.set()

            # Send poison pill to queue
            self.audio_queue.put(None)

            # Wait for thread to finish without blocking the event loop; the
            # loop must keep running so results published by the worker are
            # actually enqueued before the drain below.
            if self.stream_thread:
                loop = asyncio.get_running_loop()
                await loop.run_in_executor(None, self.stream_thread.join, 5.0)
                if self.stream_thread.is_alive():
                    log_warning("⚠️ Google STT stream thread did not stop within timeout")

            # Clear queues
            while True:
                try:
                    self.audio_queue.get_nowait()
                except queue.Empty:
                    break

            final_result = None
            while True:
                try:
                    result = self.responses_queue.get_nowait()
                except asyncio.QueueEmpty:
                    break
                if result.is_final:
                    final_result = result

            log_info("✅ Google STT streaming stopped")
            return final_result

        except Exception as e:
            log_error("❌ Failed to stop Google STT streaming", error=str(e))
            return None

    def supports_realtime(self) -> bool:
        """Google Cloud STT supports real-time streaming"""
        return True

    def get_supported_languages(self) -> List[str]:
        """Get list of supported language codes"""
        return [
            "tr-TR",  # Turkish
            "en-US",  # English (US)
            "en-GB",  # English (UK)
            "de-DE",  # German
            "fr-FR",  # French
            "es-ES",  # Spanish
            "it-IT",  # Italian
            "pt-BR",  # Portuguese (Brazil)
            "ru-RU",  # Russian
            "ja-JP",  # Japanese
            "ko-KR",  # Korean
            "zh-CN",  # Chinese (Simplified)
            "ar-SA",  # Arabic
        ]

    def get_provider_name(self) -> str:
        """Get provider name"""
        return "google"

    def _get_encoding(self, encoding_str: str):
        """Convert encoding string to Google Speech enum.

        Returns ``None`` when the Google library is unavailable; unknown
        names fall back to ``WEBM_OPUS``.
        """
        if not GOOGLE_SPEECH_AVAILABLE:
            return None

        encoding_map = {
            "WEBM_OPUS": speech.RecognitionConfig.AudioEncoding.WEBM_OPUS,
            "LINEAR16": speech.RecognitionConfig.AudioEncoding.LINEAR16,
            "FLAC": speech.RecognitionConfig.AudioEncoding.FLAC,
            "MP3": speech.RecognitionConfig.AudioEncoding.MP3,
            "OGG_OPUS": speech.RecognitionConfig.AudioEncoding.OGG_OPUS,
        }
        return encoding_map.get(encoding_str, speech.RecognitionConfig.AudioEncoding.WEBM_OPUS)