""" Google Cloud Speech-to-Text Implementation """ import os import asyncio from typing import AsyncIterator, Optional, List from datetime import datetime import sys from logger import log_info, log_error, log_debug, log_warning # Import Google Cloud Speech only if available try: from google.cloud import speech_v1p1beta1 as speech from google.api_core import exceptions GOOGLE_SPEECH_AVAILABLE = True except ImportError: GOOGLE_SPEECH_AVAILABLE = False log_info("⚠️ Google Cloud Speech library not installed") from stt_interface import STTInterface, STTConfig, TranscriptionResult class GoogleCloudSTT(STTInterface): """Google Cloud Speech-to-Text implementation""" def __init__(self, credentials_path: str): if not GOOGLE_SPEECH_AVAILABLE: raise ImportError("google-cloud-speech library not installed. Run: pip install google-cloud-speech") if credentials_path and os.path.exists(credentials_path): os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = credentials_path log_info(f"✅ Google credentials set from: {credentials_path}") else: log_info("⚠️ Google credentials path not found, using default credentials") self.client = speech.SpeechAsyncClient() self.streaming_config = None self.is_streaming = False self.audio_queue = asyncio.Queue() async def start_streaming(self, config: STTConfig) -> None: """Initialize streaming session""" try: # Config'in dict mi STTConfig objesi mi olduğunu kontrol et if isinstance(config, dict): # Dict ise STTConfig objesine çevir stt_config = STTConfig( language=config.get("language", "tr-TR"), sample_rate=config.get("sample_rate", 16000), encoding=config.get("encoding", "WEBM_OPUS"), enable_punctuation=config.get("enable_punctuation", True), enable_word_timestamps=config.get("enable_word_timestamps", False), model=config.get("model", "latest_long"), use_enhanced=config.get("use_enhanced", True), interim_results=config.get("interim_results", True), single_utterance=config.get("single_utterance", False) ) else: stt_config = config recognition_config = speech.RecognitionConfig( encoding=self._get_encoding(stt_config.encoding), sample_rate_hertz=stt_config.sample_rate, language_code=stt_config.language, enable_automatic_punctuation=stt_config.enable_punctuation, enable_word_time_offsets=stt_config.enable_word_timestamps, model=stt_config.model, use_enhanced=stt_config.use_enhanced ) self.streaming_config = speech.StreamingRecognitionConfig( config=recognition_config, interim_results=stt_config.interim_results, single_utterance=stt_config.single_utterance ) self.is_streaming = True log_info("✅ Google STT streaming started") except Exception as e: log_error("❌ Failed to start Google STT streaming", e) raise async def stream_audio(self, audio_chunk: bytes) -> AsyncIterator[TranscriptionResult]: """Stream audio chunk and get transcription results""" if not self.is_streaming: raise RuntimeError("Streaming not started. Call start_streaming() first.") try: # Add audio to queue await self.audio_queue.put(audio_chunk) # Process with Google STT request = speech.StreamingRecognizeRequest(audio_content=audio_chunk) # This is a simplified version - actual implementation would need # proper streaming handling with Google's API # For now, return empty iterator return yield # Make it a generator except Exception as e: log_error("❌ Google STT streaming error", e) raise async def stop_streaming(self) -> Optional[TranscriptionResult]: """Stop streaming and get final result""" if not self.is_streaming: return None try: self.is_streaming = False log_info("✅ Google STT streaming stopped") # Return final result if any return None except Exception as e: log_error("❌ Failed to stop Google STT streaming", e) raise def supports_realtime(self) -> bool: """Google Cloud STT supports real-time streaming""" return True def get_supported_languages(self) -> List[str]: """Get list of supported language codes""" return [ "tr-TR", # Turkish "en-US", # English (US) "en-GB", # English (UK) "de-DE", # German "fr-FR", # French "es-ES", # Spanish "it-IT", # Italian "pt-BR", # Portuguese (Brazil) "ru-RU", # Russian "ja-JP", # Japanese "ko-KR", # Korean "zh-CN", # Chinese (Simplified) "ar-SA", # Arabic ] def get_provider_name(self) -> str: """Get provider name""" return "google" def _get_encoding(self, encoding_str: str): """Convert encoding string to Google Speech enum""" if not GOOGLE_SPEECH_AVAILABLE: return None encoding_map = { "WEBM_OPUS": speech.RecognitionConfig.AudioEncoding.WEBM_OPUS, "LINEAR16": speech.RecognitionConfig.AudioEncoding.LINEAR16, "FLAC": speech.RecognitionConfig.AudioEncoding.FLAC, "MP3": speech.RecognitionConfig.AudioEncoding.MP3, "OGG_OPUS": speech.RecognitionConfig.AudioEncoding.OGG_OPUS, } return encoding_map.get(encoding_str, speech.RecognitionConfig.AudioEncoding.WEBM_OPUS)