# flare/stt/stt_google.py
# Last change: commit 1e56973 "Update stt/stt_google.py" (author: ciyidogan, verified)
"""
Google Cloud Speech-to-Text Implementation
"""
import asyncio
from typing import AsyncIterator, Optional, List, Any
from datetime import datetime
import queue
import threading
import traceback
import os
from google.cloud import speech
from google.cloud.speech import RecognitionConfig, StreamingRecognitionConfig
import google.auth
from utils.logger import log_info, log_error, log_debug, log_warning
from .stt_interface import STTInterface, STTConfig, TranscriptionResult
class GoogleSTT(STTInterface):
    """Streaming speech-to-text backed by Google Cloud Speech-to-Text.

    Audio chunks pushed through :meth:`stream_audio` are placed on
    ``audio_queue``. A daemon worker thread forwards them to
    ``SpeechClient.streaming_recognize`` and puts final transcripts on
    ``results_queue``. That queue is drained by :meth:`stream_audio` and
    :meth:`stop_streaming`. Interim transcripts are only logged.
    """

    # Sample rate the recognizer is configured for (LINEAR16 PCM).
    # NOTE(review): assumes callers always send 16 kHz PCM audio — confirm.
    SAMPLE_RATE_HZ = 16000
    # Used when STTConfig.language is empty (the previously hard-coded value).
    DEFAULT_LANGUAGE = "tr-TR"
    # Seconds stop_streaming() waits for the worker thread to exit.
    THREAD_JOIN_TIMEOUT = 2.0

    def __init__(self, credentials_path: Optional[str] = None):
        """Create the Speech client and initialise streaming state.

        Args:
            credentials_path: Path to a service-account JSON key file. When
                omitted, the client uses Application Default Credentials.

        Raises:
            FileNotFoundError: If ``credentials_path`` is given but missing.
            Exception: Any client-construction error is logged and re-raised.
        """
        try:
            # Never embed service-account keys in source: load them from the
            # given file, or let the client library resolve default credentials.
            if credentials_path:
                if not os.path.exists(credentials_path):
                    log_error(f"❌ Credentials file not found at: {credentials_path}")
                    raise FileNotFoundError(f"Credentials file not found: {credentials_path}")
                log_info(f"📁 Using credentials from: {credentials_path}")
                self.client = speech.SpeechClient.from_service_account_file(credentials_path)
                log_info(f"✅ Google STT initialized with service account: {credentials_path}")
            else:
                self.client = speech.SpeechClient()
                log_info("✅ Google STT initialized with default credentials")

            # Streaming state
            self.is_streaming = False
            self.streaming_config = None
            self.audio_generator = None  # kept for compatibility; not used in this class
            self.responses_stream = None  # kept for compatibility; not used in this class
            self.audio_queue = queue.Queue()  # bytes chunks; None is the stop sentinel
            self.results_queue = queue.Queue(maxsize=100)

            # Session tracking
            self.session_id = 0
            self.total_audio_bytes = 0
            self.total_chunks = 0

            # Threading. start_streaming() creates a fresh Event per session so a
            # lingering worker from an earlier session can never be revived.
            self.stream_thread = None
            self.stop_event = threading.Event()
        except Exception as e:
            log_error(f"❌ Failed to initialize Google STT: {str(e)}")
            raise

    def _map_language_code(self, language: str) -> str:
        """Return the Google language code for ``language``.

        Google accepts BCP-47 tags such as ``tr-TR`` directly, so codes pass
        through unchanged. This is the hook for any future aliasing.
        """
        return language

    def _has_active_stream(self) -> bool:
        """Return True if a session is flagged active or its worker is still alive."""
        thread = self.stream_thread
        return self.is_streaming or (thread is not None and thread.is_alive())

    async def start_streaming(self, config: STTConfig) -> None:
        """Start a new recognition session in a background thread.

        Any still-running previous session is stopped first.

        Args:
            config: Session configuration; ``config.language`` selects the
                recognition language.

        Raises:
            Exception: Any setup error is logged and re-raised.
        """
        try:
            # Also catch a worker that outlived a session ended by a final result.
            if self._has_active_stream():
                log_warning("⚠️ Previous stream still active, stopping it first")
                await self.stop_streaming()
                await asyncio.sleep(0.5)

            self._reset_session_data()
            log_info(f"🎤 Starting Google STT - Session #{self.session_id}")

            language_code = self._map_language_code(config.language) or self.DEFAULT_LANGUAGE

            # Minimal configuration: only the required fields plus interim results.
            recognition_config = RecognitionConfig(
                encoding=RecognitionConfig.AudioEncoding.LINEAR16,
                sample_rate_hertz=self.SAMPLE_RATE_HZ,
                language_code=language_code,
            )
            self.streaming_config = StreamingRecognitionConfig(
                config=recognition_config,
                interim_results=True,
            )
            log_info(f"🔧 Google STT config: language={language_code}, "
                     f"encoding=LINEAR16, sample_rate={self.SAMPLE_RATE_HZ}, "
                     f"single_utterance=False, interim_results=True")

            # Per-session stop event and session id, handed to the worker.
            stop_event = threading.Event()
            self.stop_event = stop_event
            # Mark active before start() so a worker that fails immediately
            # cannot have its "stopped" state overwritten afterwards.
            self.is_streaming = True
            self.stream_thread = threading.Thread(
                target=self._stream_recognition,
                args=(stop_event, self.session_id),
                daemon=True,
            )
            self.stream_thread.start()
            log_info("✅ Google STT started - Ready for speech")
        except Exception as e:
            log_error("❌ Failed to start Google STT", error=str(e))
            self.is_streaming = False
            raise

    def _stream_recognition(self, stop_event: Optional[threading.Event] = None,
                            session_id: Optional[int] = None) -> None:
        """Worker thread body: run one streaming-recognize call to completion.

        Args:
            stop_event: This session's stop signal (defaults to ``self.stop_event``).
            session_id: Session that owns this worker. It is used so a late-ending
                worker does not clear ``is_streaming`` for a newer session.
        """
        if stop_event is None:
            stop_event = self.stop_event
        try:
            log_debug("🎙️ Starting recognition stream thread")
            log_debug(f"Config details: {self.streaming_config}")

            # The client helper sends streaming_config as the first request
            # itself. Every following request must be a StreamingRecognizeRequest
            # carrying audio, not raw bytes.
            requests = (
                speech.StreamingRecognizeRequest(audio_content=chunk)
                for chunk in self._audio_generator(stop_event)
            )
            responses = self.client.streaming_recognize(self.streaming_config, requests)

            for response in responses:
                if stop_event.is_set():
                    break
                if self._handle_response(response):
                    break
        except Exception as e:
            if not stop_event.is_set():
                self._log_stream_error(e)
                # An empty final result tells consumers the stream failed.
                self._put_result(TranscriptionResult(
                    text="",
                    is_final=True,
                    confidence=0.0,
                    timestamp=datetime.now().timestamp(),
                ))
        finally:
            log_debug("🎙️ Recognition stream thread ended")
            if session_id is None or session_id == self.session_id:
                self.is_streaming = False

    def _handle_response(self, response: Any) -> bool:
        """Queue the first final transcript in ``response``; log interim ones.

        Returns:
            True if the response signals END_OF_SINGLE_UTTERANCE (stream done).
        """
        for result in response.results:
            if not result.alternatives:
                continue
            alternative = result.alternatives[0]  # best alternative
            if not alternative.transcript:
                continue
            if not result.is_final:
                log_debug(f"📝 Interim transcript (ignored): '{alternative.transcript}'")
                continue
            queued = self._put_result(TranscriptionResult(
                text=alternative.transcript,
                is_final=True,
                confidence=alternative.confidence,
                timestamp=datetime.now().timestamp(),
            ))
            if queued:
                log_info(f"🎯 FINAL TRANSCRIPT: '{alternative.transcript}' "
                         f"(confidence: {alternative.confidence:.2f})")
            break

        end_of_utterance = speech.StreamingRecognizeResponse.SpeechEventType.END_OF_SINGLE_UTTERANCE
        if getattr(response, "speech_event_type", None) == end_of_utterance:
            log_info("🔚 End of single utterance detected")
            return True
        return False

    def _put_result(self, result: TranscriptionResult) -> bool:
        """Enqueue ``result`` without blocking; return False if the queue is full."""
        try:
            self.results_queue.put_nowait(result)
            return True
        except queue.Full:
            log_warning("⚠️ Results queue full")
            return False

    def _log_stream_error(self, error: Exception) -> None:
        """Log a recognition failure with as much API detail as is available.

        Must be called from inside an ``except`` block so the traceback is current.
        """
        log_error(f"❌ Recognition stream error: {str(error)}")
        log_error(f"❌ Error type: {type(error).__name__}")
        # grpc.RpcError exposes details() as a method; google.api_core errors
        # expose `details` as a property. Support both without raising.
        details = getattr(error, "details", None)
        try:
            if callable(details):
                details = details()
        except Exception:
            details = None
        if details:
            log_error(f"❌ Error details: {details}")
        log_error(f"❌ Full traceback: {traceback.format_exc()}")

    def _audio_generator(self, stop_event: Optional[threading.Event] = None):
        """Yield queued audio chunks (bytes) until stopped or a None sentinel arrives.

        Args:
            stop_event: Stop signal to poll (defaults to ``self.stop_event``).
        """
        if stop_event is None:
            stop_event = self.stop_event
        chunk_count = 0
        try:
            while not stop_event.is_set():
                try:
                    # Short timeout so the stop event is re-checked regularly.
                    chunk = self.audio_queue.get(timeout=0.1)
                except queue.Empty:
                    continue

                if chunk is None:
                    log_debug("🔚 Audio generator received sentinel, stopping")
                    break

                chunk_count += 1
                if chunk_count <= 5:  # detailed logging for the first few chunks only
                    log_debug(f"🎵 Audio generator yielding chunk #{chunk_count}, size: {len(chunk)} bytes")

                if not isinstance(chunk, bytes):
                    log_error(f"❌ Chunk is not bytes! Type: {type(chunk)}")
                    continue

                yield chunk
        finally:
            log_debug(f"🎙️ Audio generator stopped after {chunk_count} chunks")

    async def stream_audio(self, audio_chunk: bytes) -> AsyncIterator[TranscriptionResult]:
        """Queue one audio chunk and yield any transcription results ready so far.

        After a final result is yielded the session is marked inactive. Further
        calls raise until :meth:`start_streaming` is called again.

        Raises:
            RuntimeError: If no session is active.
            TypeError: If ``audio_chunk`` is not ``bytes``.
        """
        if not self.is_streaming:
            raise RuntimeError("Streaming not started. Call start_streaming() first.")

        try:
            if not isinstance(audio_chunk, bytes):
                log_error(f"❌ Audio chunk is not bytes! Type: {type(audio_chunk)}")
                raise TypeError(f"Expected bytes, got {type(audio_chunk)}")

            if self.total_chunks < 5:
                log_debug(f"📦 Adding audio chunk #{self.total_chunks} to queue, size: {len(audio_chunk)} bytes")

            # Hand the audio to the worker thread.
            self.audio_queue.put(audio_chunk)
            self.total_chunks += 1
            self.total_audio_bytes += len(audio_chunk)

            if self.total_chunks % 50 == 0:
                log_debug(f"📊 Processing... {self.total_chunks} chunks, {self.total_audio_bytes/1024:.1f}KB")

            # Yield whatever results the worker has produced so far.
            while True:
                try:
                    result = self.results_queue.get_nowait()
                except queue.Empty:
                    break
                log_debug(f"🎯 Yielding result: is_final={result.is_final}, text='{result.text}'")
                yield result
                if result.is_final:
                    self.is_streaming = False
        except Exception as e:
            log_error("❌ Error streaming audio", error=str(e))
            self.is_streaming = False
            raise

    async def stop_streaming(self) -> Optional[TranscriptionResult]:
        """Stop the current session, wait for its worker, and return the last final result.

        Returns:
            The last queued final result with non-empty text, or None.
        """
        # Proceed even if is_streaming was already cleared (e.g. after a final
        # result) while the worker thread is still running; otherwise it leaks.
        if not self._has_active_stream():
            log_debug("Already stopped, nothing to do")
            return None

        try:
            log_info(f"🛑 Stopping Google STT session #{self.session_id}")
            self.is_streaming = False

            # Signal stop and unblock the audio generator.
            self.stop_event.set()
            self.audio_queue.put(None)

            # Join off the event loop so other coroutines are not blocked.
            thread = self.stream_thread
            if thread is not None and thread.is_alive():
                loop = asyncio.get_running_loop()
                await loop.run_in_executor(None, thread.join, self.THREAD_JOIN_TIMEOUT)
                if thread.is_alive():
                    log_warning("⚠️ Recognition thread did not stop within timeout")

            final_result = None
            while True:
                try:
                    result = self.results_queue.get_nowait()
                except queue.Empty:
                    break
                if result.is_final and result.text:
                    final_result = result

            log_info(f"✅ Google STT session #{self.session_id} stopped")
            return final_result
        except Exception as e:
            log_error("❌ Error during stop_streaming", error=str(e))
            self.is_streaming = False
            return None

    @staticmethod
    def _drain_queue(q: queue.Queue) -> None:
        """Remove every item currently in ``q`` without blocking."""
        while True:
            try:
                q.get_nowait()
            except queue.Empty:
                return

    def _reset_session_data(self):
        """Clear both queues, reset counters, and advance the session id."""
        self._drain_queue(self.audio_queue)
        self._drain_queue(self.results_queue)

        self.total_audio_bytes = 0
        self.total_chunks = 0
        self.session_id += 1
        log_debug(f"🔄 Session data reset. New session ID: {self.session_id}")

    def supports_realtime(self) -> bool:
        """Google STT supports real-time streaming."""
        return True

    def get_supported_languages(self) -> List[str]:
        """Return a partial list of supported language codes.

        Full list: https://cloud.google.com/speech-to-text/docs/languages
        """
        return [
            "tr-TR", "en-US", "en-GB", "en-AU", "en-CA", "en-IN",
            "es-ES", "es-MX", "es-AR", "fr-FR", "fr-CA", "de-DE",
            "it-IT", "pt-BR", "pt-PT", "ru-RU", "ja-JP", "ko-KR",
            "zh-CN", "zh-TW", "ar-SA", "ar-EG", "hi-IN", "nl-NL",
            "pl-PL", "sv-SE", "da-DK", "no-NO", "fi-FI", "el-GR",
            "he-IL", "th-TH", "vi-VN", "id-ID", "ms-MY", "fil-PH"
        ]

    def get_provider_name(self) -> str:
        """Return the provider identifier."""
        return "google"