Spaces:
Running
Running
"""
Google Cloud Speech-to-Text Implementation - Simple Batch Mode
"""
import asyncio
import functools
import io
import struct
import traceback
import wave
from datetime import datetime
from typing import List, Optional

from google.cloud import speech
from google.cloud.speech import RecognitionAudio, RecognitionConfig

from utils.logger import log_debug, log_error, log_info, log_warning

from .stt_interface import STTConfig, STTInterface, TranscriptionResult
class GoogleSTT(STTInterface):
    """Speech-to-text provider backed by Google Cloud Speech (batch mode).

    Raw 16-bit mono PCM is wrapped in a WAV container and sent to the
    synchronous ``recognize`` endpoint; the best alternative of every
    returned segment is merged into a single ``TranscriptionResult``.
    """

    # Short and full language codes mapped to the BCP-47 tags Google expects.
    _LANGUAGE_MAP = {
        "tr": "tr-TR",
        "tr-TR": "tr-TR",
        "en": "en-US",
        "en-US": "en-US",
        "en-GB": "en-GB",
        "de": "de-DE",
        "de-DE": "de-DE",
        "fr": "fr-FR",
        "fr-FR": "fr-FR",
        "es": "es-ES",
        "es-ES": "es-ES",
        "it": "it-IT",
        "it-IT": "it-IT",
        "pt": "pt-BR",
        "pt-BR": "pt-BR",
        "ru": "ru-RU",
        "ru-RU": "ru-RU",
        "ja": "ja-JP",
        "ja-JP": "ja-JP",
        "ko": "ko-KR",
        "ko-KR": "ko-KR",
        "zh": "zh-CN",
        "zh-CN": "zh-CN",
        "ar": "ar-SA",
        "ar-SA": "ar-SA",
    }

    # Fallbacks used when the STTConfig leaves a value unset. They equal the
    # values that were previously hard-coded into the recognition request.
    _DEFAULT_LANGUAGE = "tr-TR"
    _DEFAULT_SAMPLE_RATE = 16000

    def __init__(self, credentials_path: Optional[str] = None):
        """
        Initialize Google STT

        Args:
            credentials_path: Path to service account JSON file (optional if
                using default credentials)

        Raises:
            Exception: Whatever the Google client raises while being created;
                the error is logged and re-raised unchanged.
        """
        try:
            if credentials_path:
                self.client = speech.SpeechClient.from_service_account_file(credentials_path)
                log_info(f"✅ Google STT initialized with service account: {credentials_path}")
            else:
                # Use default credentials (ADC)
                self.client = speech.SpeechClient()
                log_info("✅ Google STT initialized with default credentials")
        except Exception as e:
            log_error(f"❌ Failed to initialize Google STT: {str(e)}")
            raise

    def _map_language_code(self, language: str) -> str:
        """Map a language code to Google's BCP-47 format.

        Codes missing from the table are returned unchanged.
        """
        return self._LANGUAGE_MAP.get(language, language)

    async def transcribe(self, audio_data: bytes, config: STTConfig) -> Optional[TranscriptionResult]:
        """Transcribe audio data using Google Cloud Speech API.

        Args:
            audio_data: Raw PCM audio; it is written out as 16-bit mono WAV.
            config: Supplies ``sample_rate``, ``language`` and
                ``enable_word_timestamps`` for the request.

        Returns:
            The merged transcription, or ``None`` when there is no audio, the
            API returns no alternatives, or any error occurs (errors are
            logged, never raised).
        """
        try:
            if not audio_data:
                log_warning("⚠️ No audio data provided")
                return None

            log_info(f"📊 Transcribing {len(audio_data)} bytes of audio")
            self._log_audio_stats(audio_data)

            # Use ONE sample rate for both the WAV header and the request so
            # the two can never disagree.
            sample_rate = config.sample_rate or self._DEFAULT_SAMPLE_RATE
            if config.language:
                language_code = self._map_language_code(config.language)
            else:
                language_code = self._DEFAULT_LANGUAGE

            # Convert to WAV format for better compatibility
            wav_audio = self._convert_to_wav(audio_data, sample_rate)

            # NOTE(review): model / use_enhanced / automatic punctuation were
            # not part of the live request before and are still not forwarded
            # here - confirm whether they should be.
            recognition_config = RecognitionConfig(
                encoding=RecognitionConfig.AudioEncoding.LINEAR16,
                sample_rate_hertz=sample_rate,
                language_code=language_code,
                audio_channel_count=1,  # Frontend sends mono audio
                enable_separate_recognition_per_channel=False,
                # Without this the API returns no word list, so the
                # word-timestamp extraction below would always be empty.
                enable_word_time_offsets=bool(config.enable_word_timestamps),
            )
            log_debug(
                f"Recognition config: language={language_code}, "
                f"sample_rate={sample_rate}, "
                f"word_timestamps={bool(config.enable_word_timestamps)}"
            )

            audio = RecognitionAudio(content=wav_audio)

            log_info("🔄 Sending audio to Google Cloud Speech API...")
            # recognize() is a blocking network call; run it in the default
            # executor so the event loop keeps serving other coroutines.
            loop = asyncio.get_running_loop()
            response = await loop.run_in_executor(
                None,
                functools.partial(
                    self.client.recognize,
                    config=recognition_config,
                    audio=audio,
                ),
            )
            log_debug(f"API Response: {response}")

            transcription = self._build_result(response, config, language_code)
            if transcription is None:
                log_warning("⚠️ No transcription results")
            return transcription

        except Exception as e:
            log_error(f"❌ Error during transcription: {str(e)}")
            log_error(f"Traceback: {traceback.format_exc()}")
            return None

    @staticmethod
    def _log_audio_stats(audio_data: bytes) -> None:
        """Emit debug-level diagnostics (hex edges, amplitudes) for the audio."""
        if len(audio_data) <= 100:
            return

        log_debug(f"Audio first 50 bytes: {audio_data[:50].hex()}")
        log_debug(f"Audio last 50 bytes: {audio_data[-50:].hex()}")

        # Ignore a trailing odd byte so a truncated buffer cannot raise
        # struct.error and abort the whole transcription.
        sample_count = len(audio_data) // 2
        samples = struct.unpack(f"<{sample_count}h", audio_data[: sample_count * 2])
        avg_amplitude = sum(abs(s) for s in samples) / sample_count
        max_amplitude = max(abs(s) for s in samples)
        log_debug(f"Audio stats: avg_amplitude={avg_amplitude:.1f}, max_amplitude={max_amplitude}")

    def _build_result(self, response, config: STTConfig, language_code: str) -> Optional[TranscriptionResult]:
        """Merge the top alternative of every result into one transcription.

        Returns ``None`` when the response carries no alternatives at all.
        """
        # Each entry of response.results is a separate segment; reading only
        # the first one would drop the rest of the transcript.
        best_alternatives = [
            result.alternatives[0]
            for result in response.results
            if result.alternatives
        ]
        if not best_alternatives:
            return None

        segments = [alt.transcript.strip() for alt in best_alternatives]
        text = " ".join(segment for segment in segments if segment)
        # Mean over segments; identical to the single value when there is one.
        confidence = sum(alt.confidence for alt in best_alternatives) / len(best_alternatives)

        word_timestamps = None
        if config.enable_word_timestamps:
            word_timestamps = []
            for alt in best_alternatives:
                for word_info in getattr(alt, "words", ()):
                    word_timestamps.append(
                        {
                            "word": word_info.word,
                            "start_time": word_info.start_time.total_seconds(),
                            "end_time": word_info.end_time.total_seconds(),
                        }
                    )

        log_info(f"✅ Transcription: '{text}' (confidence: {confidence:.2f})")
        return TranscriptionResult(
            text=text,
            confidence=confidence,
            timestamp=datetime.now().timestamp(),
            language=language_code,
            word_timestamps=word_timestamps,
        )

    def _convert_to_wav(self, audio_data: bytes, sample_rate: int) -> bytes:
        """Wrap raw PCM audio in an in-memory WAV container (mono, 16-bit)."""
        wav_buffer = io.BytesIO()
        with wave.open(wav_buffer, 'wb') as wav_file:
            wav_file.setnchannels(1)  # Mono
            wav_file.setsampwidth(2)  # 16-bit
            wav_file.setframerate(sample_rate)
            wav_file.writeframes(audio_data)
        # The header is finalized when the wave writer closes; the underlying
        # buffer stays open, so its full contents can be read back here.
        return wav_buffer.getvalue()

    def get_supported_languages(self) -> List[str]:
        """Get list of supported language codes"""
        # Google Cloud Speech-to-Text supported languages (partial list)
        return [
            "tr-TR", "en-US", "en-GB", "en-AU", "en-CA", "en-IN",
            "es-ES", "es-MX", "es-AR", "fr-FR", "fr-CA", "de-DE",
            "it-IT", "pt-BR", "pt-PT", "ru-RU", "ja-JP", "ko-KR",
            "zh-CN", "zh-TW", "ar-SA", "ar-EG", "hi-IN", "nl-NL",
            "pl-PL", "sv-SE", "da-DK", "no-NO", "fi-FI", "el-GR",
            "he-IL", "th-TH", "vi-VN", "id-ID", "ms-MY", "fil-PH"
        ]

    def get_provider_name(self) -> str:
        """Get provider name"""
        return "google"