""" Google Cloud Speech-to-Text Implementation """ import os import asyncio from typing import AsyncIterator, AsyncGenerator, Optional, List, Any import numpy as np # Audio level check için from datetime import datetime import sys import queue import threading import time import traceback from utils.logger import log_info, log_error, log_debug, log_warning # Import Google Cloud Speech only if available try: from google.cloud import speech from google.api_core import exceptions GOOGLE_SPEECH_AVAILABLE = True except ImportError: GOOGLE_SPEECH_AVAILABLE = False log_info("⚠️ Google Cloud Speech library not installed") from .stt_interface import STTInterface, STTConfig, TranscriptionResult class GoogleCloudSTT(STTInterface): """Google Cloud Speech-to-Text implementation""" def __init__(self, credentials_path: str): if not GOOGLE_SPEECH_AVAILABLE: raise ImportError("google-cloud-speech library not installed. Run: pip install google-cloud-speech") if credentials_path and os.path.exists(credentials_path): os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = credentials_path log_info(f"✅ Google credentials set from: {credentials_path}") # Test credential'ları try: # Client'ı burada oluşturma, her seferinde yeni instance oluştur test_client = speech.SpeechClient() log_info("🔐 Testing Google credentials...") log_info("✅ Google credentials valid") # Test client'ı kapat if hasattr(test_client, 'transport') and hasattr(test_client.transport, 'close'): test_client.transport.close() except Exception as e: log_error(f"❌ Google credentials error", error=str(e)) raise else: log_error(f"❌ Google credentials path not found: {credentials_path}") raise FileNotFoundError(f"Credentials file not found: {credentials_path}") # Client'ı burada oluşturma, start_streaming'de oluştur self.client = None self.streaming_config = None self.is_streaming = False self.audio_queue = None # Queue'ları None olarak başlat self.responses_queue = None self.stream_thread = None self.stop_event = threading.Event() self.credentials_path = credentials_path # Session tracking self.session_id = 0 self.total_audio_bytes = 0 self.total_chunks = 0 def _get_encoding(self, encoding_str: str): """Convert encoding string to Google Speech enum""" if not GOOGLE_SPEECH_AVAILABLE: return None encoding_map = { "WEBM_OPUS": speech.RecognitionConfig.AudioEncoding.WEBM_OPUS, "LINEAR16": speech.RecognitionConfig.AudioEncoding.LINEAR16, "FLAC": speech.RecognitionConfig.AudioEncoding.FLAC, "MP3": speech.RecognitionConfig.AudioEncoding.MP3, "OGG_OPUS": speech.RecognitionConfig.AudioEncoding.OGG_OPUS, } return encoding_map.get(encoding_str, speech.RecognitionConfig.AudioEncoding.WEBM_OPUS) async def stream_audio(self, audio_chunk: bytes) -> AsyncGenerator[TranscriptionResult, None]: """Stream audio chunk and get results""" if not self.is_streaming: raise Exception("Streaming not started") try: # Audio validation and logging chunk_size = len(audio_chunk) # Log first chunk details if self.chunk_count == 0: log_info(f"📤 First chunk - size: {chunk_size} bytes") # Check for WEBM header if audio_chunk.startswith(b'\x1a\x45\xdf\xa3'): log_info("✅ Valid WEBM header detected") else: hex_preview = audio_chunk[:20].hex() log_warning(f"⚠️ Unexpected audio format. First 20 bytes: {hex_preview}") # Try to measure audio level (if it's raw PCM) try: import numpy as np # This might fail for WEBM, but let's try audio_array = np.frombuffer(audio_chunk[-1000:], dtype=np.int16) # Last 1000 bytes if len(audio_array) > 0: rms = np.sqrt(np.mean(audio_array.astype(float) ** 2)) db = 20 * np.log10(max(rms, 1) / 32768.0) if self.chunk_count % 50 == 0: log_info(f"🔊 Audio level estimate: {db:.1f} dB") except: # Expected for WEBM format pass # Put chunk in queue self.audio_queue.put(audio_chunk) self.chunk_count += 1 self.total_bytes += chunk_size # Log progress if self.chunk_count % 50 == 0: log_info(f"📤 Progress: {self.chunk_count} chunks, {self.total_bytes/1024:.1f}KB total") # Check for responses with timeout timeout = 0.1 # 100ms timeout for checking responses end_time = time.time() + timeout while time.time() < end_time: try: result = self.responses_queue.get_nowait() log_info(f"🎯 Got result from queue: is_final={result.is_final}, text='{result.text[:30]}...'") yield result except queue.Empty: # No results yet, continue await asyncio.sleep(0.01) except Exception as e: log_error(f"Error getting result from queue: {e}") break except Exception as e: log_error(f"❌ Error in stream_audio: {e}") raise async def stop_streaming(self) -> Optional[TranscriptionResult]: """Stop streaming and clean up all resources""" if not self.is_streaming and not self.stream_thread: log_debug("Already stopped, nothing to do") return None try: log_info(f"🛑 Stopping Google STT streaming session #{self.session_id}") # Flag'i hemen kapat self.is_streaming = False self.stop_event.set() # Send poison pill to stop request generator if self.audio_queue: try: self.audio_queue.put(None) except: pass # Thread'i durdur if self.stream_thread and self.stream_thread.is_alive(): log_info("⏳ Waiting for stream thread to finish...") self.stream_thread.join(timeout=5.0) if self.stream_thread.is_alive(): log_warning("⚠️ STT thread did not stop gracefully after 5s") else: log_info("✅ Stream thread finished") # Final result'ı al - ✅ BURADA DÜZELTME final_result = None if self.responses_queue: while not self.responses_queue.empty(): try: result = self.responses_queue.get_nowait() # ✅ await değil, get_nowait() if result.is_final: final_result = result except queue.Empty: # ✅ queue.Empty kullan break # Client'ı kapat if self.client: try: if hasattr(self.client, 'transport') and hasattr(self.client.transport, 'close'): self.client.transport.close() log_debug("✅ Client transport closed") if hasattr(self.client, '_transport') and hasattr(self.client._transport, '_grpc_channel'): self.client._transport._grpc_channel.close() log_debug("✅ gRPC channel closed") except Exception as e: log_warning(f"⚠️ Error closing Google client: {e}") finally: self.client = None # Queue'ları None yap self.audio_queue = None self.responses_queue = None # Diğer değişkenleri resetle self.stream_thread = None self.streaming_config = None self.stop_event.clear() log_info(f"✅ Google STT streaming session #{self.session_id} stopped and cleaned") return final_result except Exception as e: log_error(f"❌ Error during stop_streaming", error=str(e)) # Force cleanup on error self.is_streaming = False self.stream_thread = None self.client = None self.streaming_config = None self.stop_event.clear() self.audio_queue = None self.responses_queue = None return None def supports_realtime(self) -> bool: """Google Cloud STT supports real-time streaming""" return True def get_supported_languages(self) -> List[str]: """Get list of supported language codes""" return [ "tr-TR", # Turkish "en-US", # English (US) "en-GB", # English (UK) "de-DE", # German "fr-FR", # French "es-ES", # Spanish "it-IT", # Italian "pt-BR", # Portuguese (Brazil) "ru-RU", # Russian "ja-JP", # Japanese "ko-KR", # Korean "zh-CN", # Chinese (Simplified) "ar-SA", # Arabic ] def get_provider_name(self) -> str: """Get provider name""" return "google" def _reset_session_data(self): """Reset all session-specific data""" # Queue'ları temizle if self.audio_queue: while not self.audio_queue.empty(): try: self.audio_queue.get_nowait() except: pass if self.responses_queue: while not self.responses_queue.empty(): try: self.responses_queue.get_nowait() except: pass # Counters'ı sıfırla self.total_audio_bytes = 0 self.total_chunks = 0 # Yeni session ID self.session_id += 1 log_info(f"🔄 Google STT session data reset. New session ID: {self.session_id}") def _create_fresh_queues(self): """Create fresh queue instances""" # Eski queue'ları temizle if self.audio_queue: while not self.audio_queue.empty(): try: self.audio_queue.get_nowait() except: pass if self.responses_queue: while not self.responses_queue.empty(): try: self.responses_queue.get_nowait() except: pass # Yeni queue'lar oluştur self.audio_queue = queue.Queue(maxsize=1000) # Max size ekle self.responses_queue = queue.Queue(maxsize=100) log_debug("✅ Created fresh queues") async def start_streaming(self, config: dict) -> None: """Initialize streaming session with clean state""" try: # Önce mevcut stream'i temizle if self.is_streaming or self.stream_thread: log_warning("⚠️ Previous stream still active, stopping it first") await self.stop_streaming() # Temizlik için bekle await asyncio.sleep(0.5) # Session verilerini resetle ve ID'yi artır self._reset_session_data() log_info(f"🎤 Starting Google STT streaming session #{self.session_id} with config: {config}") # Fresh queue'lar oluştur self._create_fresh_queues() # Stop event'i temizle self.stop_event.clear() # Yeni client oluştur (TEK SEFER) self.client = speech.SpeechClient() log_info("✅ Created new Google Speech client") # Convert dict to STTConfig if needed if isinstance(config, dict): stt_config = STTConfig( language=config.get("language", "tr-TR"), sample_rate=config.get("sample_rate", 16000), encoding=config.get("encoding", "WEBM_OPUS"), enable_punctuation=config.get("enable_punctuation", True), interim_results=config.get("interim_results", False), single_utterance=config.get("single_utterance", True) ) else: stt_config = config recognition_config = speech.RecognitionConfig( encoding=self._get_encoding(stt_config.encoding), sample_rate_hertz=stt_config.sample_rate, language_code=stt_config.language, enable_automatic_punctuation=stt_config.enable_punctuation, model="latest_long", use_enhanced=True, # Bu parametreleri kaldırıyoruz - v1 API'de yok # enable_voice_activity_events=True, # audio_channel_count=1 ) self.streaming_config = speech.StreamingRecognitionConfig( config=recognition_config, interim_results=stt_config.interim_results, single_utterance=stt_config.single_utterance # enable_voice_activity_events kaldırıldı ) self.is_streaming = True # Start streaming thread with unique name self.stream_thread = threading.Thread( target=self._run_stream, name=f"GoogleSTT-Session-{self.session_id}" ) self.stream_thread.daemon = True # Daemon thread olarak işaretle self.stream_thread.start() log_info(f"✅ Google STT streaming session #{self.session_id} started successfully") except Exception as e: log_error(f"❌ Failed to start Google STT streaming", error=str(e)) self.is_streaming = False self.client = None self._create_fresh_queues() # Hata durumunda da queue'ları temizle raise def _put_result(self, result: TranscriptionResult): """Helper to put result in queue""" try: self.responses_queue.put(result) # Debug log'u kaldırdık except Exception as e: log_error(f"❌ Error queuing result: {e}") def _run_stream(self): """Run the streaming recognition loop in a separate thread""" try: log_info("🎤 Google STT stream thread started - Single utterance mode: {}".format(self.single_utterance)) # Create request generator requests = self._request_generator() # Create streaming client log_info("🎤 Creating Google STT streaming client...") # Set a timeout for the streaming call import grpc timeout = 300 # 5 minutes max for the stream # Create streaming client with timeout responses = self.client.streaming_recognize( self.streaming_config, requests, timeout=timeout ) # Set initial response timeout initial_response_timeout = 30 # 30 seconds to get first response stream_start = time.time() got_first_response = False # Track if we've received any response first_response_time = None response_count = 0 # Process responses with detailed logging for response in responses: if not got_first_response: got_first_response = True elapsed = time.time() - stream_start log_info(f"✅ Got first response from Google after {elapsed:.2f}s") response_count += 1 if first_response_time is None: first_response_time = time.time() elapsed = first_response_time - self.stream_start_time log_info(f"🎉 FIRST RESPONSE from Google STT after {elapsed:.2f}s") # Log every response, even if empty log_info(f"📨 Google STT Response #{response_count}: has_results={len(response.results) > 0}") if not response.results: log_info("📭 Empty response from Google STT (no results)") continue # Log all results in detail for result_idx, result in enumerate(response.results): log_info(f"📝 Result #{result_idx}: is_final={result.is_final}, " f"alternatives={len(result.alternatives)}, " f"stability={getattr(result, 'stability', 'N/A')}") if result.alternatives: best_alternative = result.alternatives[0] log_info(f"🗣️ Transcript: '{best_alternative.transcript}' " f"(confidence: {best_alternative.confidence:.3f})") # Put result in queue result_obj = TranscriptionResult( text=best_alternative.transcript, is_final=result.is_final, confidence=best_alternative.confidence, timestamp=datetime.utcnow() ) self.responses_queue.put(result_obj) log_info(f"✅ Result queued: is_final={result.is_final}, text='{best_alternative.transcript[:50]}...'") # Log if we exit without any responses if response_count == 0: log_error("❌ Google STT stream ended without ANY responses!") else: log_info(f"✅ Google STT stream ended normally after {response_count} responses") except Exception as e: log_error(f"❌ Google STT error: {e}") if hasattr(e, 'details'): log_error(f"Error details: {e.details}") self.error_message = str(e) finally: log_info("🎤 Google STT stream thread ended") with self.lock: self.is_streaming = False