Spaces:
Running
Running
""" | |
Google Cloud Speech-to-Text Implementation - Simple Batch Mode | |
""" | |
import io
import struct
import wave
from datetime import datetime
from typing import List, Optional

from google.cloud import speech
from google.cloud.speech import RecognitionAudio, RecognitionConfig

from utils.logger import log_debug, log_error, log_info, log_warning

from .stt_interface import STTConfig, STTInterface, TranscriptionResult
class GoogleSTT(STTInterface):
    """Google Cloud Speech-to-Text provider (synchronous batch recognition).

    Expects raw 16-bit little-endian mono PCM audio; the payload is wrapped
    in a WAV container before being submitted to the API.
    """

    # Peaks below this amplitude are treated as silence and the request is skipped.
    _SILENCE_PEAK_THRESHOLD = 100
    # If more than this fraction of samples is exactly zero, skip the request.
    _MAX_ZERO_RATIO = 0.95

    def __init__(self, credentials_path: Optional[str] = None):
        """
        Initialize the Google STT client.

        Args:
            credentials_path: Path to a service-account JSON file. When omitted,
                Application Default Credentials (ADC) are used.

        Raises:
            Exception: Re-raises any client-construction error after logging it.
        """
        try:
            if credentials_path:
                self.client = speech.SpeechClient.from_service_account_file(credentials_path)
                log_info(f"✅ Google STT initialized with service account: {credentials_path}")
            else:
                # No explicit key file: rely on Application Default Credentials.
                self.client = speech.SpeechClient()
                log_info("✅ Google STT initialized with default credentials")
        except Exception as e:
            log_error(f"❌ Failed to initialize Google STT: {str(e)}")
            raise

    def _map_language_code(self, language: str) -> str:
        """Map a short or regional language code to Google's BCP-47 form.

        Unknown codes are passed through unchanged; the API validates them.
        """
        language_map = {
            "tr": "tr-TR",
            "tr-TR": "tr-TR",
            "en": "en-US",
            "en-US": "en-US",
            "en-GB": "en-GB",
            "de": "de-DE",
            "de-DE": "de-DE",
            "fr": "fr-FR",
            "fr-FR": "fr-FR",
            "es": "es-ES",
            "es-ES": "es-ES",
            "it": "it-IT",
            "it-IT": "it-IT",
            "pt": "pt-BR",
            "pt-BR": "pt-BR",
            "ru": "ru-RU",
            "ru-RU": "ru-RU",
            "ja": "ja-JP",
            "ja-JP": "ja-JP",
            "ko": "ko-KR",
            "ko-KR": "ko-KR",
            "zh": "zh-CN",
            "zh-CN": "zh-CN",
            "ar": "ar-SA",
            "ar-SA": "ar-SA",
        }
        return language_map.get(language, language)

    def _pcm_to_samples(self, audio_data: bytes) -> List[int]:
        """Decode raw little-endian 16-bit PCM into a list of int samples.

        A trailing odd byte (truncated sample) is dropped instead of letting
        struct.unpack raise on a length mismatch.
        """
        usable = len(audio_data) - (len(audio_data) % 2)
        return list(struct.unpack(f'{usable // 2}h', audio_data[:usable]))

    def _log_speech_onset(self, samples: List[int], sample_rate: int,
                          threshold: float = 500.0, window_size: int = 100) -> None:
        """Log the first window whose RMS exceeds *threshold* (speech onset)."""
        for start in range(0, len(samples) - window_size, window_size):
            window = samples[start:start + window_size]
            rms = (sum(s * s for s in window) / window_size) ** 0.5
            if rms > threshold:
                log_info(f"🎤 Speech detected starting at sample {start} ({start / sample_rate:.2f}s)")
                return
        log_warning("⚠️ No speech detected above threshold in entire audio")

    def _log_audio_stats(self, samples: List[int], sample_rate: int) -> tuple:
        """Log diagnostic statistics for *samples*.

        Returns:
            (max_amplitude, zero_ratio) for the silence checks in transcribe().
            Caller guarantees *samples* is non-empty.
        """
        total = len(samples)
        magnitudes = [abs(s) for s in samples if s != 0]
        zero_count = total - len(magnitudes)
        avg_amplitude = sum(magnitudes) / len(magnitudes) if magnitudes else 0
        max_amplitude = max(magnitudes) if magnitudes else 0

        log_info(f"🔍 Audio stats: {total} total samples, {zero_count} zeros ({zero_count / total:.1%})")
        log_info(f"🔍 Non-zero stats: avg={avg_amplitude:.1f}, max={max_amplitude}")

        # Section-by-section analysis (ten roughly equal slices). Skipped for
        # audio shorter than ten samples, where section_size would be zero.
        section_size = total // 10
        if section_size > 0:
            log_info(f"🔍 Section analysis (each {section_size} samples):")
            for i in range(10):
                start = i * section_size
                end = (i + 1) * section_size if i < 9 else total
                section_mags = [abs(s) for s in samples[start:end] if s != 0]
                section_len = end - start
                section_max = max(section_mags) if section_mags else 0
                section_avg = sum(section_mags) / len(section_mags) if section_mags else 0
                zero_ratio = (section_len - len(section_mags)) / section_len
                log_info(f"  Section {i+1}: max={section_max}, avg={section_avg:.1f}, zeros={zero_ratio:.1%}")

        self._log_speech_onset(samples, sample_rate)
        return max_amplitude, zero_count / total

    async def transcribe(self, audio_data: bytes, config: STTConfig) -> Optional[TranscriptionResult]:
        """Transcribe a complete audio buffer in one synchronous API call.

        Args:
            audio_data: Raw 16-bit little-endian mono PCM bytes.
            config: STT settings; sample_rate and language are honored.

        Returns:
            The top transcription alternative, or None when the audio is
            empty/silent, recognition yields no result, or an error occurs.
        """
        try:
            if not audio_data:
                log_warning("⚠️ No audio data provided")
                return None

            log_info(f"📊 Transcribing {len(audio_data)} bytes of audio")

            samples = self._pcm_to_samples(audio_data)
            if not samples:
                # Fewer than 2 bytes: not even one 16-bit sample.
                log_warning("⚠️ Audio too short to contain any samples")
                return None

            max_amplitude, zero_ratio = self._log_audio_stats(samples, config.sample_rate)

            # Skip obviously empty audio before paying for an API round trip.
            if max_amplitude < self._SILENCE_PEAK_THRESHOLD:
                log_warning(f"⚠️ Audio appears silent: max_amplitude={max_amplitude}")
                return None
            if zero_ratio > self._MAX_ZERO_RATIO:
                log_warning(f"⚠️ Audio is mostly zeros: {zero_ratio:.1%}")
                return None

            # Wrap raw PCM in a WAV container at the configured sample rate.
            wav_audio = self._convert_to_wav(audio_data, config.sample_rate)

            # BUG FIX: use the configured language and sample rate instead of
            # hardcoded "tr-TR"/16000 — the hardcoded rate disagreed with the
            # WAV header whenever config.sample_rate != 16000. The getattr
            # default preserves the previous behavior if config lacks a
            # language field.
            language_code = self._map_language_code(getattr(config, "language", "tr-TR"))
            recognition_config = RecognitionConfig(
                encoding=RecognitionConfig.AudioEncoding.LINEAR16,
                sample_rate_hertz=config.sample_rate,
                language_code=language_code,
                audio_channel_count=1,
                enable_separate_recognition_per_channel=False,
                enable_automatic_punctuation=True,
            )
            audio = RecognitionAudio(content=wav_audio)

            log_info(f"🔄 Sending audio to Google Cloud Speech API...")
            response = self.client.recognize(config=recognition_config, audio=audio)

            # Detailed response diagnostics.
            log_info(f"🔍 Google response details:")
            log_info(f"  - Has results: {bool(response.results)}")
            log_info(f"  - Results count: {len(response.results) if response.results else 0}")
            if hasattr(response, 'total_billed_time'):
                if response.total_billed_time and response.total_billed_time.total_seconds() > 0:
                    log_info(f"  - Billed time: {response.total_billed_time.total_seconds()}s")
                else:
                    log_info(f"  - Billed time: 0s (no audio processed)")

            if response.results and len(response.results) > 0:
                for i, result in enumerate(response.results):
                    log_info(f"  - Result {i}: {len(result.alternatives)} alternatives")
                    for j, alt in enumerate(result.alternatives):
                        log_info(f"    - Alt {j}: '{alt.transcript}' (conf: {alt.confidence:.3f})")

                first_result = response.results[0]
                if first_result.alternatives:
                    best = first_result.alternatives[0]
                    transcription = TranscriptionResult(
                        text=best.transcript,
                        confidence=best.confidence,
                        timestamp=datetime.now().timestamp(),
                        language=language_code,
                        word_timestamps=None
                    )
                    log_info(f"✅ Transcription SUCCESS: '{best.transcript}' (confidence: {best.confidence:.2f})")
                    return transcription

            log_warning("⚠️ No transcription results - Google couldn't recognize speech")
            return None

        except Exception as e:
            log_error(f"❌ Error during transcription: {str(e)}")
            import traceback
            log_error(f"Traceback: {traceback.format_exc()}")
            return None

    def _convert_to_wav(self, audio_data: bytes, sample_rate: int) -> bytes:
        """Wrap raw 16-bit mono PCM bytes in an in-memory WAV container."""
        wav_buffer = io.BytesIO()
        with wave.open(wav_buffer, 'wb') as wav_file:
            wav_file.setnchannels(1)   # mono
            wav_file.setsampwidth(2)   # 16-bit samples
            wav_file.setframerate(sample_rate)
            wav_file.writeframes(audio_data)
        wav_buffer.seek(0)
        return wav_buffer.read()

    def get_supported_languages(self) -> List[str]:
        """Return supported BCP-47 language codes (partial list)."""
        return [
            "tr-TR", "en-US", "en-GB", "en-AU", "en-CA", "en-IN",
            "es-ES", "es-MX", "es-AR", "fr-FR", "fr-CA", "de-DE",
            "it-IT", "pt-BR", "pt-PT", "ru-RU", "ja-JP", "ko-KR",
            "zh-CN", "zh-TW", "ar-SA", "ar-EG", "hi-IN", "nl-NL",
            "pl-PL", "sv-SE", "da-DK", "no-NO", "fi-FI", "el-GR",
            "he-IL", "th-TH", "vi-VN", "id-ID", "ms-MY", "fil-PH"
        ]

    def get_provider_name(self) -> str:
        """Return this provider's identifier."""
        return "google"