Spaces:
Paused
Paused
| """ | |
| Google Cloud Speech-to-Text Implementation | |
| """ | |
| import os | |
| import asyncio | |
| from typing import AsyncIterator, Optional, List, Any | |
| from datetime import datetime | |
| import sys | |
| import queue | |
| import threading | |
| import time | |
| import traceback | |
| from logger import log_info, log_error, log_debug, log_warning | |
| # Import Google Cloud Speech only if available | |
| try: | |
| from google.cloud import speech | |
| from google.api_core import exceptions | |
| GOOGLE_SPEECH_AVAILABLE = True | |
| except ImportError: | |
| GOOGLE_SPEECH_AVAILABLE = False | |
| log_info("⚠️ Google Cloud Speech library not installed") | |
| from stt_interface import STTInterface, STTConfig, TranscriptionResult | |
class GoogleCloudSTT(STTInterface):
    """Google Cloud Speech-to-Text streaming implementation.

    ``stream_audio`` pushes audio chunks onto ``audio_queue``; a background
    worker thread (``_run_stream``) feeds them to
    ``SpeechClient.streaming_recognize`` and puts every transcription result
    on ``responses_queue``, which ``stream_audio`` and ``stop_streaming``
    drain. Each ``start_streaming`` call begins a new session with a new
    client, new queues and a new stop event.
    """

    # Capacities of the per-session queues (number of items, not bytes).
    AUDIO_QUEUE_MAXSIZE = 1000
    RESPONSES_QUEUE_MAXSIZE = 100
    # Seconds stop_streaming() waits for the worker thread to exit.
    THREAD_JOIN_TIMEOUT = 5.0
    # Deadline in seconds passed to streaming_recognize (5 minutes).
    STREAM_TIMEOUT = 300
    # Poll interval in seconds used by the worker while waiting on queues.
    _POLL_INTERVAL = 0.1

    def __init__(self, credentials_path: str):
        """Export the service-account credentials and initialise idle state.

        Args:
            credentials_path: Path to a service-account JSON key file. It is
                exported as ``GOOGLE_APPLICATION_CREDENTIALS`` for the Google
                client library.

        Raises:
            ImportError: ``google-cloud-speech`` is not installed.
            FileNotFoundError: ``credentials_path`` is empty or does not exist.
            Exception: Anything ``speech.SpeechClient()`` raises while loading
                the credentials is logged and re-raised.
        """
        if not GOOGLE_SPEECH_AVAILABLE:
            raise ImportError("google-cloud-speech library not installed. Run: pip install google-cloud-speech")

        if not credentials_path or not os.path.exists(credentials_path):
            log_error(f"❌ Google credentials path not found: {credentials_path}")
            raise FileNotFoundError(f"Credentials file not found: {credentials_path}")

        os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = credentials_path
        log_info(f"✅ Google credentials set from: {credentials_path}")

        # Build (and immediately close) a throwaway client so that unreadable
        # credentials fail here rather than on the first stream. The real
        # client is created per session in start_streaming().
        try:
            test_client = speech.SpeechClient()
            log_info("🔐 Testing Google credentials...")
            log_info("✅ Google credentials valid")
            self._close_client(test_client)
        except Exception as e:
            log_error("❌ Google credentials error", error=str(e))
            raise

        # Per-session resources; all are (re)created by start_streaming().
        self.client = None
        self.streaming_config = None
        self.single_utterance = False
        self.is_streaming = False
        self.audio_queue = None
        self.responses_queue = None
        self.stream_thread = None
        self.stop_event = threading.Event()
        self.credentials_path = credentials_path

        # Session tracking: the ID increments on every start_streaming(); the
        # counters cover audio accepted by stream_audio() in the current session.
        self.session_id = 0
        self.total_audio_bytes = 0
        self.total_chunks = 0

    # ------------------------------------------------------------------ helpers

    @staticmethod
    def _drain_queue(q) -> list:
        """Remove and return every item currently in ``q`` without blocking.

        ``q`` may be None, in which case an empty list is returned.
        """
        items = []
        if q is None:
            return items
        while True:
            try:
                items.append(q.get_nowait())
            except queue.Empty:
                return items

    @staticmethod
    def _close_client(client) -> None:
        """Close a SpeechClient's transport; exceptions propagate to the caller.

        Falls back to closing the transport's ``_grpc_channel`` when the
        transport exposes no ``close()`` method. ``client`` may be None.
        """
        if client is None:
            return
        transport = getattr(client, 'transport', None)
        if transport is None:
            transport = getattr(client, '_transport', None)
        if transport is None:
            return
        if hasattr(transport, 'close'):
            transport.close()
        elif hasattr(transport, '_grpc_channel'):
            transport._grpc_channel.close()

    @staticmethod
    def _to_stt_config(config):
        """Return ``config`` as an STTConfig, building one from a dict if needed."""
        if not isinstance(config, dict):
            return config
        return STTConfig(
            language=config.get("language", "tr-TR"),
            sample_rate=config.get("sample_rate", 16000),
            encoding=config.get("encoding", "WEBM_OPUS"),
            enable_punctuation=config.get("enable_punctuation", True),
            interim_results=config.get("interim_results", True),
            single_utterance=config.get("single_utterance", False),
        )

    def _reset_session_data(self):
        """Discard queued items, zero the counters and advance the session ID."""
        self._drain_queue(self.audio_queue)
        self._drain_queue(self.responses_queue)

        self.total_audio_bytes = 0
        self.total_chunks = 0

        self.session_id += 1
        log_info(f"🔄 Google STT session data reset. New session ID: {self.session_id}")

    def _create_fresh_queues(self):
        """Replace both queues with new, empty bounded queues."""
        # Drain the old queues first so a worker still holding them sees nothing stale.
        self._drain_queue(self.audio_queue)
        self._drain_queue(self.responses_queue)

        self.audio_queue = queue.Queue(maxsize=self.AUDIO_QUEUE_MAXSIZE)
        self.responses_queue = queue.Queue(maxsize=self.RESPONSES_QUEUE_MAXSIZE)
        log_debug("✅ Created fresh queues")

    def _clear_session_state(self) -> None:
        """Forget all per-session resources (queues, thread, config, stop flag)."""
        self.is_streaming = False
        self.stream_thread = None
        self.streaming_config = None
        self.audio_queue = None
        self.responses_queue = None
        # Fresh event instead of clear(): a worker that has not exited yet keeps
        # the old, still-set event and therefore keeps shutting down.
        self.stop_event = threading.Event()

    # ---------------------------------------------------------------- lifecycle

    async def start_streaming(self, config: dict) -> None:
        """Start a new streaming session, stopping any previous one first.

        Args:
            config: Either a dict (keys ``language``, ``sample_rate``,
                ``encoding``, ``enable_punctuation``, ``interim_results``,
                ``single_utterance``; missing keys take the defaults in
                ``_to_stt_config``) or an ``STTConfig`` used as-is.

        Raises:
            Exception: Any error while building the client or recognition
                config is logged and re-raised after the state is reset.
        """
        try:
            if self.is_streaming or self.stream_thread:
                log_warning("⚠️ Previous stream still active, stopping it first")
                await self.stop_streaming()
                # Give the previous session a moment to finish tearing down.
                await asyncio.sleep(0.5)

            self._reset_session_data()
            log_info(f"🎤 Starting Google STT streaming session #{self.session_id} with config: {config}")

            self._create_fresh_queues()

            # New Event rather than clear(): a worker from an earlier session that
            # outlived stop_streaming() keeps its own (already set) event, so it
            # cannot be revived and start consuming this session's audio.
            self.stop_event = threading.Event()

            self.client = speech.SpeechClient()
            log_info("✅ Created new Google Speech client")

            stt_config = self._to_stt_config(config)

            recognition_config = speech.RecognitionConfig(
                encoding=self._get_encoding(stt_config.encoding),
                sample_rate_hertz=stt_config.sample_rate,
                language_code=stt_config.language,
                enable_automatic_punctuation=stt_config.enable_punctuation,
                model="latest_long",
                use_enhanced=True,
            )
            self.streaming_config = speech.StreamingRecognitionConfig(
                config=recognition_config,
                interim_results=stt_config.interim_results,
                single_utterance=stt_config.single_utterance,
            )
            self.single_utterance = bool(stt_config.single_utterance)

            self.is_streaming = True

            # The worker receives this session's resources explicitly so it never
            # touches the attributes of a later session.
            self.stream_thread = threading.Thread(
                target=self._run_stream,
                args=(
                    self.session_id,
                    self.client,
                    self.streaming_config,
                    self.audio_queue,
                    self.responses_queue,
                    self.stop_event,
                ),
                name=f"GoogleSTT-Session-{self.session_id}",
                daemon=True,
            )
            self.stream_thread.start()

            log_info(f"✅ Google STT streaming session #{self.session_id} started successfully")

        except Exception as e:
            log_error("❌ Failed to start Google STT streaming", error=str(e))
            self.is_streaming = False
            # Release a half-initialised client instead of just dropping it.
            try:
                self._close_client(self.client)
            except Exception as close_error:
                log_warning(f"⚠️ Error closing Google client: {close_error}")
            self.client = None
            self._create_fresh_queues()
            raise

    # ------------------------------------------------------------ worker thread

    def _put_result(self, result: TranscriptionResult, responses_queue=None, stop_event=None):
        """Queue a transcription result for the consumer.

        Waits while the queue is full, but gives up (dropping the result) once
        ``stop_event`` is set, so stop_streaming() is never left waiting on a
        worker stuck in ``put``. Both queue and event default to the instance
        attributes.
        """
        target = self.responses_queue if responses_queue is None else responses_queue
        stopper = self.stop_event if stop_event is None else stop_event

        if target is None:
            log_warning("⚠️ Response queue is gone; dropping transcription result")
            return

        while True:
            try:
                target.put(result, timeout=self._POLL_INTERVAL)
                return
            except queue.Full:
                if stopper.is_set():
                    log_warning("⚠️ Response queue full while stopping; dropping transcription result")
                    return

    @classmethod
    def _request_generator(cls, audio_queue, stop_event):
        """Yield StreamingRecognizeRequests built from ``audio_queue``.

        Stops when ``stop_event`` is set, a ``None`` poison pill is dequeued,
        or an unexpected error occurs (which is logged).
        """
        chunk_count = 0
        total_bytes = 0

        while not stop_event.is_set():
            try:
                chunk = audio_queue.get(timeout=cls._POLL_INTERVAL)
                if chunk is None:
                    log_info("📛 Poison pill received, stopping request generator")
                    return

                chunk_count += 1
                total_bytes += len(chunk)

                if chunk_count == 1:
                    log_info(f"📤 First audio chunk to Google STT - size: {len(chunk)} bytes")

                # Periodic progress report every 100 chunks.
                if chunk_count % 100 == 0:
                    avg_chunk_size = total_bytes / chunk_count
                    log_info(f"📤 Progress: {chunk_count} chunks, {total_bytes/1024:.1f}KB total, avg {avg_chunk_size:.0f} bytes/chunk")

                request = speech.StreamingRecognizeRequest(audio_content=chunk)
            except queue.Empty:
                continue
            except Exception as e:
                log_error(f"❌ Error in request generator: {e}")
                return

            yield request

    def _process_responses(self, responses, responses_queue, stop_event, single_utterance=False):
        """Turn streaming responses into TranscriptionResults until the stream ends.

        Only the first alternative of each result is reported. When
        ``single_utterance`` is true, the first final result sets
        ``stop_event`` so the request generator and this loop wind down.
        """
        response_count = 0
        empty_response_count = 0

        for response in responses:
            response_count += 1
            if response_count == 1:
                log_info("📨 First response received from Google STT")

            if stop_event.is_set():
                log_info("🛑 Stop event detected, breaking response loop")
                break

            if not response.results:
                empty_response_count += 1
                if empty_response_count % 10 == 0:
                    log_debug(f"Empty responses received: {empty_response_count}")
                continue

            for result_index, result in enumerate(response.results):
                if not result.alternatives:
                    continue
                alternative = result.alternatives[0]

                # debug_mode is not set in this class; it may come from the base
                # class, so fall back to False instead of raising AttributeError.
                if result.is_final or getattr(self, 'debug_mode', False):
                    log_debug(f"📋 Result {result_index}: is_final={result.is_final}, alternatives={len(result.alternatives)}")

                transcription_result = TranscriptionResult(
                    text=alternative.transcript,
                    is_final=result.is_final,
                    confidence=getattr(alternative, 'confidence', 0.0),
                )

                if result.is_final:
                    log_info(f"🎯 GOOGLE STT FINAL: '{transcription_result.text}'")

                self._put_result(transcription_result, responses_queue, stop_event)

                if result.is_final and single_utterance:
                    log_info("🏁 Final result received with single_utterance=True, stopping stream")
                    stop_event.set()
                    break

        log_info(f"📊 Google STT stream ended. Total responses: {response_count}, Empty: {empty_response_count}")

    @staticmethod
    def _log_stream_error(error) -> None:
        """Log a Google API error raised by the streaming call with a targeted hint."""
        error_details = getattr(error, 'message', None) or str(error)
        error_code = getattr(error, 'grpc_status_code', None) or getattr(error, 'code', None)
        error_msg = f"gRPC error - Code: {error_code}, Details: {error_details}"
        lowered = error_msg.lower()

        if "Audio Timeout Error" in error_details or "stream duration" in error_details:
            log_info("⏱️ Google STT stream timeout after long duration. This is expected for long sessions.")
        elif "Bad language code" in error_msg:
            log_error("❌ Invalid language code in STT config. Check locale settings.")
        elif isinstance(error, exceptions.InvalidArgument) or "invalid_argument" in lowered:
            log_error("❌ Invalid STT configuration. Check encoding and sample rate.")
        elif isinstance(error, exceptions.DeadlineExceeded) or "deadline exceeded" in lowered:
            log_error("❌ Google STT response timeout - possibly network issue or slow connection")
        elif isinstance(error, exceptions.ServiceUnavailable) or "503" in error_msg or "Service Unavailable" in error_msg:
            log_error("❌ Google STT service temporarily unavailable. Will retry...")
        else:
            log_error(f"❌ Google STT stream error: {error_msg}")

    def _run_stream(self, session_id=None, client=None, streaming_config=None,
                    audio_queue=None, responses_queue=None, stop_event=None):
        """Worker-thread body: stream queued audio to Google and queue the results.

        Every argument defaults to the matching instance attribute, so calling
        it with no arguments uses the current session. start_streaming() passes
        them explicitly so that a worker outliving its session keeps using its
        own queues, client and stop event.
        """
        if session_id is None:
            session_id = self.session_id
        if client is None:
            client = self.client
        if streaming_config is None:
            streaming_config = self.streaming_config
        if audio_queue is None:
            audio_queue = self.audio_queue
        if responses_queue is None:
            responses_queue = self.responses_queue
        if stop_event is None:
            stop_event = self.stop_event

        try:
            log_info("🎤 Google STT stream thread started")

            requests = self._request_generator(audio_queue, stop_event)

            log_info("🎤 Creating Google STT streaming client...")
            try:
                responses = client.streaming_recognize(
                    streaming_config,
                    requests,
                    timeout=self.STREAM_TIMEOUT,
                )
                log_info("✅ Google STT streaming client created")

                self._process_responses(
                    responses,
                    responses_queue,
                    stop_event,
                    single_utterance=bool(getattr(streaming_config, 'single_utterance', False)),
                )
            except exceptions.GoogleAPICallError as e:
                # google-api-core surfaces gRPC failures from the streaming call as
                # GoogleAPICallError subclasses (grpc itself is not imported here).
                self._log_stream_error(e)

        except Exception as e:
            log_error("❌ Fatal error in STT stream thread", error=str(e), traceback=traceback.format_exc())
        finally:
            log_info("🎤 Google STT stream thread ended")
            # Only the worker of the current session may clear the flag; a stale
            # worker must not mark a newer session as stopped.
            if session_id == self.session_id:
                self.is_streaming = False

    # ---------------------------------------------------------------- public API

    async def stream_audio(self, audio_chunk: bytes) -> AsyncIterator[TranscriptionResult]:
        """Queue one audio chunk and yield every transcription result ready so far.

        The event loop is never blocked: if the audio queue is full the chunk
        is dropped with a warning.

        Raises:
            RuntimeError: No streaming session is active.
            Exception: Unexpected errors are logged, mark the session as not
                streaming, and are re-raised.
        """
        if not self.is_streaming:
            thread = self.stream_thread
            thread_state = thread.is_alive() if thread is not None else 'No thread'
            log_error(f"❌ STT not streaming - is_streaming: {self.is_streaming}, thread alive: {thread_state}")
            raise RuntimeError("Streaming not started. Call start_streaming() first.")

        try:
            try:
                self.audio_queue.put_nowait(audio_chunk)
            except queue.Full:
                # A blocking put() would stall the whole event loop until the
                # worker thread catches up; dropping one chunk is the lesser evil.
                log_warning("⚠️ Google STT audio queue full, dropping audio chunk")
            else:
                self.total_chunks += 1
                self.total_audio_bytes += len(audio_chunk)

            # Keep a local reference: stop_streaming() may reset the attribute
            # while this generator is suspended at a yield.
            responses_queue = self.responses_queue
            while True:
                try:
                    result = responses_queue.get_nowait()
                except queue.Empty:
                    break
                yield result

        except Exception as e:
            log_error("❌ Google STT streaming error", error=str(e))
            # Mark the session as broken so the caller restarts it.
            self.is_streaming = False
            raise

    async def stop_streaming(self) -> Optional[TranscriptionResult]:
        """Stop the current session, release its resources and return its last final result.

        Returns:
            The last final TranscriptionResult still waiting in the response
            queue; None if there was none, nothing was streaming, or cleanup
            failed.
        """
        if not self.is_streaming and not self.stream_thread:
            log_debug("Already stopped, nothing to do")
            return None

        try:
            log_info(f"🛑 Stopping Google STT streaming session #{self.session_id}")

            self.is_streaming = False
            self.stop_event.set()

            # The poison pill wakes the request generator immediately; if the queue
            # is full the generator still exits on its next stop_event check.
            if self.audio_queue is not None:
                try:
                    self.audio_queue.put_nowait(None)
                except queue.Full:
                    pass

            thread = self.stream_thread
            if thread is not None and thread.is_alive():
                log_info("⏳ Waiting for stream thread to finish...")
                thread.join(timeout=self.THREAD_JOIN_TIMEOUT)
                if thread.is_alive():
                    # Python threads cannot be killed. The worker is a daemon and its
                    # own stop event stays set, so it should wind down once the
                    # server closes the stream.
                    log_warning("⚠️ STT thread did not stop gracefully after 5s")
                else:
                    log_info("✅ Stream thread finished")

            final_result = None
            for result in self._drain_queue(self.responses_queue):
                if getattr(result, 'is_final', False):
                    final_result = result

            if self.client is not None:
                try:
                    self._close_client(self.client)
                    log_debug("✅ Client transport closed")
                except Exception as e:
                    log_warning(f"⚠️ Error closing Google client: {e}")
                finally:
                    self.client = None

            self._clear_session_state()

            log_info(f"✅ Google STT streaming session #{self.session_id} stopped and cleaned")
            return final_result

        except Exception as e:
            log_error("❌ Error during stop_streaming", error=str(e))
            # Force a clean slate so the next start_streaming() can proceed.
            self.client = None
            self._clear_session_state()
            return None

    def supports_realtime(self) -> bool:
        """Google Cloud STT supports real-time streaming."""
        return True

    def get_supported_languages(self) -> List[str]:
        """Return the language codes this provider advertises."""
        return [
            "tr-TR",  # Turkish
            "en-US",  # English (US)
            "en-GB",  # English (UK)
            "de-DE",  # German
            "fr-FR",  # French
            "es-ES",  # Spanish
            "it-IT",  # Italian
            "pt-BR",  # Portuguese (Brazil)
            "ru-RU",  # Russian
            "ja-JP",  # Japanese
            "ko-KR",  # Korean
            "zh-CN",  # Chinese (Simplified)
            "ar-SA",  # Arabic
        ]

    def get_provider_name(self) -> str:
        """Return the provider identifier."""
        return "google"

    def _get_encoding(self, encoding_str: str):
        """Map an encoding name (case-insensitive) to ``RecognitionConfig.AudioEncoding``.

        Unknown or empty names fall back to LINEAR16. Returns None when the
        Google library is not installed.
        """
        if not GOOGLE_SPEECH_AVAILABLE:
            return None

        audio_encoding = speech.RecognitionConfig.AudioEncoding
        encoding_map = {
            "WEBM_OPUS": audio_encoding.WEBM_OPUS,
            "LINEAR16": audio_encoding.LINEAR16,
            "FLAC": audio_encoding.FLAC,
            "MP3": audio_encoding.MP3,
            "OGG_OPUS": audio_encoding.OGG_OPUS,
            "MULAW": audio_encoding.MULAW,
            "AMR": audio_encoding.AMR,
            "AMR_WB": audio_encoding.AMR_WB,
            "ENCODING_UNSPECIFIED": audio_encoding.ENCODING_UNSPECIFIED,
        }
        return encoding_map.get((encoding_str or "").upper(), audio_encoding.LINEAR16)