Spaces:
Paused
Paused
| """ | |
| Google Cloud Speech-to-Text Implementation | |
| """ | |
| import os | |
| import asyncio | |
| from typing import AsyncIterator, AsyncGenerator, Optional, List, Any | |
| import numpy as np | |
| from datetime import datetime | |
| import sys | |
| import queue | |
| import threading | |
| import time | |
| import traceback | |
| from utils.logger import log_info, log_error, log_debug, log_warning | |
| # Import Google Cloud Speech only if available | |
| try: | |
| from google.cloud import speech | |
| from google.api_core import exceptions | |
| GOOGLE_SPEECH_AVAILABLE = True | |
| except ImportError: | |
| GOOGLE_SPEECH_AVAILABLE = False | |
| log_info("β οΈ Google Cloud Speech library not installed") | |
| from .stt_interface import STTInterface, STTConfig, TranscriptionResult | |
| class GoogleCloudSTT(STTInterface): | |
| """Google Cloud Speech-to-Text implementation""" | |
def __init__(self, credentials_path: Optional[str] = None):
    """Create the provider and make sure Google credentials can be loaded.

    Credential lookup order: ``credentials_path`` if given, otherwise the
    ``GOOGLE_APPLICATION_CREDENTIALS`` environment variable, otherwise
    ``./credentials/google-service-account.json``. The chosen path is
    exported as ``GOOGLE_APPLICATION_CREDENTIALS`` (process-wide) so the
    Google client library picks it up.

    Args:
        credentials_path: Optional path to a service-account JSON file.

    Raises:
        ValueError: If no credentials file exists at the resolved path.
        ImportError: If the google-cloud-speech library is not installed.
        Exception: Whatever ``speech.SpeechClient()`` raises while loading
            the credentials (logged, then re-raised).
    """
    log_info("π€ Creating STT provider: google")

    # Streaming-session state; populated by start_streaming().
    self.client = None
    self.streaming_config = None
    self.stream_thread = None
    self.audio_queue = queue.Queue()
    self.responses_queue = queue.Queue()
    self.is_streaming = False
    self.should_stop = False
    self.error_message = None
    self.session_id = 0
    self.stream_start_time = None
    # Encoding name from STTConfig; start_streaming() sets it and
    # stream_audio() reads it, so give it a defined initial value.
    self.current_encoding = None

    # Bookkeeping shared with the background recognition thread.
    self.lock = threading.Lock()
    self.single_utterance = False
    self.chunk_count = 0
    self.total_bytes = 0
    self.stop_event = threading.Event()

    # Resolve and export the credentials file.
    if credentials_path:
        if not os.path.exists(credentials_path):
            log_error(f"β Credentials file not found: {credentials_path}")
            raise ValueError(f"Google credentials file not found: {credentials_path}")
        os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = credentials_path
        log_info(f"β Google credentials set from: {credentials_path}")
    else:
        # Fall back to the environment variable, then to the default path.
        creds_path = os.environ.get("GOOGLE_APPLICATION_CREDENTIALS")
        if not creds_path:
            creds_path = "./credentials/google-service-account.json"
        if not os.path.exists(creds_path):
            raise ValueError("Google credentials not found. Please provide credentials_path")
        os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = creds_path
        log_info(f"β Google credentials set from default: {creds_path}")

    # Without this guard the missing library shows up as a NameError on
    # ``speech`` below, which hides the real cause.
    if not GOOGLE_SPEECH_AVAILABLE:
        log_error("google-cloud-speech is not installed; GoogleCloudSTT is unavailable")
        raise ImportError("google-cloud-speech is required for GoogleCloudSTT")

    # Constructing a client makes the library load the credentials; no
    # recognition request is sent here.
    try:
        log_info("π Testing Google credentials...")
        test_client = speech.SpeechClient()
    except Exception as e:
        log_error(f"β Invalid Google credentials: {e}")
        raise

    # The probe client is not reused, so close its transport instead of
    # leaking an open gRPC channel for the lifetime of the process.
    close_transport = getattr(getattr(test_client, "transport", None), "close", None)
    if close_transport is not None:
        try:
            close_transport()
        except Exception as e:
            log_warning(f"Could not close credential-probe client: {e}")
    log_info("β Google credentials valid")
def _get_encoding(self, encoding_str: str):
    """Translate an encoding name into ``speech.RecognitionConfig.AudioEncoding``.

    Recognized names are WEBM_OPUS, LINEAR16, FLAC, MP3 and OGG_OPUS; any
    other value maps to WEBM_OPUS. Returns None when the Google Speech
    library could not be imported.
    """
    if GOOGLE_SPEECH_AVAILABLE:
        audio_encoding = speech.RecognitionConfig.AudioEncoding
        supported = {
            name: getattr(audio_encoding, name)
            for name in ("WEBM_OPUS", "LINEAR16", "FLAC", "MP3", "OGG_OPUS")
        }
        return supported.get(encoding_str, audio_encoding.WEBM_OPUS)
    return None

# Older method name, kept so existing callers continue to work.
_get_google_encoding = _get_encoding
async def stream_audio(self, audio_chunk: bytes) -> AsyncGenerator[TranscriptionResult, None]:
    """Queue one audio chunk and yield any transcription results that are ready.

    The chunk is put on ``self.audio_queue`` for the background recognition
    thread. The response queue is then polled for about 0.1 s, and every
    ``TranscriptionResult`` found there is yielded.

    Args:
        audio_chunk: Audio bytes in the encoding chosen by ``start_streaming``.

    Yields:
        TranscriptionResult objects (interim or final) from the response queue.

    Raises:
        Exception: If no streaming session is active, or re-raised after
            logging when queueing the chunk fails.
    """
    if not self.is_streaming:
        raise Exception("Streaming not started")
    try:
        chunk_size = len(audio_chunk)

        # On the first chunk of a session, check for the WebM header magic bytes.
        if self.chunk_count == 0:
            log_info(f"π€ First chunk - size: {chunk_size} bytes")
            if audio_chunk.startswith(b'\x1a\x45\xdf\xa3'):
                log_info("β Valid WEBM header detected")
            else:
                hex_preview = audio_chunk[:20].hex()
                log_warning(f"β οΈ Unexpected audio format. First 20 bytes: {hex_preview}")

        # An audio level is only meaningful for raw PCM. The encoding is the
        # one stored by start_streaming(). The old code compared an undefined
        # local name, and the NameError was swallowed, so this never ran.
        # The level is only logged every 50th chunk, so only compute it then.
        encoding = getattr(self, "current_encoding", None)
        if encoding == "LINEAR16" and self.chunk_count % 50 == 0:
            try:
                samples = np.frombuffer(audio_chunk, dtype=np.int16)
            except ValueError:
                # Byte count is not a multiple of 2, so this is not 16-bit PCM.
                samples = None
            if samples is not None and samples.size > 0:
                rms = np.sqrt(np.mean(samples.astype(float) ** 2))
                db = 20 * np.log10(max(rms, 1) / 32768.0)
                log_info(f"π Audio level: {db:.1f} dB")

        # Hand the chunk to the request generator on the stream thread.
        self.audio_queue.put(audio_chunk)
        self.chunk_count += 1
        self.total_bytes += chunk_size

        if self.chunk_count % 50 == 0:
            log_info(f"π€ Progress: {self.chunk_count} chunks, {self.total_bytes/1024:.1f}KB total")

        # Poll briefly for results. monotonic() is immune to wall-clock jumps.
        deadline = time.monotonic() + 0.1
        while time.monotonic() < deadline:
            try:
                result = self.responses_queue.get_nowait()
            except queue.Empty:
                await asyncio.sleep(0.01)
                continue
            except Exception as e:
                log_error(f"Error getting result from queue: {e}")
                break
            log_info(f"π― Got result from queue: is_final={result.is_final}, text='{result.text[:30]}...'")
            # Yield outside the inner try so exceptions thrown into this
            # generator by the consumer are not reported as queue errors.
            yield result
    except Exception as e:
        log_error(f"β Error in stream_audio: {e}")
        raise
async def stop_streaming(self) -> Optional[TranscriptionResult]:
    """Stop the current streaming session and release its resources.

    Sets the stop flags and ``stop_event``, wakes the request generator with
    a ``None`` sentinel, waits up to 5 s for the stream thread, drains the
    response queue and closes the Speech client. Finally it clears session
    state: queues, thread, client and streaming config are set to None.

    Returns:
        The last final ``TranscriptionResult`` left in the response queue.
        Returns None when there is none, when nothing was running, when a
        newer session started while waiting, or when shutdown failed.
    """
    if not self.is_streaming and not self.stream_thread:
        log_debug("Already stopped, nothing to do")
        return None

    session_id = self.session_id
    try:
        log_info(f"π Stopping Google STT streaming session #{self.session_id}")
        self.is_streaming = False
        self.should_stop = True
        self.stop_event.set()

        # Wake the request generator with a sentinel. Use put_nowait(): on a
        # full bounded queue a blocking put() would freeze the event loop.
        # In that case the generator still exits via should_stop.
        if self.audio_queue is not None:
            try:
                self.audio_queue.put_nowait(None)
            except queue.Full:
                log_debug("Audio queue full; stream will end via should_stop")

        # Join from an executor thread so the event loop keeps serving other
        # tasks during the (up to 5 s) wait.
        thread = self.stream_thread
        if thread is not None and thread.is_alive():
            log_info("β³ Waiting for stream thread to finish...")
            await asyncio.get_running_loop().run_in_executor(None, thread.join, 5.0)
            if thread.is_alive():
                log_warning("β οΈ STT thread did not stop gracefully after 5s")
            else:
                log_info("β Stream thread finished")

        # Another coroutine may have started a new session while we awaited.
        # Its queues, client and thread must not be torn down here.
        if self.session_id != session_id:
            log_warning(f"Session #{session_id} superseded by #{self.session_id} during stop; skipping cleanup")
            return None

        # Keep the last final result still waiting in the response queue.
        final_result = None
        responses_queue = self.responses_queue
        if responses_queue is not None:
            while not responses_queue.empty():
                try:
                    result = responses_queue.get_nowait()
                except queue.Empty:
                    break
                if result.is_final:
                    final_result = result

        # Close the client's transport / gRPC channel.
        if self.client:
            try:
                if hasattr(self.client, 'transport') and hasattr(self.client.transport, 'close'):
                    self.client.transport.close()
                    log_debug("β Client transport closed")
                if hasattr(self.client, '_transport') and hasattr(self.client._transport, '_grpc_channel'):
                    self.client._transport._grpc_channel.close()
                    log_debug("β gRPC channel closed")
            except Exception as e:
                log_warning(f"β οΈ Error closing Google client: {e}")
            finally:
                self.client = None

        # Reset state
        self.audio_queue = None
        self.responses_queue = None
        self.stream_thread = None
        self.streaming_config = None
        self.stop_event.clear()
        log_info(f"β Google STT streaming session #{self.session_id} stopped and cleaned")
        return final_result
    except Exception as e:
        log_error(f"β Error during stop_streaming", error=str(e))
        self.is_streaming = False
        self.stream_thread = None
        self.client = None
        self.streaming_config = None
        self.stop_event.clear()
        self.audio_queue = None
        self.responses_queue = None
        return None
def supports_realtime(self) -> bool:
    """Report that this provider can transcribe a live audio stream."""
    streaming_capable = True
    return streaming_capable
def get_supported_languages(self) -> List[str]:
    """Return a new list of the language codes this provider supports."""
    codes = (
        "tr-TR en-US en-GB de-DE fr-FR es-ES "
        "it-IT pt-BR ru-RU ja-JP ko-KR zh-CN ar-SA"
    )
    return codes.split()
def get_provider_name(self) -> str:
    """Return the short identifier of this STT provider."""
    provider = "google"
    return provider
def _reset_session(self):
    """Reset per-session state and advance to a new session id.

    Steps:
        1. Drain any existing queues.
        2. Clear the stop flag, error message and chunk/byte counters.
        3. Increment ``session_id`` and record ``stream_start_time``.
        4. Install fresh unbounded queues.
    """
    # stop_streaming() sets both queues to None. Calling .empty() on None
    # raised AttributeError and made the first start_streaming() after a
    # stop fail, so only drain queues that still exist.
    for stale_queue in (self.audio_queue, self.responses_queue):
        if stale_queue is None:
            continue
        while not stale_queue.empty():
            try:
                stale_queue.get_nowait()
            except queue.Empty:
                break

    # Reset state
    self.should_stop = False
    self.error_message = None
    self.session_id += 1
    self.stream_start_time = time.time()
    self.chunk_count = 0
    self.total_bytes = 0
    log_info(f"π Google STT session data reset. New session ID: {self.session_id}")

    # Create fresh queues
    self.audio_queue = queue.Queue()
    self.responses_queue = queue.Queue()
    log_debug("β Created fresh queues")
def _create_fresh_queues(self, *, audio_maxsize: int = 1000, responses_maxsize: int = 100):
    """Drain any existing queues and replace them with new bounded ones.

    Args:
        audio_maxsize: Capacity of the new audio queue (default 1000).
        responses_maxsize: Capacity of the new response queue (default 100).
    """
    # Drain old queues so a stream thread still holding a reference stops
    # seeing stale items. Only queue.Empty is expected from get_nowait();
    # the former bare ``except: pass`` hid anything else.
    for old_queue in (self.audio_queue, self.responses_queue):
        if old_queue is None:
            continue
        while not old_queue.empty():
            try:
                old_queue.get_nowait()
            except queue.Empty:
                break
    self.audio_queue = queue.Queue(maxsize=audio_maxsize)
    self.responses_queue = queue.Queue(maxsize=responses_maxsize)
    log_debug("β Created fresh queues")
def _request_generator(self):
    """Yield ``StreamingRecognizeRequest`` objects for ``streaming_recognize``.

    The first request carries ``self.streaming_config``; each later one
    carries one audio chunk from this session's audio queue. The generator
    ends when any of the following happens:
        - it receives a ``None`` sentinel;
        - ``should_stop`` is set;
        - a newer session has been started;
        - reading the queue fails unexpectedly.
    """
    # Bind the queue and session id when the generator starts running.
    # stop_streaming() sets self.audio_queue to None and start_streaming()
    # replaces it. Re-reading the attribute each iteration could therefore
    # crash, or let a thread that outlived its session consume audio meant
    # for the next session.
    audio_queue = self.audio_queue
    session_id = self.session_id

    # First request with config
    yield speech.StreamingRecognizeRequest(streaming_config=self.streaming_config)

    # Audio chunks
    while not self.should_stop and self.session_id == session_id:
        try:
            audio_chunk = audio_queue.get(timeout=0.1)
        except queue.Empty:
            continue
        except Exception as e:
            log_error(f"Error in request generator: {e}")
            break
        if audio_chunk is None:
            log_info("π Poison pill received, stopping request generator")
            break
        yield speech.StreamingRecognizeRequest(audio_content=audio_chunk)

    log_info(f"π Request generator finished. Total chunks: {self.chunk_count}, Total bytes: {self.total_bytes}")
async def start_streaming(self, config: STTConfig) -> None:
    """Start a new streaming recognition session with clean state.

    Stops any session that is still active and resets per-session state and
    queues. It then creates a new ``speech.SpeechClient`` and streaming
    config, and launches ``_run_stream`` on a daemon thread.

    Args:
        config: Session settings. ``config.encoding`` selects the audio
            encoding sent to Google (through ``_get_encoding``) and is kept
            in ``self.current_encoding``. ``config.single_utterance`` is
            stored in ``self.single_utterance``.

    Raises:
        Exception: If the previous stream thread is still alive after
            cleanup. Also re-raised after cleanup when client/config
            creation or thread start fails.
    """
    # One lock per instance. The former ``async with asyncio.Lock()`` built a
    # new lock on every call and therefore serialized nothing. It is created
    # lazily so it is made inside the running event loop.
    start_lock = getattr(self, "_start_lock", None)
    if start_lock is None:
        start_lock = self._start_lock = asyncio.Lock()

    try:
        async with start_lock:
            # Clean up any existing stream
            if self.is_streaming or self.stream_thread:
                log_warning("β οΈ Previous stream still active, stopping it first")
                await self.stop_streaming()
                await asyncio.sleep(0.5)

            # Double-check after cleanup
            if self.stream_thread and self.stream_thread.is_alive():
                log_error(f"β Stream thread STILL running after cleanup! Thread: {self.stream_thread.name}")
                raise Exception("Failed to stop previous stream thread")

            # stop_streaming() leaves both queues as None and _reset_session()
            # drains them, so make sure they exist first.
            if self.audio_queue is None or self.responses_queue is None:
                self._create_fresh_queues()
            self._reset_session()
            self.single_utterance = config.single_utterance
            self.current_encoding = config.encoding
            log_info(f"π€ Starting Google STT streaming session #{self.session_id} with config: {config}")

            # Bounded queues for this session; clear stop signals.
            self._create_fresh_queues()
            self.stop_event.clear()
            self.should_stop = False

            self.client = speech.SpeechClient()
            log_info("β Created new Google Speech client")

            # NOTE(review): sample rate, language, model and metadata are
            # hard-coded (16 kHz, tr-TR, latest_long) regardless of config.
            # Confirm whether STTConfig should drive them.
            recognition_config = speech.RecognitionConfig(
                encoding=self._get_encoding(config.encoding),
                sample_rate_hertz=16000,
                language_code="tr-TR",
                enable_automatic_punctuation=True,
                model="latest_long",
                use_enhanced=True,
                max_alternatives=1,
                metadata=speech.RecognitionMetadata(
                    interaction_type=speech.RecognitionMetadata.InteractionType.VOICE_SEARCH,
                    microphone_distance=speech.RecognitionMetadata.MicrophoneDistance.NEARFIELD,
                    recording_device_type=speech.RecognitionMetadata.RecordingDeviceType.PC,
                )
            )

            # Interim results and voice-activity events are enabled.
            # NOTE(review): single_utterance stays False even when
            # config.single_utterance is True; confirm this is intended.
            self.streaming_config = speech.StreamingRecognitionConfig(
                config=recognition_config,
                interim_results=True,
                single_utterance=False,
                enable_voice_activity_events=True,
            )

            # Final check before starting the thread
            if self.stream_thread is not None:
                log_error("β stream_thread should be None at this point!")
                self.stream_thread = None

            self.is_streaming = True
            # Start streaming thread with a unique name
            thread_id = f"GoogleSTT-Session-{self.session_id}-{int(time.time()*1000)}"
            self.stream_thread = threading.Thread(
                target=self._run_stream,
                name=thread_id,
                daemon=True,
            )
            log_info(f"π Starting thread: {thread_id}")
            self.stream_thread.start()
            log_info(f"β Google STT streaming session #{self.session_id} started successfully")
    except Exception as e:
        log_error(f"β Failed to start Google STT streaming", error=str(e))
        self.is_streaming = False
        client, self.client = self.client, None
        # Close a client created before the failure so its gRPC channel is
        # not leaked (the old code only dropped the reference). Skip this if
        # a stream thread is already running on it.
        thread_running = self.stream_thread is not None and self.stream_thread.is_alive()
        close_transport = getattr(getattr(client, "transport", None), "close", None)
        if close_transport is not None and not thread_running:
            try:
                close_transport()
            except Exception as close_error:
                log_warning(f"Error closing Google client after failed start: {close_error}")
        self._create_fresh_queues()
        raise
def _run_stream(self):
    """Run the blocking ``streaming_recognize`` loop (stream thread body).

    Feeds requests from ``_request_generator``. The best alternative of every
    result becomes a ``TranscriptionResult`` pushed onto this session's
    response queue. Errors are logged and stored in ``self.error_message``.
    """
    # Bind this session's resources up front. stop_streaming() resets
    # self.client and self.responses_queue to None while this thread may
    # still be running, and a newer session may replace them.
    session_id = self.session_id
    client = self.client
    responses_queue = self.responses_queue
    responses = None
    stopped_early = False
    try:
        thread_id = threading.current_thread().ident
        log_info(f"π€ Google STT stream thread started - Thread ID: {thread_id}, Session: {session_id}")

        requests = self._request_generator()

        log_info(f"π€ Creating Google STT streaming client... Thread ID: {thread_id}")
        # streaming_recognize is called without a timeout argument.
        responses = client.streaming_recognize(requests)

        first_response_time = None
        response_count = 0

        for response in responses:
            # Leave when stopped or when a newer session has taken over.
            if self.should_stop or self.session_id != session_id:
                log_info("π Stop flag detected, ending stream")
                stopped_early = True
                break

            response_count += 1
            if first_response_time is None:
                first_response_time = time.time()
                elapsed = first_response_time - self.stream_start_time
                log_info(f"π FIRST RESPONSE from Google STT after {elapsed:.2f}s")

            # Voice-activity events (requested via enable_voice_activity_events).
            if hasattr(response, 'speech_event_type') and response.speech_event_type:
                event_type = response.speech_event_type
                log_info(f"ποΈ VAD Event: {event_type}")
                if event_type == speech.StreamingRecognizeResponse.SpeechEventType.END_OF_SINGLE_UTTERANCE:
                    log_info("π End of utterance detected by VAD")

            has_results = len(response.results) > 0 if hasattr(response, 'results') else False
            log_info(f"π¨ Google STT Response #{response_count}: has_results={has_results}")
            if not response.results:
                continue

            for result_idx, result in enumerate(response.results):
                result_type = "π INTERIM" if not result.is_final else "β FINAL"
                stability = getattr(result, 'stability', 0.0)
                log_info(f"{result_type} Result #{result_idx}: "
                         f"alternatives={len(result.alternatives)}, "
                         f"stability={stability:.3f}")
                if not result.alternatives:
                    continue

                best_alternative = result.alternatives[0]
                transcript = best_alternative.transcript
                # Final results carry a confidence; interim ones only stability.
                confidence = best_alternative.confidence if result.is_final else stability

                if result.is_final:
                    log_info(f"β FINAL TRANSCRIPT: '{transcript}' "
                             f"(confidence: {confidence:.3f})")
                else:
                    log_info(f"π INTERIM TRANSCRIPT: '{transcript[:100]}...' "
                             f"(stability: {stability:.3f})")

                result_obj = TranscriptionResult(
                    text=transcript,
                    is_final=result.is_final,
                    confidence=confidence,
                    timestamp=datetime.utcnow()
                )
                if self._queue_result(responses_queue, result_obj, session_id):
                    log_info(f"π₯ {'FINAL' if result.is_final else 'INTERIM'} result queued")

        if response_count == 0:
            log_error("β Google STT stream ended without ANY responses!")
        else:
            log_info(f"β Google STT stream ended normally after {response_count} responses")
    except Exception as e:
        log_error(f"β Google STT error: {e}")
        if hasattr(e, 'details'):
            log_error(f"Error details: {e.details}")
        self.error_message = str(e)
    finally:
        # When we leave the response loop early, cancel the call (if the
        # iterator supports it) so the server-side stream is released.
        if stopped_early and responses is not None:
            cancel = getattr(responses, "cancel", None)
            if callable(cancel):
                try:
                    cancel()
                except Exception as e:
                    log_debug(f"Ignoring error while cancelling Google STT stream: {e}")
        log_info("π€ Google STT stream thread ended")
        with self.lock:
            # Only clear the flag for our own session. A newer session may
            # already be running if this thread outlived its stop_streaming().
            if self.session_id == session_id:
                self.is_streaming = False

def _queue_result(self, responses_queue, result_obj, session_id):
    """Put ``result_obj`` on ``responses_queue`` without blocking forever.

    The response queue from _create_fresh_queues() is bounded. If nobody
    drains it, a plain put() would block this thread and keep it from
    noticing a stop request. This retries every 0.1 s and gives up
    (returns False) once the session is stopped or replaced.

    Returns:
        True if the item was queued, False if it was dropped.
    """
    while True:
        try:
            responses_queue.put(result_obj, timeout=0.1)
            return True
        except queue.Full:
            if self.should_stop or self.session_id != session_id:
                return False