""" Google Cloud Speech-to-Text Implementation - Simple Batch Mode """ from typing import Optional, List from datetime import datetime import io import wave from google.cloud import speech from google.cloud.speech import RecognitionConfig, RecognitionAudio from utils.logger import log_info, log_error, log_debug, log_warning from .stt_interface import STTInterface, STTConfig, TranscriptionResult class GoogleSTT(STTInterface): def __init__(self, credentials_path: Optional[str] = None): """ Initialize Google STT Args: credentials_path: Path to service account JSON file (optional if using default credentials) """ try: # Initialize client if credentials_path: self.client = speech.SpeechClient.from_service_account_file(credentials_path) log_info(f"✅ Google STT initialized with service account: {credentials_path}") else: # Use default credentials (ADC) self.client = speech.SpeechClient() log_info("✅ Google STT initialized with default credentials") except Exception as e: log_error(f"❌ Failed to initialize Google STT: {str(e)}") raise def _map_language_code(self, language: str) -> str: """Map language codes to Google format""" # Google uses BCP-47 language codes language_map = { "tr": "tr-TR", "tr-TR": "tr-TR", "en": "en-US", "en-US": "en-US", "en-GB": "en-GB", "de": "de-DE", "de-DE": "de-DE", "fr": "fr-FR", "fr-FR": "fr-FR", "es": "es-ES", "es-ES": "es-ES", "it": "it-IT", "it-IT": "it-IT", "pt": "pt-BR", "pt-BR": "pt-BR", "ru": "ru-RU", "ru-RU": "ru-RU", "ja": "ja-JP", "ja-JP": "ja-JP", "ko": "ko-KR", "ko-KR": "ko-KR", "zh": "zh-CN", "zh-CN": "zh-CN", "ar": "ar-SA", "ar-SA": "ar-SA", } # Default to the language itself if not in map return language_map.get(language, language) async def transcribe(self, audio_data: bytes, config: STTConfig) -> Optional[TranscriptionResult]: try: if not audio_data: log_warning("⚠️ No audio data provided") return None log_info(f"📊 Transcribing {len(audio_data)} bytes of audio") # ✅ Detaylı audio analizi - logda import struct samples = struct.unpack(f'{len(audio_data)//2}h', audio_data) total_samples = len(samples) # 1. Genel istatistikler non_zero_samples = [s for s in samples if s != 0] zero_count = total_samples - len(non_zero_samples) if non_zero_samples: avg_amplitude = sum(abs(s) for s in non_zero_samples) / len(non_zero_samples) max_amplitude = max(abs(s) for s in non_zero_samples) else: avg_amplitude = 0 max_amplitude = 0 log_info(f"🔍 Audio stats: {total_samples} total samples, {zero_count} zeros ({zero_count/total_samples:.1%})") log_info(f"🔍 Non-zero stats: avg={avg_amplitude:.1f}, max={max_amplitude}") # 2. Bölüm bazlı analiz (10 bölüme ayır) section_size = total_samples // 10 log_info(f"🔍 Section analysis (each {section_size} samples):") for i in range(10): start_idx = i * section_size end_idx = (i + 1) * section_size if i < 9 else total_samples section = samples[start_idx:end_idx] section_non_zero = [s for s in section if s != 0] section_max = max(abs(s) for s in section_non_zero) if section_non_zero else 0 section_avg = sum(abs(s) for s in section_non_zero) / len(section_non_zero) if section_non_zero else 0 zero_ratio = (len(section) - len(section_non_zero)) / len(section) log_info(f" Section {i+1}: max={section_max}, avg={section_avg:.1f}, zeros={zero_ratio:.1%}") # 3. İlk konuşma başlangıcını bul speech_threshold = 500 # RMS eşiği speech_start_idx = -1 # 100 sample'lık pencerelerle RMS hesapla window_size = 100 for i in range(0, total_samples - window_size, window_size): window = samples[i:i + window_size] rms = (sum(s * s for s in window) / window_size) ** 0.5 if rms > speech_threshold: speech_start_idx = i break if speech_start_idx >= 0: speech_start_time = speech_start_idx / config.sample_rate log_info(f"🎤 Speech detected starting at sample {speech_start_idx} ({speech_start_time:.2f}s)") else: log_warning("⚠️ No speech detected above threshold in entire audio") # 4. Audio'nun gerçekten boş olup olmadığını kontrol et if max_amplitude < 100: log_warning(f"⚠️ Audio appears silent: max_amplitude={max_amplitude}") return None if zero_count / total_samples > 0.95: # %95'den fazla sıfır log_warning(f"⚠️ Audio is mostly zeros: {zero_count/total_samples:.1%}") return None # Convert to WAV format wav_audio = self._convert_to_wav(audio_data, config.sample_rate) # Configure recognition recognition_config = RecognitionConfig( encoding=RecognitionConfig.AudioEncoding.LINEAR16, sample_rate_hertz=16000, language_code="tr-TR", audio_channel_count=1, enable_separate_recognition_per_channel=False, enable_automatic_punctuation=True, ) # Create audio object audio = RecognitionAudio(content=wav_audio) # Perform synchronous recognition log_info(f"🔄 Sending audio to Google Cloud Speech API...") response = self.client.recognize(config=recognition_config, audio=audio) # ✅ Detaylı response analizi log_info(f"🔍 Google response details:") log_info(f" - Has results: {bool(response.results)}") log_info(f" - Results count: {len(response.results) if response.results else 0}") if hasattr(response, 'total_billed_time'): if response.total_billed_time and response.total_billed_time.total_seconds() > 0: log_info(f" - Billed time: {response.total_billed_time.total_seconds()}s") else: log_info(f" - Billed time: 0s (no audio processed)") # Process results if response.results and len(response.results) > 0: for i, result in enumerate(response.results): log_info(f" - Result {i}: {len(result.alternatives)} alternatives") if result.alternatives: for j, alt in enumerate(result.alternatives): log_info(f" - Alt {j}: '{alt.transcript}' (conf: {alt.confidence:.3f})") result = response.results[0] if result.alternatives and len(result.alternatives) > 0: alternative = result.alternatives[0] transcription = TranscriptionResult( text=alternative.transcript, confidence=alternative.confidence, timestamp=datetime.now().timestamp(), language="tr-TR", word_timestamps=None ) log_info(f"✅ Transcription SUCCESS: '{alternative.transcript}' (confidence: {alternative.confidence:.2f})") return transcription log_warning("⚠️ No transcription results - Google couldn't recognize speech") return None except Exception as e: log_error(f"❌ Error during transcription: {str(e)}") import traceback log_error(f"Traceback: {traceback.format_exc()}") return None def _convert_to_wav(self, audio_data: bytes, sample_rate: int) -> bytes: """Convert raw PCM audio to WAV format""" # Create WAV file in memory wav_buffer = io.BytesIO() with wave.open(wav_buffer, 'wb') as wav_file: # Set WAV parameters wav_file.setnchannels(1) # Mono wav_file.setsampwidth(2) # 16-bit wav_file.setframerate(sample_rate) wav_file.writeframes(audio_data) # Get WAV data wav_buffer.seek(0) return wav_buffer.read() def get_supported_languages(self) -> List[str]: """Get list of supported language codes""" # Google Cloud Speech-to-Text supported languages (partial list) return [ "tr-TR", "en-US", "en-GB", "en-AU", "en-CA", "en-IN", "es-ES", "es-MX", "es-AR", "fr-FR", "fr-CA", "de-DE", "it-IT", "pt-BR", "pt-PT", "ru-RU", "ja-JP", "ko-KR", "zh-CN", "zh-TW", "ar-SA", "ar-EG", "hi-IN", "nl-NL", "pl-PL", "sv-SE", "da-DK", "no-NO", "fi-FI", "el-GR", "he-IL", "th-TH", "vi-VN", "id-ID", "ms-MY", "fil-PH" ] def get_provider_name(self) -> str: """Get provider name""" return "google"