"""
Google Cloud Speech-to-Text Implementation
"""
import asyncio | |
from typing import AsyncIterator, Optional, List, Any | |
from datetime import datetime | |
import queue | |
import threading | |
import traceback | |
import os | |
from google.cloud import speech | |
from google.cloud.speech import RecognitionConfig, StreamingRecognitionConfig | |
import google.auth | |
from utils.logger import log_info, log_error, log_debug, log_warning | |
from .stt_interface import STTInterface, STTConfig, TranscriptionResult | |
class GoogleSTT(STTInterface): | |
def __init__(self, credentials_path: Optional[str] = None): | |
""" | |
Initialize Google STT | |
Args: | |
credentials_path: Path to service account JSON file (optional if using default credentials) | |
""" | |
try: | |
TEST_CREDENTIALS = ''' | |
{ | |
"type": "service_account", | |
"project_id": "YOUR_PROJECT_ID", | |
"private_key_id": "YOUR_KEY_ID", | |
"private_key": "-----BEGIN PRIVATE KEY-----\\nYOUR_PRIVATE_KEY\\n-----END PRIVATE KEY-----\\n", | |
"client_email": "YOUR_SERVICE_ACCOUNT@YOUR_PROJECT.iam.gserviceaccount.com", | |
"client_id": "YOUR_CLIENT_ID", | |
"auth_uri": "https://accounts.google.com/o/oauth2/auth", | |
"token_uri": "https://oauth2.googleapis.com/token", | |
"auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs", | |
"client_x509_cert_url": "YOUR_CERT_URL" | |
} | |
''' | |
''' | |
# โ Debug iรงin path kontrolรผ | |
if credentials_path: | |
import os | |
if not os.path.exists(credentials_path): | |
log_error(f"โ Credentials file not found at: {credentials_path}") | |
raise FileNotFoundError(f"Credentials file not found: {credentials_path}") | |
log_info(f"๐ Using credentials from: {credentials_path}") | |
# Initialize client | |
if credentials_path: | |
self.client = speech.SpeechClient.from_service_account_file(credentials_path) | |
log_info(f"โ Google STT initialized with service account: {credentials_path}") | |
else: | |
# Use default credentials (ADC) | |
self.client = speech.SpeechClient() | |
log_info("โ Google STT initialized with default credentials") | |
''' | |
# String'den credentials oluลtur | |
import json | |
from google.oauth2 import service_account | |
credentials_dict = json.loads(TEST_CREDENTIALS) | |
credentials = service_account.Credentials.from_service_account_info( | |
credentials_dict, | |
scopes=["https://www.googleapis.com/auth/cloud-platform"] | |
) | |
self.client = speech.SpeechClient(credentials=credentials) | |
log_info(f"โ Google STT initialized with inline credentials") | |
# Streaming state | |
self.is_streaming = False | |
self.audio_generator = None | |
self.responses_stream = None | |
self.audio_queue = queue.Queue() | |
self.results_queue = queue.Queue(maxsize=100) | |
# Session tracking | |
self.session_id = 0 | |
self.total_audio_bytes = 0 | |
self.total_chunks = 0 | |
# Threading | |
self.stream_thread = None | |
self.stop_event = threading.Event() | |
except Exception as e: | |
log_error(f"โ Failed to initialize Google STT: {str(e)}") | |
raise | |
def _map_language_code(self, language: str) -> str: | |
"""Map language codes to Google format""" | |
# Google uses BCP-47 language codes | |
language_map = { | |
"tr-TR": "tr-TR", | |
"en-US": "en-US", | |
"en-GB": "en-GB", | |
"de-DE": "de-DE", | |
"fr-FR": "fr-FR", | |
"es-ES": "es-ES", | |
"it-IT": "it-IT", | |
"pt-BR": "pt-BR", | |
"ru-RU": "ru-RU", | |
"ja-JP": "ja-JP", | |
"ko-KR": "ko-KR", | |
"zh-CN": "zh-CN", | |
"ar-SA": "ar-SA", | |
} | |
return language_map.get(language, language) | |
async def start_streaming(self, config: STTConfig) -> None: | |
"""Initialize streaming session""" | |
try: | |
# Stop any existing stream | |
if self.is_streaming: | |
log_warning("โ ๏ธ Previous stream still active, stopping it first") | |
await self.stop_streaming() | |
await asyncio.sleep(0.5) | |
# Reset session data | |
self._reset_session_data() | |
log_info(f"๐ค Starting Google STT - Session #{self.session_id}") | |
# Configure recognition settings | |
language_code = self._map_language_code(config.language) | |
""" | |
# โ Google STT best practices for Turkish and single utterance | |
recognition_config = RecognitionConfig( | |
encoding=RecognitionConfig.AudioEncoding.LINEAR16, | |
sample_rate_hertz=16000, | |
language_code="tr-TR", | |
# โ Single utterance iรงin ideal ayarlar | |
enable_automatic_punctuation=True, | |
# Model selection - latest_long for better accuracy | |
model="latest_long", | |
# Use enhanced model if available (better for Turkish) | |
use_enhanced=True, | |
# Single channel audio | |
audio_channel_count=1, | |
# Alternative transcripts for debugging | |
max_alternatives=1, | |
# Profanity filter disabled for accuracy | |
profanity_filter=False, | |
# Word level confidence | |
enable_word_confidence=False, | |
enable_spoken_punctuation=False, | |
enable_spoken_emojis=False, | |
) | |
# โ Streaming config - optimized for final results only | |
self.streaming_config = StreamingRecognitionConfig( | |
config=recognition_config, | |
single_utterance=False, | |
interim_results=True | |
) | |
""" | |
# โ EN BASฤฐT CONFIG - sadece zorunlu alanlar | |
recognition_config = RecognitionConfig( | |
encoding=RecognitionConfig.AudioEncoding.LINEAR16, | |
sample_rate_hertz=16000, | |
language_code="tr-TR" | |
) | |
# โ Streaming config - en basit hali | |
self.streaming_config = StreamingRecognitionConfig( | |
config=recognition_config, | |
interim_results=True | |
) | |
log_info(f"๐ง Google STT config: language={language_code}, " | |
f"model=latest_long, enhanced=True, " | |
f"single_utterance=True, interim_results=False") | |
# Start streaming in background thread | |
self.stop_event.clear() | |
self.stream_thread = threading.Thread( | |
target=self._stream_recognition, | |
daemon=True | |
) | |
self.stream_thread.start() | |
self.is_streaming = True | |
log_info(f"โ Google STT started - Ready for speech") | |
except Exception as e: | |
log_error(f"โ Failed to start Google STT", error=str(e)) | |
self.is_streaming = False | |
raise | |
    def _stream_recognition(self):
        """
        Background-thread worker: run Google streaming recognition.

        Pulls audio through _audio_generator(), forwards only FINAL
        transcripts into self.results_queue, and on error queues an empty
        final TranscriptionResult so waiting consumers are unblocked.
        Always clears self.is_streaming on exit.
        """
        try:
            log_debug("๐๏ธ Starting recognition stream thread")
            # Log the streaming config for debugging
            log_debug(f"Config details: {self.streaming_config}")
            # Create audio generator
            audio_generator = self._audio_generator()
            # Detailed error capture around the API call itself
            try:
                # Start streaming recognition
                responses = self.client.streaming_recognize(
                    self.streaming_config,
                    audio_generator
                )
            except Exception as api_error:
                log_error(f"โ Google API error: {str(api_error)}")
                log_error(f"โ Error type: {type(api_error).__name__}")
                if hasattr(api_error, 'details'):
                    log_error(f"โ Error details: {api_error.details()}")
                if hasattr(api_error, '__dict__'):
                    log_error(f"โ Error attributes: {api_error.__dict__}")
                import traceback
                log_error(f"โ Full traceback: {traceback.format_exc()}")
                raise
            # Process responses
            for response in responses:
                if self.stop_event.is_set():
                    break
                if not response.results:
                    continue
                # Process each result
                for result in response.results:
                    if not result.alternatives:
                        continue
                    # Get best alternative
                    alternative = result.alternatives[0]
                    # Only process if we have transcript
                    if alternative.transcript:
                        # Log interim results but do not put them on the queue
                        if not result.is_final:
                            log_debug(f"๐ Interim transcript (ignored): '{alternative.transcript}'")
                            continue
                        # Only final results are forwarded to consumers
                        transcription_result = TranscriptionResult(
                            text=alternative.transcript,
                            is_final=result.is_final,
                            confidence=alternative.confidence,
                            timestamp=datetime.now().timestamp()
                        )
                        try:
                            self.results_queue.put(transcription_result)
                            if result.is_final:
                                log_info(f"๐ฏ FINAL TRANSCRIPT: '{alternative.transcript}' "
                                         f"(confidence: {alternative.confidence:.2f})")
                                # NOTE(review): this break exits only the inner
                                # per-result loop; the outer response loop keeps
                                # running until the stream itself ends.
                                break
                            else:
                                # NOTE(review): unreachable — non-final results
                                # are skipped by the `continue` above.
                                log_debug(f"๐ Transcript: '{alternative.transcript}'")
                        except queue.Full:
                            log_warning("โ ๏ธ Results queue full")
                # Check if stream ended due to single_utterance
                if hasattr(response, 'speech_event_type'):
                    if response.speech_event_type == speech.StreamingRecognizeResponse.SpeechEventType.END_OF_SINGLE_UTTERANCE:
                        log_info("๐ End of single utterance detected")
                        break
        except Exception as e:
            if not self.stop_event.is_set():
                log_error(f"โ Recognition stream error: {str(e)}")
                # Put an empty final result in the queue so consumers get a signal
                error_result = TranscriptionResult(
                    text="",
                    is_final=True,
                    confidence=0.0,
                    timestamp=datetime.now().timestamp()
                )
                self.results_queue.put(error_result)
        finally:
            log_debug("๐๏ธ Recognition stream thread ended")
            self.is_streaming = False
def _audio_generator(self): | |
"""Generator that yields audio chunks for streaming""" | |
chunk_count = 0 | |
try: | |
while not self.stop_event.is_set(): | |
try: | |
# Get audio chunk with timeout | |
chunk = self.audio_queue.get(timeout=0.1) | |
if chunk is None: # Sentinel value | |
log_debug("๐ Audio generator received sentinel, stopping") | |
break | |
# โ Debug iรงin chunk bilgisi | |
chunk_count += 1 | |
if chunk_count <= 5: # ฤฐlk 5 chunk iรงin detaylฤฑ log | |
log_debug(f"๐ต Audio generator yielding chunk #{chunk_count}, size: {len(chunk)} bytes") | |
# Chunk'ฤฑn byte tipinde olduฤundan emin ol | |
if not isinstance(chunk, bytes): | |
log_error(f"โ Chunk is not bytes! Type: {type(chunk)}") | |
continue | |
# โ Google API'nin beklediฤi format | |
yield chunk | |
except queue.Empty: | |
continue | |
except Exception as e: | |
log_error(f"โ Audio generator error: {str(e)}") | |
break | |
finally: | |
log_debug(f"๐๏ธ Audio generator stopped after {chunk_count} chunks") | |
async def stream_audio(self, audio_chunk: bytes) -> AsyncIterator[TranscriptionResult]: | |
"""Stream audio chunk and get transcription results""" | |
if not self.is_streaming: | |
raise RuntimeError("Streaming not started. Call start_streaming() first.") | |
try: | |
# โ Audio chunk tipini kontrol et | |
if not isinstance(audio_chunk, bytes): | |
log_error(f"โ Audio chunk is not bytes! Type: {type(audio_chunk)}") | |
raise TypeError(f"Expected bytes, got {type(audio_chunk)}") | |
# โ Chunk boyutunu logla | |
if self.total_chunks < 5: | |
log_debug(f"๐ฆ Adding audio chunk #{self.total_chunks} to queue, size: {len(audio_chunk)} bytes") | |
# Add audio to queue for background thread | |
self.audio_queue.put(audio_chunk) | |
self.total_chunks += 1 | |
self.total_audio_bytes += len(audio_chunk) | |
# Log progress | |
if self.total_chunks % 50 == 0: | |
log_debug(f"๐ Processing... {self.total_chunks} chunks, {self.total_audio_bytes/1024:.1f}KB") | |
# Check for results | |
while True: | |
try: | |
result = self.results_queue.get_nowait() | |
# Log for debugging | |
log_debug(f"๐ฏ Yielding result: is_final={result.is_final}, text='{result.text}'") | |
yield result | |
# If final result, stream will end | |
if result.is_final: | |
self.is_streaming = False | |
except queue.Empty: | |
break | |
except Exception as e: | |
log_error(f"โ Error streaming audio", error=str(e)) | |
self.is_streaming = False | |
raise | |
async def stop_streaming(self) -> Optional[TranscriptionResult]: | |
"""Stop streaming and clean up""" | |
if not self.is_streaming: | |
log_debug("Already stopped, nothing to do") | |
return None | |
try: | |
log_info(f"๐ Stopping Google STT session #{self.session_id}") | |
self.is_streaming = False | |
# Signal stop | |
self.stop_event.set() | |
# Send sentinel to audio queue | |
self.audio_queue.put(None) | |
# Wait for thread to finish | |
if self.stream_thread and self.stream_thread.is_alive(): | |
self.stream_thread.join(timeout=2.0) | |
# Get final result if any | |
final_result = None | |
while not self.results_queue.empty(): | |
try: | |
result = self.results_queue.get_nowait() | |
if result.is_final and result.text: | |
final_result = result | |
except queue.Empty: | |
break | |
log_info(f"โ Google STT session #{self.session_id} stopped") | |
return final_result | |
except Exception as e: | |
log_error(f"โ Error during stop_streaming", error=str(e)) | |
self.is_streaming = False | |
return None | |
def _reset_session_data(self): | |
"""Reset session-specific data""" | |
# Clear queues | |
while not self.audio_queue.empty(): | |
try: | |
self.audio_queue.get_nowait() | |
except: | |
pass | |
while not self.results_queue.empty(): | |
try: | |
self.results_queue.get_nowait() | |
except: | |
pass | |
# Reset counters | |
self.total_audio_bytes = 0 | |
self.total_chunks = 0 | |
self.session_id += 1 | |
log_debug(f"๐ Session data reset. New session ID: {self.session_id}") | |
def supports_realtime(self) -> bool: | |
"""Google STT supports real-time streaming""" | |
return True | |
def get_supported_languages(self) -> List[str]: | |
"""Get list of supported language codes""" | |
# Google Cloud Speech-to-Text supported languages (partial list) | |
# Full list: https://cloud.google.com/speech-to-text/docs/languages | |
return [ | |
"tr-TR", "en-US", "en-GB", "en-AU", "en-CA", "en-IN", | |
"es-ES", "es-MX", "es-AR", "fr-FR", "fr-CA", "de-DE", | |
"it-IT", "pt-BR", "pt-PT", "ru-RU", "ja-JP", "ko-KR", | |
"zh-CN", "zh-TW", "ar-SA", "ar-EG", "hi-IN", "nl-NL", | |
"pl-PL", "sv-SE", "da-DK", "no-NO", "fi-FI", "el-GR", | |
"he-IL", "th-TH", "vi-VN", "id-ID", "ms-MY", "fil-PH" | |
] | |
def get_provider_name(self) -> str: | |
"""Get provider name""" | |
return "google" |