flare / stt /stt_google.py
ciyidogan's picture
Update stt/stt_google.py
9070a82 verified
raw
history blame
19.2 kB
"""
Google Cloud Speech-to-Text Implementation
"""
import os
import asyncio
from typing import AsyncIterator, AsyncGenerator, Optional, List, Any
import numpy as np # Audio level check için
from datetime import datetime
import sys
import queue
import threading
import time
import traceback
from utils.logger import log_info, log_error, log_debug, log_warning
# Import Google Cloud Speech only if available
try:
from google.cloud import speech
from google.api_core import exceptions
GOOGLE_SPEECH_AVAILABLE = True
except ImportError:
GOOGLE_SPEECH_AVAILABLE = False
log_info("⚠️ Google Cloud Speech library not installed")
from .stt_interface import STTInterface, STTConfig, TranscriptionResult
class GoogleCloudSTT(STTInterface):
    """Google Cloud Speech-to-Text streaming implementation.

    Audio chunks passed to :meth:`stream_audio` are put on a bounded queue.
    A background thread (:meth:`_run_stream`) turns them into
    ``StreamingRecognizeRequest`` objects for ``SpeechClient.streaming_recognize``.
    Recognition results travel back to the caller through a second queue.
    Every session gets a fresh client, fresh queues and a fresh stop event.
    """

    def __init__(self, credentials_path: str):
        """Export the credentials, verify that a client can be built, and prepare an idle state.

        Args:
            credentials_path: Path to a Google service-account JSON file. It is
                exported through the ``GOOGLE_APPLICATION_CREDENTIALS`` env var.

        Raises:
            ImportError: google-cloud-speech is not installed.
            FileNotFoundError: ``credentials_path`` is empty or does not exist.
            Exception: Creating the throwaway test client failed (re-raised).
        """
        if not GOOGLE_SPEECH_AVAILABLE:
            raise ImportError("google-cloud-speech library not installed. Run: pip install google-cloud-speech")

        if not (credentials_path and os.path.exists(credentials_path)):
            log_error(f"❌ Google credentials path not found: {credentials_path}")
            raise FileNotFoundError(f"Credentials file not found: {credentials_path}")

        os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = credentials_path
        log_info(f"✅ Google credentials set from: {credentials_path}")

        # Build a throwaway client so that broken credentials fail here.
        # The real client is created per session in start_streaming().
        try:
            test_client = speech.SpeechClient()
            log_info("🔐 Testing Google credentials...")
            log_info("✅ Google credentials valid")
            if hasattr(test_client, 'transport') and hasattr(test_client.transport, 'close'):
                test_client.transport.close()
        except Exception as e:
            log_error("❌ Google credentials error", error=str(e))
            raise

        # Per-session resources; they are (re)created in start_streaming().
        self.client = None
        self.streaming_config = None
        self.is_streaming = False
        self.audio_queue = None
        self.responses_queue = None
        self.stream_thread = None
        self.stop_event = threading.Event()
        self.credentials_path = credentials_path
        # Serialises the worker thread's final update of is_streaming.
        self.lock = threading.Lock()
        # Text of the last exception raised inside the worker thread, if any.
        self.error_message = None

        # Session tracking
        self.session_id = 0
        self.total_audio_bytes = 0
        self.total_chunks = 0

    def _get_encoding(self, encoding_str: str):
        """Map an encoding name (e.g. ``"LINEAR16"``) to the Google enum value.

        Unknown names fall back to ``WEBM_OPUS``. Returns ``None`` when the
        Google library is unavailable.
        """
        if not GOOGLE_SPEECH_AVAILABLE:
            return None

        encoding_map = {
            "WEBM_OPUS": speech.RecognitionConfig.AudioEncoding.WEBM_OPUS,
            "LINEAR16": speech.RecognitionConfig.AudioEncoding.LINEAR16,
            "FLAC": speech.RecognitionConfig.AudioEncoding.FLAC,
            "MP3": speech.RecognitionConfig.AudioEncoding.MP3,
            "OGG_OPUS": speech.RecognitionConfig.AudioEncoding.OGG_OPUS,
        }
        return encoding_map.get(encoding_str, speech.RecognitionConfig.AudioEncoding.WEBM_OPUS)

    @staticmethod
    def _log_audio_level(audio_chunk: bytes) -> None:
        """Log a rough dB level for the last 1000 bytes of the chunk, read as 16-bit PCM.

        The estimate is only meaningful for raw LINEAR16 audio. For compressed
        formats such as WEBM, the number is not a real level.
        """
        tail = audio_chunk[-1000:]
        # np.frombuffer(int16) rejects odd byte counts, so drop a trailing odd byte.
        tail = tail[:len(tail) - (len(tail) % 2)]
        if not tail:
            return
        samples = np.frombuffer(tail, dtype=np.int16)
        rms = np.sqrt(np.mean(samples.astype(float) ** 2))
        db = 20 * np.log10(max(rms, 1) / 32768.0)
        log_info(f"🔊 Audio level estimate: {db:.1f} dB")

    async def stream_audio(self, audio_chunk: bytes) -> AsyncGenerator[TranscriptionResult, None]:
        """Queue one audio chunk and yield any results that arrive shortly afterwards.

        After queuing the chunk, this polls the responses queue for about 100 ms
        and yields every result found there.

        Args:
            audio_chunk: Audio bytes in the session's configured encoding.

        Yields:
            TranscriptionResult objects produced by the worker thread.

        Raises:
            Exception: No session is active. This covers start_streaming() never
                having been called and the worker thread having finished. Other
                errors are logged and re-raised.
        """
        if not self.is_streaming:
            raise Exception("Streaming not started")

        # Keep local references: stop_streaming() may reset these attributes
        # to None while this coroutine is suspended at an await.
        audio_queue = self.audio_queue
        responses_queue = self.responses_queue

        try:
            chunk_size = len(audio_chunk)

            if self.total_chunks == 0:
                log_info(f"📤 First chunk - size: {chunk_size} bytes")
                # EBML magic number at the start of a WEBM/Matroska stream
                if audio_chunk.startswith(b'\x1a\x45\xdf\xa3'):
                    log_info("✅ Valid WEBM header detected")
                else:
                    hex_preview = audio_chunk[:20].hex()
                    log_warning(f"⚠️ Unexpected audio format. First 20 bytes: {hex_preview}")

            if self.total_chunks % 50 == 0:
                self._log_audio_level(audio_chunk)

            # Never block the event loop: if the worker is not keeping up and the
            # bounded queue is full, drop the chunk and say so.
            try:
                audio_queue.put_nowait(audio_chunk)
            except queue.Full:
                log_warning(f"⚠️ Audio queue full, dropping {chunk_size} byte chunk")
            else:
                self.total_chunks += 1
                self.total_audio_bytes += chunk_size
                if self.total_chunks % 50 == 0:
                    log_info(f"📤 Progress: {self.total_chunks} chunks, {self.total_audio_bytes/1024:.1f}KB total")

            deadline = time.time() + 0.1  # 100ms window for picking up responses
            while time.time() < deadline:
                try:
                    result = responses_queue.get_nowait()
                except queue.Empty:
                    await asyncio.sleep(0.01)
                    continue
                except Exception as e:
                    log_error(f"Error getting result from queue: {e}")
                    break
                log_info(f"🎯 Got result from queue: is_final={result.is_final}, text='{result.text[:30]}...'")
                yield result

        except Exception as e:
            log_error(f"❌ Error in stream_audio: {e}")
            raise

    @staticmethod
    def _close_client(client) -> None:
        """Close a SpeechClient's transport and gRPC channel, logging (not raising) failures."""
        if client is None:
            return
        try:
            if hasattr(client, 'transport') and hasattr(client.transport, 'close'):
                client.transport.close()
                log_debug("✅ Client transport closed")
            if hasattr(client, '_transport') and hasattr(client._transport, '_grpc_channel'):
                client._transport._grpc_channel.close()
                log_debug("✅ gRPC channel closed")
        except Exception as e:
            log_warning(f"⚠️ Error closing Google client: {e}")

    async def stop_streaming(self) -> Optional[TranscriptionResult]:
        """Stop the current session and release its thread, client and queues.

        Returns:
            The last ``is_final`` result still waiting in the responses queue.
            Returns ``None`` if there is no such result, if nothing was running,
            or if cleanup failed.
        """
        if not self.is_streaming and not self.stream_thread:
            log_debug("Already stopped, nothing to do")
            return None

        # Snapshot this session's objects. The thread join below awaits, so a
        # new session could start meanwhile and must not be clobbered.
        session_id = self.session_id
        thread = self.stream_thread
        audio_queue = self.audio_queue
        responses_queue = self.responses_queue
        client = self.client

        try:
            log_info(f"🛑 Stopping Google STT streaming session #{session_id}")

            # Drop the flag right away so stream_audio() rejects new chunks.
            self.is_streaming = False
            self.stop_event.set()

            # Poison pill for the request generator. put_nowait avoids blocking
            # on a full queue; the stop event set above also ends the generator.
            if audio_queue is not None:
                try:
                    audio_queue.put_nowait(None)
                except queue.Full:
                    log_debug("Audio queue full; stop_event will end the request generator")

            if thread is not None and thread.is_alive():
                log_info("⏳ Waiting for stream thread to finish...")
                # Join in an executor so the event loop keeps serving others.
                loop = asyncio.get_running_loop()
                await loop.run_in_executor(None, thread.join, 5.0)
                if thread.is_alive():
                    log_warning("⚠️ STT thread did not stop gracefully after 5s")
                else:
                    log_info("✅ Stream thread finished")

            # Keep the last final result that is still queued.
            final_result = None
            if responses_queue is not None:
                for result in self._drain_queue(responses_queue):
                    if result.is_final:
                        final_result = result

            self._close_client(client)

            # Only reset shared state if no new session started during the join.
            if self.session_id == session_id:
                self.client = None
                self.audio_queue = None
                self.responses_queue = None
                self.stream_thread = None
                self.streaming_config = None
                # Replace rather than clear(): a thread that outlived the join
                # keeps seeing its own, already-set event.
                self.stop_event = threading.Event()

            log_info(f"✅ Google STT streaming session #{session_id} stopped and cleaned")
            return final_result

        except Exception as e:
            log_error("❌ Error during stop_streaming", error=str(e))
            # Force cleanup on error
            self.is_streaming = False
            self.stream_thread = None
            self.client = None
            self.streaming_config = None
            self.stop_event = threading.Event()
            self.audio_queue = None
            self.responses_queue = None
            return None

    def supports_realtime(self) -> bool:
        """Google Cloud STT supports real-time streaming"""
        return True

    def get_supported_languages(self) -> List[str]:
        """Return the language codes this provider is offered for."""
        return [
            "tr-TR",  # Turkish
            "en-US",  # English (US)
            "en-GB",  # English (UK)
            "de-DE",  # German
            "fr-FR",  # French
            "es-ES",  # Spanish
            "it-IT",  # Italian
            "pt-BR",  # Portuguese (Brazil)
            "ru-RU",  # Russian
            "ja-JP",  # Japanese
            "ko-KR",  # Korean
            "zh-CN",  # Chinese (Simplified)
            "ar-SA",  # Arabic
        ]

    def get_provider_name(self) -> str:
        """Get provider name"""
        return "google"

    @staticmethod
    def _drain_queue(q) -> List[Any]:
        """Remove and return every item currently in ``q`` without blocking."""
        items = []
        while True:
            try:
                items.append(q.get_nowait())
            except queue.Empty:
                return items

    def _reset_session_data(self):
        """Discard leftover queued data, zero the counters and advance session_id."""
        for q in (self.audio_queue, self.responses_queue):
            if q is not None:
                self._drain_queue(q)

        self.total_audio_bytes = 0
        self.total_chunks = 0

        self.session_id += 1
        log_info(f"🔄 Google STT session data reset. New session ID: {self.session_id}")

    def _create_fresh_queues(self):
        """Empty any existing queues and install new bounded ones."""
        for q in (self.audio_queue, self.responses_queue):
            if q is not None:
                self._drain_queue(q)

        self.audio_queue = queue.Queue(maxsize=1000)
        self.responses_queue = queue.Queue(maxsize=100)
        log_debug("✅ Created fresh queues")

    async def start_streaming(self, config: dict) -> None:
        """Start a new recognition session, stopping any previous one first.

        Args:
            config: Either a dict with optional keys ``language``,
                ``sample_rate``, ``encoding``, ``enable_punctuation``,
                ``interim_results`` and ``single_utterance``, or an STTConfig.

        Raises:
            Exception: Anything raised while building the client, the config or
                the thread. The error is logged and the state is reset first.
        """
        try:
            if self.is_streaming or self.stream_thread:
                log_warning("⚠️ Previous stream still active, stopping it first")
                await self.stop_streaming()
                # Give the old session a moment to finish cleaning up.
                await asyncio.sleep(0.5)

            self._reset_session_data()
            log_info(f"🎤 Starting Google STT streaming session #{self.session_id} with config: {config}")

            self._create_fresh_queues()
            # Fresh event per session, so an old thread keeps its own (set) event.
            self.stop_event = threading.Event()
            self.error_message = None

            # One client per session
            self.client = speech.SpeechClient()
            log_info("✅ Created new Google Speech client")

            if isinstance(config, dict):
                stt_config = STTConfig(
                    language=config.get("language", "tr-TR"),
                    sample_rate=config.get("sample_rate", 16000),
                    encoding=config.get("encoding", "WEBM_OPUS"),
                    enable_punctuation=config.get("enable_punctuation", True),
                    interim_results=config.get("interim_results", False),
                    single_utterance=config.get("single_utterance", True)
                )
            else:
                stt_config = config

            recognition_config = speech.RecognitionConfig(
                encoding=self._get_encoding(stt_config.encoding),
                sample_rate_hertz=stt_config.sample_rate,
                language_code=stt_config.language,
                enable_automatic_punctuation=stt_config.enable_punctuation,
                model="latest_long",
                use_enhanced=True,
            )

            self.streaming_config = speech.StreamingRecognitionConfig(
                config=recognition_config,
                interim_results=stt_config.interim_results,
                single_utterance=stt_config.single_utterance
            )

            self.is_streaming = True

            # The worker gets its session id so it never touches a newer session.
            self.stream_thread = threading.Thread(
                target=self._run_stream,
                args=(self.session_id,),
                name=f"GoogleSTT-Session-{self.session_id}",
                daemon=True,
            )
            self.stream_thread.start()

            log_info(f"✅ Google STT streaming session #{self.session_id} started successfully")

        except Exception as e:
            log_error("❌ Failed to start Google STT streaming", error=str(e))
            self.is_streaming = False
            self._close_client(self.client)
            self.client = None
            self._create_fresh_queues()
            raise

    def _put_result(self, result: TranscriptionResult, target_queue: Optional[queue.Queue] = None) -> bool:
        """Hand a result to the consumer side without blocking indefinitely.

        Args:
            result: Result to queue.
            target_queue: Queue to use; defaults to ``self.responses_queue``.

        Returns:
            True if the result was queued. Failures are logged, not raised:
            no queue, or the queue still full after 1 s.
        """
        q = target_queue if target_queue is not None else self.responses_queue
        if q is None:
            log_warning("⚠️ No responses queue; dropping result")
            return False
        try:
            q.put(result, timeout=1.0)
            return True
        except queue.Full:
            log_error("❌ Error queuing result: responses queue full")
        except Exception as e:
            log_error(f"❌ Error queuing result: {e}")
        return False

    def _request_generator(self, audio_queue: Optional[queue.Queue] = None,
                           stop_event: Optional[threading.Event] = None):
        """Yield StreamingRecognizeRequest objects built from queued audio chunks.

        The generator ends on the ``None`` poison pill put by stop_streaming()
        or once ``stop_event`` is set. The streaming config is not yielded here:
        ``SpeechClient.streaming_recognize(config, requests)`` sends it first.

        Args:
            audio_queue: Queue to read; defaults to ``self.audio_queue``.
            stop_event: Event to watch; defaults to ``self.stop_event``.
        """
        audio_queue = audio_queue if audio_queue is not None else self.audio_queue
        stop_event = stop_event if stop_event is not None else self.stop_event
        while not stop_event.is_set():
            try:
                # Short timeout so a set stop_event is noticed promptly.
                chunk = audio_queue.get(timeout=0.1)
            except queue.Empty:
                continue
            if chunk is None:
                log_debug("Request generator received stop signal")
                break
            yield speech.StreamingRecognizeRequest(audio_content=chunk)

    def _run_stream(self, session_id: Optional[int] = None):
        """Worker thread body: run streaming recognition and queue every result.

        Args:
            session_id: Session this thread serves; defaults to the current one.
                The thread only clears ``is_streaming`` if it still matches.
        """
        if session_id is None:
            session_id = self.session_id
        # Keep session-scoped objects locally: stop_streaming() resets the
        # attributes to None while this thread may still be running.
        client = self.client
        streaming_config = self.streaming_config
        audio_queue = self.audio_queue
        responses_queue = self.responses_queue
        stop_event = self.stop_event

        try:
            single_utterance = getattr(streaming_config, 'single_utterance', None)
            log_info("🎤 Google STT stream thread started - Single utterance mode: {}".format(single_utterance))

            requests = self._request_generator(audio_queue, stop_event)

            log_info("🎤 Creating Google STT streaming client...")
            stream_timeout = 300  # 5 minutes max for the stream
            responses = client.streaming_recognize(
                streaming_config,
                requests,
                timeout=stream_timeout
            )
            stream_start = time.time()

            response_count = 0
            for response in responses:
                response_count += 1
                if response_count == 1:
                    elapsed = time.time() - stream_start
                    log_info(f"✅ Got first response from Google after {elapsed:.2f}s")

                log_info(f"📨 Google STT Response #{response_count}: has_results={len(response.results) > 0}")

                if not response.results:
                    log_info("📭 Empty response from Google STT (no results)")
                    continue

                for result_idx, result in enumerate(response.results):
                    log_info(f"📝 Result #{result_idx}: is_final={result.is_final}, "
                             f"alternatives={len(result.alternatives)}, "
                             f"stability={getattr(result, 'stability', 'N/A')}")

                    if not result.alternatives:
                        continue

                    best_alternative = result.alternatives[0]
                    log_info(f"🗣️ Transcript: '{best_alternative.transcript}' "
                             f"(confidence: {best_alternative.confidence:.3f})")

                    result_obj = TranscriptionResult(
                        text=best_alternative.transcript,
                        is_final=result.is_final,
                        confidence=best_alternative.confidence,
                        timestamp=datetime.utcnow()
                    )
                    if self._put_result(result_obj, responses_queue):
                        log_info(f"✅ Result queued: is_final={result.is_final}, text='{best_alternative.transcript[:50]}...'")

            if response_count == 0:
                log_error("❌ Google STT stream ended without ANY responses!")
            else:
                log_info(f"✅ Google STT stream ended normally after {response_count} responses")

        except Exception as e:
            log_error(f"❌ Google STT error: {e}")
            details = getattr(e, 'details', None)
            if details is not None:
                # grpc.RpcError exposes details() as a method; api_core errors as a property.
                if callable(details):
                    details = details()
                log_error(f"Error details: {details}")
            self.error_message = str(e)
        finally:
            log_info("🎤 Google STT stream thread ended")
            with self.lock:
                # A newer session may already be running; leave its flag alone.
                if self.session_id == session_id:
                    self.is_streaming = False