Spaces:
Paused
Paused
""" | |
Google Cloud Speech-to-Text Implementation | |
""" | |
import os | |
import asyncio | |
from typing import AsyncIterator, AsyncGenerator, Optional, List, Any | |
import numpy as np # Audio level check için | |
from datetime import datetime | |
import sys | |
import queue | |
import threading | |
import time | |
import traceback | |
from utils.logger import log_info, log_error, log_debug, log_warning | |
# Import Google Cloud Speech only if available | |
try: | |
from google.cloud import speech | |
from google.api_core import exceptions | |
GOOGLE_SPEECH_AVAILABLE = True | |
except ImportError: | |
GOOGLE_SPEECH_AVAILABLE = False | |
log_info("⚠️ Google Cloud Speech library not installed") | |
from .stt_interface import STTInterface, STTConfig, TranscriptionResult | |
class GoogleCloudSTT(STTInterface): | |
"""Google Cloud Speech-to-Text implementation""" | |
def __init__(self, credentials_path: str):
    """Export Google credentials and initialise per-session streaming state.

    Args:
        credentials_path: Path to a Google service-account JSON file. It is
            written to the process-wide ``GOOGLE_APPLICATION_CREDENTIALS``
            environment variable.

    Raises:
        ImportError: google-cloud-speech is not installed.
        FileNotFoundError: ``credentials_path`` is empty or does not exist.
        Exception: whatever ``speech.SpeechClient()`` raises when a client
            cannot be built from these credentials (re-raised unchanged).
    """
    if not GOOGLE_SPEECH_AVAILABLE:
        raise ImportError("google-cloud-speech library not installed. Run: pip install google-cloud-speech")

    if not (credentials_path and os.path.exists(credentials_path)):
        log_error(f"❌ Google credentials path not found: {credentials_path}")
        raise FileNotFoundError(f"Credentials file not found: {credentials_path}")

    os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = credentials_path
    log_info(f"✅ Google credentials set from: {credentials_path}")

    # Smoke-test the credentials by building a throwaway client.
    # NOTE(review): constructing a client presumably makes no API call, so this
    # likely only proves the credentials can be loaded, not that they are
    # authorised — confirm.
    log_info("🔐 Testing Google credentials...")
    try:
        test_client = speech.SpeechClient()
    except Exception as e:
        log_error("❌ Google credentials error", error=str(e))
        raise
    log_info("✅ Google credentials valid")

    # Release the test client's transport. A failure here is not a credentials
    # problem, so it is logged instead of aborting construction.
    try:
        transport = getattr(test_client, 'transport', None)
        if transport is not None and hasattr(transport, 'close'):
            transport.close()
    except Exception as e:
        log_warning(f"⚠️ Error closing Google test client: {e}")

    # The streaming client is not created here; start_streaming() builds one per session.
    self.client = None
    self.streaming_config = None
    self.is_streaming = False
    self.audio_queue = None  # created per session by _create_fresh_queues()
    self.responses_queue = None
    self.stream_thread = None
    self.stop_event = threading.Event()
    self.credentials_path = credentials_path

    # Session tracking
    self.session_id = 0
    self.total_audio_bytes = 0
    self.total_chunks = 0

    # State shared with the stream thread: `lock` is available for guarding it,
    # `error_message` holds the last stream error (None until one occurs).
    self.lock = threading.Lock()
    self.error_message = None
def _get_encoding(self, encoding_str: str):
    """Translate an encoding name (e.g. "LINEAR16") into Google's AudioEncoding enum.

    Names outside the supported set fall back to WEBM_OPUS. Returns None
    when google-cloud-speech could not be imported.
    """
    if GOOGLE_SPEECH_AVAILABLE:
        audio_encoding = speech.RecognitionConfig.AudioEncoding
        known = {
            name: getattr(audio_encoding, name)
            for name in ("WEBM_OPUS", "LINEAR16", "FLAC", "MP3", "OGG_OPUS")
        }
        return known.get(encoding_str, audio_encoding.WEBM_OPUS)
    return None
async def stream_audio(self, audio_chunk: bytes) -> AsyncGenerator[TranscriptionResult, None]:
    """Queue one audio chunk for Google and yield any transcripts that are ready.

    After enqueueing the chunk, the responses queue is polled for roughly
    100 ms and every result that shows up in that window is yielded.

    Args:
        audio_chunk: Audio bytes from the client. On the first chunk of a
            session the WEBM/EBML magic header is checked and logged.

    Yields:
        TranscriptionResult objects produced by the stream thread.

    Raises:
        Exception: if no streaming session is active, or re-raised from any
            unexpected failure while handling the chunk.
    """
    if not self.is_streaming:
        raise Exception("Streaming not started")

    try:
        chunk_size = len(audio_chunk)

        # total_chunks / total_audio_bytes are initialised in __init__ and reset
        # by _reset_session_data() at the start of every session.
        if self.total_chunks == 0:
            log_info(f"📤 First chunk - size: {chunk_size} bytes")
            if audio_chunk.startswith(b'\x1a\x45\xdf\xa3'):  # EBML/WEBM magic bytes
                log_info("✅ Valid WEBM header detected")
            else:
                hex_preview = audio_chunk[:20].hex()
                log_warning(f"⚠️ Unexpected audio format. First 20 bytes: {hex_preview}")

        if self.total_chunks % 50 == 0:
            # Rough level estimate that treats the tail as 16-bit PCM; the value
            # is meaningless for compressed formats such as WEBM. Only computed
            # when it is actually logged.
            tail = audio_chunk[-1000:]
            tail = tail[:len(tail) - len(tail) % 2]  # int16 needs an even byte count
            if tail:
                samples = np.frombuffer(tail, dtype=np.int16).astype(np.float64)
                rms = float(np.sqrt(np.mean(samples ** 2)))
                db = 20 * np.log10(max(rms, 1) / 32768.0)
                log_info(f"🔊 Audio level estimate: {db:.1f} dB")

        try:
            self.audio_queue.put_nowait(audio_chunk)
        except queue.Full:
            # A blocking put() here would stall the event loop until the stream
            # thread drains the queue; drop the chunk instead.
            log_warning(f"⚠️ Audio queue full, dropping {chunk_size} byte chunk")
        else:
            self.total_chunks += 1
            self.total_audio_bytes += chunk_size
            if self.total_chunks % 50 == 0:
                log_info(f"📤 Progress: {self.total_chunks} chunks, {self.total_audio_bytes/1024:.1f}KB total")

        # Poll for results for up to 100 ms.
        deadline = time.monotonic() + 0.1
        while time.monotonic() < deadline:
            try:
                result = self.responses_queue.get_nowait()
            except queue.Empty:
                await asyncio.sleep(0.01)
                continue
            except Exception as e:
                log_error(f"Error getting result from queue: {e}")
                break
            log_info(f"🎯 Got result from queue: is_final={result.is_final}, text='{result.text[:30]}...'")
            yield result

    except Exception as e:
        log_error(f"❌ Error in stream_audio: {e}")
        raise
async def stop_streaming(self) -> Optional[TranscriptionResult]:
    """Stop the active session, release its resources and return the last final result.

    Safe to call repeatedly: returns None right away when nothing is running.

    Returns:
        The last ``is_final`` result still waiting in the responses queue, or
        None when there is none or cleanup failed.
    """
    if not self.is_streaming and not self.stream_thread:
        log_debug("Already stopped, nothing to do")
        return None

    try:
        log_info(f"🛑 Stopping Google STT streaming session #{self.session_id}")

        # Flip the flag first so stream_audio() rejects new chunks immediately.
        self.is_streaming = False
        self.stop_event.set()

        # Send poison pill to stop the request generator. Non-blocking: a
        # blocking put on a full queue would stall the event loop.
        if self.audio_queue:
            try:
                self.audio_queue.put_nowait(None)
            except queue.Full:
                log_warning("⚠️ Audio queue full, could not deliver stop signal")

        # Capture the thread: self.stream_thread may change while we await below.
        thread = self.stream_thread
        if thread and thread.is_alive():
            log_info("⏳ Waiting for stream thread to finish...")
            # Thread.join() blocks, so run it in the default executor to keep
            # the event loop responsive while waiting.
            await asyncio.get_running_loop().run_in_executor(None, thread.join, 5.0)
            if thread.is_alive():
                log_warning("⚠️ STT thread did not stop gracefully after 5s")
            else:
                log_info("✅ Stream thread finished")

        # Keep the last final result that is still queued.
        final_result = None
        responses = self.responses_queue
        if responses:
            while True:
                try:
                    result = responses.get_nowait()
                except queue.Empty:
                    break
                if result.is_final:
                    final_result = result

        # Close the client.
        client = self.client
        if client:
            try:
                if hasattr(client, 'transport') and hasattr(client.transport, 'close'):
                    client.transport.close()
                    log_debug("✅ Client transport closed")
                if hasattr(client, '_transport') and hasattr(client._transport, '_grpc_channel'):
                    client._transport._grpc_channel.close()
                    log_debug("✅ gRPC channel closed")
            except Exception as e:
                log_warning(f"⚠️ Error closing Google client: {e}")
            finally:
                self.client = None

        # Drop per-session state.
        self.audio_queue = None
        self.responses_queue = None
        self.stream_thread = None
        self.streaming_config = None
        self.stop_event.clear()

        log_info(f"✅ Google STT streaming session #{self.session_id} stopped and cleaned")
        return final_result

    except Exception as e:
        log_error("❌ Error during stop_streaming", error=str(e))
        # Force cleanup on error
        self.is_streaming = False
        self.stream_thread = None
        self.client = None
        self.streaming_config = None
        self.stop_event.clear()
        self.audio_queue = None
        self.responses_queue = None
        return None
def supports_realtime(self) -> bool:
    """Report that this provider can transcribe audio while it is still streaming in."""
    realtime_capable = True
    return realtime_capable
def get_supported_languages(self) -> List[str]:
    """Return a new list of the language codes (e.g. "tr-TR") offered by this provider."""
    codes = (
        "tr-TR",  # Turkish
        "en-US",  # English (US)
        "en-GB",  # English (UK)
        "de-DE",  # German
        "fr-FR",  # French
        "es-ES",  # Spanish
        "it-IT",  # Italian
        "pt-BR",  # Portuguese (Brazil)
        "ru-RU",  # Russian
        "ja-JP",  # Japanese
        "ko-KR",  # Korean
        "zh-CN",  # Chinese (Simplified)
        "ar-SA",  # Arabic
    )
    return list(codes)
def get_provider_name(self) -> str:
    """Return this provider's short name."""
    provider_id = "google"
    return provider_id
def _reset_session_data(self):
    """Reset all session-specific data before a new session starts.

    Drains any leftover items from the audio and responses queues (if they
    exist), zeroes the per-session byte/chunk counters and increments
    ``session_id``.
    """
    for pending in (self.audio_queue, self.responses_queue):
        if not pending:
            continue
        # Drain until empty; only queue.Empty ends the loop, so unrelated
        # errors (and KeyboardInterrupt) are no longer swallowed.
        while True:
            try:
                pending.get_nowait()
            except queue.Empty:
                break

    # Reset counters
    self.total_audio_bytes = 0
    self.total_chunks = 0

    # New session ID
    self.session_id += 1
    log_info(f"🔄 Google STT session data reset. New session ID: {self.session_id}")
def _create_fresh_queues(self):
    """Replace the audio and responses queues with new, empty bounded queues.

    The old queues are drained first. A stale worker that is still blocked
    putting into one of the old (full) queues may be released by this.
    """
    for stale in (self.audio_queue, self.responses_queue):
        if not stale:
            continue
        # Only queue.Empty ends the drain; other errors propagate.
        while True:
            try:
                stale.get_nowait()
            except queue.Empty:
                break

    # Bounded so a stalled consumer cannot grow memory without limit.
    self.audio_queue = queue.Queue(maxsize=1000)
    self.responses_queue = queue.Queue(maxsize=100)
    log_debug("✅ Created fresh queues")
async def start_streaming(self, config: dict) -> None:
    """Start a fresh streaming session, stopping any previous one first.

    Args:
        config: Either a dict with optional keys ``language``, ``sample_rate``,
            ``encoding``, ``enable_punctuation``, ``interim_results`` and
            ``single_utterance`` (missing keys use the defaults below), or an
            ``STTConfig`` instance.

    Raises:
        Exception: any error while building the client/config or starting the
            worker thread is logged, partial state is rolled back, and the
            error is re-raised.
    """
    try:
        # Tear down any previous session first.
        if self.is_streaming or self.stream_thread:
            log_warning("⚠️ Previous stream still active, stopping it first")
            await self.stop_streaming()
            # Give the old session a moment to finish cleaning up.
            await asyncio.sleep(0.5)

        # Reset session data and bump the session ID.
        self._reset_session_data()
        log_info(f"🎤 Starting Google STT streaming session #{self.session_id} with config: {config}")

        self._create_fresh_queues()
        self.stop_event.clear()
        self.error_message = None  # do not carry a previous session's error over

        # One client per session; stop_streaming() closes it.
        self.client = speech.SpeechClient()
        log_info("✅ Created new Google Speech client")

        if isinstance(config, dict):
            stt_config = STTConfig(
                language=config.get("language", "tr-TR"),
                sample_rate=config.get("sample_rate", 16000),
                encoding=config.get("encoding", "WEBM_OPUS"),
                enable_punctuation=config.get("enable_punctuation", True),
                interim_results=config.get("interim_results", False),
                single_utterance=config.get("single_utterance", True)
            )
        else:
            stt_config = config

        recognition_config = speech.RecognitionConfig(
            encoding=self._get_encoding(stt_config.encoding),
            sample_rate_hertz=stt_config.sample_rate,
            language_code=stt_config.language,
            enable_automatic_punctuation=stt_config.enable_punctuation,
            model="latest_long",
            use_enhanced=True,
        )

        self.streaming_config = speech.StreamingRecognitionConfig(
            config=recognition_config,
            interim_results=stt_config.interim_results,
            single_utterance=stt_config.single_utterance
        )

        # Session metadata, exposed on the instance for the stream thread's diagnostics.
        self.single_utterance = stt_config.single_utterance
        self.stream_start_time = time.time()

        self.is_streaming = True

        # Daemon worker thread with a per-session name.
        self.stream_thread = threading.Thread(
            target=self._run_stream,
            name=f"GoogleSTT-Session-{self.session_id}",
            daemon=True,
        )
        self.stream_thread.start()

        log_info(f"✅ Google STT streaming session #{self.session_id} started successfully")

    except Exception as e:
        log_error("❌ Failed to start Google STT streaming", error=str(e))
        self.is_streaming = False

        # Close the client we may have just created instead of leaking its channel.
        client = self.client
        self.client = None
        if client is not None:
            try:
                transport = getattr(client, 'transport', None)
                if transport is not None and hasattr(transport, 'close'):
                    transport.close()
            except Exception as close_err:
                log_warning(f"⚠️ Error closing Google client: {close_err}")

        # Forget a worker thread that never got running so the next start is clean.
        if self.stream_thread is not None and not self.stream_thread.is_alive():
            self.stream_thread = None

        self._create_fresh_queues()  # leave clean queues behind on failure too
        raise
def _put_result(self, result: TranscriptionResult):
    """Push *result* onto the responses queue; failures are logged, never raised."""
    try:
        sink = self.responses_queue
        sink.put(result)
    except Exception as exc:
        log_error(f"❌ Error queuing result: {exc}")
def _run_stream(self):
    """Drive Google's blocking ``streaming_recognize`` call on the worker thread.

    Audio requests come from ``self._request_generator()`` (defined elsewhere
    in this class). Every response alternative is converted to a
    ``TranscriptionResult`` and pushed onto this session's responses queue.
    Exceptions are logged and stored in ``self.error_message``. On exit the
    session is marked as no longer streaming, unless a newer session has
    already replaced this thread.
    """
    # Capture this session's objects once. stop_streaming() sets these
    # attributes to None (and start_streaming() may replace them) while this
    # thread can still be running.
    client = self.client
    streaming_config = self.streaming_config
    responses_queue = self.responses_queue
    this_thread = threading.current_thread()

    try:
        single_utterance = streaming_config.single_utterance if streaming_config is not None else None
        log_info("🎤 Google STT stream thread started - Single utterance mode: {}".format(single_utterance))

        requests = self._request_generator()

        log_info("🎤 Creating Google STT streaming client...")
        timeout = 300  # seconds; upper bound for the whole streaming call
        responses = client.streaming_recognize(
            streaming_config,
            requests,
            timeout=timeout
        )

        stream_start = time.time()
        response_count = 0

        for response in responses:
            response_count += 1
            if response_count == 1:
                elapsed = time.time() - stream_start
                log_info(f"✅ Got first response from Google after {elapsed:.2f}s")

            # Log every response, even empty ones.
            log_info(f"📨 Google STT Response #{response_count}: has_results={len(response.results) > 0}")
            if not response.results:
                log_info("📭 Empty response from Google STT (no results)")
                continue

            for result_idx, result in enumerate(response.results):
                log_info(f"📝 Result #{result_idx}: is_final={result.is_final}, "
                         f"alternatives={len(result.alternatives)}, "
                         f"stability={getattr(result, 'stability', 'N/A')}")
                if not result.alternatives:
                    continue

                best_alternative = result.alternatives[0]
                log_info(f"🗣️ Transcript: '{best_alternative.transcript}' "
                         f"(confidence: {best_alternative.confidence:.3f})")

                result_obj = TranscriptionResult(
                    text=best_alternative.transcript,
                    is_final=result.is_final,
                    confidence=best_alternative.confidence,
                    timestamp=datetime.utcnow()
                )

                # Bounded put: wait while the session is alive, but never block
                # shutdown on a consumer that has stopped reading.
                while True:
                    try:
                        responses_queue.put(result_obj, timeout=0.5)
                    except queue.Full:
                        if self.stop_event.is_set():
                            log_warning("⚠️ Responses queue full during shutdown, dropping result")
                            break
                        continue
                    log_info(f"✅ Result queued: is_final={result.is_final}, text='{best_alternative.transcript[:50]}...'")
                    break

        if response_count == 0:
            log_error("❌ Google STT stream ended without ANY responses!")
        else:
            log_info(f"✅ Google STT stream ended normally after {response_count} responses")

    except Exception as e:
        log_error(f"❌ Google STT error: {e}")
        details = getattr(e, 'details', None)
        if details is not None:
            # grpc.RpcError exposes details() as a method; api_core errors as an attribute.
            try:
                details = details() if callable(details) else details
            except Exception:
                pass
            log_error(f"Error details: {details}")
        self.error_message = str(e)
    finally:
        log_info("🎤 Google STT stream thread ended")
        # Only the thread that owns the current session may clear the flag; a
        # stale thread from a stopped session must not switch a newer one off.
        if self.stream_thread is this_thread:
            self.is_streaming = False