import os import io import time import torch import librosa import requests import tempfile import threading import queue import traceback import numpy as np import soundfile as sf import gradio as gr from datetime import datetime from transformers import AutoModel, AutoModelForCausalLM, AutoTokenizer, pipeline, logging as trf_logging from huggingface_hub import login, hf_hub_download, scan_cache_dir import speech_recognition as sr import openai import torch print("CUDA available:", torch.cuda.is_available()) print("CUDA device:", torch.cuda.current_device() if torch.cuda.is_available() else "None") # Set up environment variables and timeouts os.environ["HF_HUB_DOWNLOAD_TIMEOUT"] = "300" # 5-minute timeout # Enable verbose logging trf_logging.set_verbosity_info() # Get API keys from environment HF_TOKEN = os.getenv("HF_TOKEN") OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") # Set OpenAI API key openai.api_key = OPENAI_API_KEY # Login to Hugging Face if HF_TOKEN: print("๐Ÿ” Logging into Hugging Face with token...") login(token=HF_TOKEN) else: print("โš ๏ธ HF_TOKEN not found. Proceeding without login...") # # Set up device (GPU if available, otherwise CPU) # device = torch.device("cuda" if torch.cuda.is_available() else "cpu") # print(f"๐Ÿ”ง Using device: {device}") # Initialize model variables tts_model = None asr_model = None # Define repository IDs tts_repo_id = "ai4bharat/IndicF5" asr_repo_id = "facebook/wav2vec2-large-xlsr-53" # Multilingual ASR model # TTS model wrapper class to standardize the interface class TTSModelWrapper: def __init__(self, model): self.model = model def generate(self, text, ref_audio_path, ref_text): try: if self.model is None: raise ValueError("Model not initialized") output = self.model( text, ref_audio_path=ref_audio_path, ref_text=ref_text ) return output except Exception as e: print(f"Error in TTS generation: {e}") traceback.print_exc() return None def load_tts_model_with_retry(max_retries=3, retry_delay=5): global tts_model, tts_model_wrapper print("Checking if TTS model is in cache...") try: cache_info = scan_cache_dir() model_in_cache = any(tts_repo_id in repo.repo_id for repo in cache_info.repos) if model_in_cache: print(f"Model {tts_repo_id} found in cache, loading locally...") tts_model = AutoModel.from_pretrained( tts_repo_id, trust_remote_code=True, local_files_only=True, device_map="auto", torch_dtype=torch.float16 ) tts_model_wrapper = TTSModelWrapper(tts_model) print("TTS model loaded from cache successfully!") return except Exception as e: print(f"Cache check failed: {e}") for attempt in range(max_retries): try: print(f"Loading {tts_repo_id} model (attempt {attempt+1}/{max_retries})...") tts_model = AutoModel.from_pretrained( tts_repo_id, trust_remote_code=True, revision="main", use_auth_token=HF_TOKEN, low_cpu_mem_usage=True, device_map="auto" # <-- Use device_map here as well ) tts_model_wrapper = TTSModelWrapper(tts_model) print(f"TTS model loaded successfully! Type: {type(tts_model)}") return except Exception as e: print(f"โš ๏ธ Attempt {attempt+1}/{max_retries} failed: {e}") if attempt < max_retries - 1: print(f"Waiting {retry_delay} seconds before retrying...") time.sleep(retry_delay) retry_delay *= 1.5 try: print("Trying with fallback options...") tts_model = AutoModel.from_pretrained( tts_repo_id, trust_remote_code=True, revision="main", local_files_only=False, use_auth_token=HF_TOKEN, force_download=False, resume_download=True, device_map="auto" # <-- And here too ) tts_model_wrapper = TTSModelWrapper(tts_model) print("TTS model loaded with fallback options!") except Exception as e2: print(f"โŒ All attempts to load TTS model failed: {e2}") print("Will continue without TTS model loaded.") # Reduce chunk size for faster streaming and lower latency def split_into_chunks(text, max_length=15): # Reduced from 30 to 15 sentence_markers = ['.', '?', '!', ';', ':', 'เฅค', 'เฅฅ'] chunks = [] current = "" for char in text: current += char if char in sentence_markers and current.strip(): chunks.append(current.strip()) current = "" if current.strip(): chunks.append(current.strip()) final_chunks = [] for chunk in chunks: if len(chunk) <= max_length: final_chunks.append(chunk) else: comma_splits = chunk.split(',') current_part = "" for part in comma_splits: if len(current_part) + len(part) <= max_length: if current_part: current_part += "," current_part += part else: if current_part: final_chunks.append(current_part.strip()) current_part = part if current_part: final_chunks.append(current_part.strip()) print(f"Split text into {len(final_chunks)} chunks") return final_chunks def load_asr_model(): global asr_model try: print(f"Loading ASR model from {asr_repo_id}...") asr_model = pipeline("automatic-speech-recognition", model=asr_repo_id, device=device) print("ASR model loaded successfully!") except Exception as e: print(f"Error loading ASR model: {e}") print("Will use Google's speech recognition API instead.") asr_model = None class SpeechRecognizer: def __init__(self): self.recognizer = sr.Recognizer() self.using_huggingface = asr_model is not None def recognize_from_file(self, audio_path, language="ml-IN"): """Recognize speech from audio file with fallback mechanisms""" print(f"Recognizing speech from {audio_path}") try: # Try Hugging Face model first if available if self.using_huggingface: try: result = asr_model(audio_path) transcription = result["text"] print(f"HF ASR result: {transcription}") return transcription except Exception as e: print(f"HF ASR failed: {e}, falling back to Google") # Fallback to Google's ASR with sr.AudioFile(audio_path) as source: audio_data = self.recognizer.record(source) text = self.recognizer.recognize_google(audio_data, language=language) print(f"Google ASR result: {text}") return text except Exception as e: print(f"Speech recognition failed: {e}") return "" def recognize_from_microphone(self, language="ml-IN", timeout=5): """Recognize speech from microphone""" print("Listening to microphone...") try: with sr.Microphone() as source: self.recognizer.adjust_for_ambient_noise(source) print("Speak now...") try: audio = self.recognizer.listen(source, timeout=timeout) print("Processing speech...") # Save audio to temporary file for potential HF model processing temp_file = tempfile.NamedTemporaryFile(delete=False, suffix='.wav') temp_file.close() with open(temp_file.name, "wb") as f: f.write(audio.get_wav_data()) # Process with available model if self.using_huggingface: try: result = asr_model(temp_file.name) text = result["text"] print(f"HF ASR result: {text}") os.unlink(temp_file.name) return text except Exception as e: print(f"HF ASR failed: {e}, falling back to Google") # Fallback to Google text = self.recognizer.recognize_google(audio, language=language) print(f"Google ASR result: {text}") os.unlink(temp_file.name) return text except sr.WaitTimeoutError: print("No speech detected within timeout period") return "" except Exception as e: print(f"Speech recognition error: {e}") return "" except Exception as e: print(f"Microphone access error: {e}") return "" class ConversationManager: def __init__(self): self.conversation_history = [] self.system_prompt = ( #"You are a helpful, friendly assistant who speaks Malayalam fluently. " #"Keep your responses concise and conversational. " #"If the user speaks in English, you can respond in English. " #"If the user speaks in Malayalam, respond in Malayalam." "You are a helpful and friendly assistant who speaks Malayalam fluently. " "Respond like you're talking to a close friend over the phone โ€” casual, warm, and natural. " "Keep your responses short, to the point, and avoid sounding robotic or formal. " "Use Malayalam when the user uses Malayalam, and English when the user uses English. " "Use the kind of expressions and tone you'd use while chatting with someone from Kerala." ) def add_message(self, role, content): self.conversation_history.append({"role": role, "content": content}) def get_formatted_history(self): """Format conversation history for OpenAI API""" messages = [{"role": "system", "content": self.system_prompt}] for msg in self.conversation_history: if msg["role"] == "user": messages.append({"role": "user", "content": msg["content"]}) else: messages.append({"role": "assistant", "content": msg["content"]}) return messages def generate_response(self, user_input): """Generate response using GPT-3.5 Turbo""" if not openai.api_key: return "I'm sorry, but the language model is not available right now." self.add_message("user", user_input) try: # Format history for the model messages = self.get_formatted_history() print(f"Sending prompt to OpenAI: {len(messages)} messages") # Generate response with GPT-3.5 Turbo response = openai.ChatCompletion.create( model="gpt-3.5-turbo", messages=messages, max_tokens=300, temperature=0.7, top_p=0.9, ) # Extract text response response_text = response.choices[0].message["content"].strip() print(f"GPT-3.5 response: {response_text}") # Add to history self.add_message("assistant", response_text) return response_text except Exception as e: print(f"Error generating response: {e}") fallback_response = "I'm having trouble thinking right now. Can we try again?" self.add_message("assistant", fallback_response) return fallback_response def remove_noise(audio_data, threshold=0.01): """Apply simple noise gate to remove low-level noise""" if audio_data is None: return np.zeros(1000) # Convert to numpy if needed if isinstance(audio_data, torch.Tensor): audio_data = audio_data.detach().cpu().numpy() if isinstance(audio_data, list): audio_data = np.array(audio_data) # Apply noise gate noise_mask = np.abs(audio_data) < threshold clean_audio = audio_data.copy() clean_audio[noise_mask] = 0 return clean_audio def apply_smoothing(audio_data, window_size=5): """Apply gentle smoothing to reduce artifacts""" if audio_data is None or len(audio_data) < window_size*2: return audio_data # Simple moving average filter kernel = np.ones(window_size) / window_size smoothed = np.convolve(audio_data, kernel, mode='same') # Keep original at the edges smoothed[:window_size] = audio_data[:window_size] smoothed[-window_size:] = audio_data[-window_size:] return smoothed def enhance_audio(audio_data): """Process audio to improve quality and reduce noise""" if audio_data is None: return np.zeros(1000) # Ensure numpy array if isinstance(audio_data, torch.Tensor): audio_data = audio_data.detach().cpu().numpy() if isinstance(audio_data, list): audio_data = np.array(audio_data) # Ensure correct shape and dtype if len(audio_data.shape) > 1: audio_data = audio_data.flatten() if audio_data.dtype != np.float32: audio_data = audio_data.astype(np.float32) # Skip processing if audio is empty or too short if audio_data.size < 100: return audio_data # Check if the audio has reasonable amplitude rms = np.sqrt(np.mean(audio_data**2)) print(f"Initial RMS: {rms}") # Apply gain if needed if rms < 0.05: # Very quiet target_rms = 0.2 gain = target_rms / max(rms, 0.0001) print(f"Applying gain factor: {gain}") audio_data = audio_data * gain # Remove DC offset audio_data = audio_data - np.mean(audio_data) # Apply noise gate to remove low-level noise audio_data = remove_noise(audio_data, threshold=0.01) # Apply gentle smoothing to reduce artifacts audio_data = apply_smoothing(audio_data, window_size=3) # Apply soft limiting to prevent clipping max_amp = np.max(np.abs(audio_data)) if max_amp > 0.95: audio_data = 0.95 * audio_data / max_amp # Apply subtle compression for better audibility audio_data = np.tanh(audio_data * 1.1) * 0.9 return audio_data def split_into_chunks(text, max_length=8): """Split text into smaller chunks based on punctuation and length""" # First split by sentences sentence_markers = ['.', '?', '!', ';', ':', 'เฅค', 'เฅฅ'] chunks = [] current = "" # Initial coarse splitting by sentence markers for char in text: current += char if char in sentence_markers and current.strip(): chunks.append(current.strip()) current = "" if current.strip(): chunks.append(current.strip()) # Further break down long sentences final_chunks = [] for chunk in chunks: if len(chunk) <= max_length: final_chunks.append(chunk) else: # Try splitting by commas for long sentences comma_splits = chunk.split(',') current_part = "" for part in comma_splits: if len(current_part) + len(part) <= max_length: if current_part: current_part += "," current_part += part else: if current_part: final_chunks.append(current_part.strip()) current_part = part if current_part: final_chunks.append(current_part.strip()) print(f"Split text into {len(final_chunks)} chunks") return final_chunks class StreamingTTS: def __init__(self): self.is_generating = False self.should_stop = False self.temp_dir = None self.ref_audio_path = None self.output_file = None self.all_chunks = [] self.sample_rate = 24000 # Default sample rate self.current_text = "" # Track current text being processed # Create temp directory try: self.temp_dir = tempfile.mkdtemp() print(f"Created temp directory: {self.temp_dir}") except Exception as e: print(f"Error creating temp directory: {e}") self.temp_dir = "." # Use current directory as fallback def prepare_ref_audio(self, ref_audio, ref_sr): """Prepare reference audio with enhanced quality""" try: if self.ref_audio_path is None: self.ref_audio_path = os.path.join(self.temp_dir, "ref_audio.wav") # Process the reference audio to ensure clean quality ref_audio = enhance_audio(ref_audio) # Save the reference audio sf.write(self.ref_audio_path, ref_audio, ref_sr, format='WAV', subtype='FLOAT') print(f"Saved reference audio to: {self.ref_audio_path}") # Verify file was created if os.path.exists(self.ref_audio_path): print(f"Reference audio saved successfully: {os.path.getsize(self.ref_audio_path)} bytes") else: print("โš ๏ธ Failed to create reference audio file!") # Create output file if self.output_file is None: self.output_file = os.path.join(self.temp_dir, "output.wav") print(f"Output will be saved to: {self.output_file}") except Exception as e: print(f"Error preparing reference audio: {e}") def cleanup(self): """Clean up temporary files""" if self.temp_dir: try: if os.path.exists(self.ref_audio_path): os.remove(self.ref_audio_path) if os.path.exists(self.output_file): os.remove(self.output_file) os.rmdir(self.temp_dir) self.temp_dir = None print("Cleaned up temporary files") except Exception as e: print(f"Error cleaning up: {e}") def generate(self, text, ref_audio, ref_sr, ref_text): """Start generation in a new thread with validation""" if self.is_generating: print("Already generating speech, please wait") return # Store the text for verification self.current_text = text print(f"Setting current text to: '{self.current_text}'") # Check model is loaded if tts_model_wrapper is None or tts_model is None: print("โš ๏ธ Model is not loaded. Cannot generate speech.") return self.is_generating = True self.should_stop = False self.all_chunks = [] # Start in a new thread threading.Thread( target=self._process_streaming, args=(text, ref_audio, ref_sr, ref_text), daemon=True ).start() def _process_streaming(self, text, ref_audio, ref_sr, ref_text): """Process text in chunks with high-quality audio generation""" try: # Double check text matches what we expect if text != self.current_text: print(f"โš ๏ธ Text mismatch detected! Expected: '{self.current_text}', Got: '{text}'") # Use the stored text to be safe text = self.current_text # Prepare reference audio self.prepare_ref_audio(ref_audio, ref_sr) # Print the text we're actually going to process print(f"Processing text: '{text}'") # Split text into smaller chunks for faster processing chunks = split_into_chunks(text) print(f"Processing {len(chunks)} chunks") combined_audio = None total_start_time = time.time() # Process each chunk for i, chunk in enumerate(chunks): if self.should_stop: print("Stopping generation as requested") break chunk_start = time.time() print(f"Processing chunk {i+1}/{len(chunks)}: '{chunk}'") # Generate speech for this chunk try: # Set timeout for inference chunk_timeout = 30 # 30 seconds timeout per chunk with torch.inference_mode(): # Explicitly pass the chunk text chunk_audio = tts_model_wrapper.generate( text=chunk, # Make sure we're using the current chunk ref_audio_path=self.ref_audio_path, ref_text=ref_text ) if chunk_audio is None or (hasattr(chunk_audio, 'size') and chunk_audio.size == 0): print("โš ๏ธ Empty audio returned for this chunk") chunk_audio = np.zeros(int(24000 * 0.5)) # 0.5s silence # Process the audio to improve quality chunk_audio = enhance_audio(chunk_audio) chunk_time = time.time() - chunk_start print(f"โœ“ Chunk {i+1} processed in {chunk_time:.2f}s") # Add small silence between chunks silence = np.zeros(int(24000 * 0.1)) # 0.1s silence chunk_audio = np.concatenate([chunk_audio, silence]) # Add to our collection self.all_chunks.append(chunk_audio) # Combine all chunks so far if combined_audio is None: combined_audio = chunk_audio else: combined_audio = np.concatenate([combined_audio, chunk_audio]) # Process combined audio for consistent quality processed_audio = enhance_audio(combined_audio) # Write intermediate output sf.write(self.output_file, processed_audio, 24000, format='WAV', subtype='FLOAT') except Exception as e: print(f"Error processing chunk {i+1}: {str(e)[:100]}") continue total_time = time.time() - total_start_time print(f"Total generation time: {total_time:.2f}s") except Exception as e: print(f"Error in streaming TTS: {str(e)[:200]}") # Try to write whatever we have so far if len(self.all_chunks) > 0: try: combined = np.concatenate(self.all_chunks) sf.write(self.output_file, combined, 24000, format='WAV', subtype='FLOAT') print("Saved partial output") except Exception as e2: print(f"Failed to save partial output: {e2}") finally: self.is_generating = False print("Generation complete") def get_current_audio(self): """Get current audio file path for Gradio""" if self.output_file and os.path.exists(self.output_file): file_size = os.path.getsize(self.output_file) if file_size > 0: return self.output_file return None class ConversationEngine: def __init__(self): self.conversation_history = [] self.system_prompt = "You are a helpful assistant that speaks Malayalam fluently. Always respond in Malayalam script with proper formatting." self.saved_voice = None self.saved_voice_text = "" self.tts_cache = {} # Cache for TTS outputs # TTS background processing queue self.tts_queue = queue.Queue() self.tts_thread = threading.Thread(target=self.tts_worker, daemon=True) self.tts_thread.start() # Initialize streaming TTS self.streaming_tts = StreamingTTS() def tts_worker(self): """Background worker to process TTS requests""" while True: try: # Get text and callback from queue text, callback = self.tts_queue.get() # Generate speech audio_path = self._generate_tts(text) # Execute callback with result if callback: callback(audio_path) # Mark task as done self.tts_queue.task_done() except Exception as e: print(f"Error in TTS worker: {e}") traceback.print_exc() def transcribe_audio(self, audio_data, language="ml-IN"): """Convert audio to text using speech recognition""" if audio_data is None: print("No audio data received") return "No audio detected", "" # Make sure we have audio data in the expected format try: if isinstance(audio_data, tuple) and len(audio_data) == 2: # Expected format: (sample_rate, audio_samples) sample_rate, audio_samples = audio_data else: print(f"Unexpected audio format: {type(audio_data)}") return "Invalid audio format", "" if len(audio_samples) == 0: print("Empty audio samples") return "No speech detected", "" # Save the audio temporarily temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=".wav") temp_file.close() # Save the audio data to the temporary file sf.write(temp_file.name, audio_samples, sample_rate) # Use speech recognition on the file recognizer = sr.Recognizer() with sr.AudioFile(temp_file.name) as source: audio = recognizer.record(source) text = recognizer.recognize_google(audio, language=language) print(f"Recognized: {text}") return text, text except sr.UnknownValueError: print("Speech recognition could not understand audio") return "Could not understand audio", "" except sr.RequestError as e: print(f"Could not request results from Google Speech Recognition service: {e}") return f"Speech recognition service error: {str(e)}", "" except Exception as e: print(f"Error processing audio: {e}") traceback.print_exc() return f"Error processing audio: {str(e)}", "" finally: # Clean up temporary file if 'temp_file' in locals() and os.path.exists(temp_file.name): try: os.unlink(temp_file.name) except Exception as e: print(f"Error deleting temporary file: {e}") def save_reference_voice(self, audio_data, reference_text): """Save the reference voice for future TTS generation""" if audio_data is None or not reference_text.strip(): return "Error: Both reference audio and text are required" self.saved_voice = audio_data self.saved_voice_text = reference_text.strip() # Clear TTS cache when voice changes self.tts_cache.clear() # Debug info sample_rate, audio_samples = audio_data print(f"Saved reference voice: {len(audio_samples)} samples at {sample_rate}Hz") print(f"Reference text: {reference_text}") return f"Voice saved successfully! Reference text: {reference_text}" def process_text_input(self, text): """Process text input from user""" if text and text.strip(): return text, text return "No input provided", "" def generate_response(self, input_text): """Generate AI response using GPT-3.5 Turbo""" if not input_text or not input_text.strip(): return "เด‡เตปเดชเตเดŸเตเดŸเต เดฒเดญเดฟเดšเตเดšเดฟเดฒเตเดฒ. เดตเต€เดฃเตเดŸเตเด‚ เดถเตเดฐเดฎเดฟเด•เตเด•เตเด•.", None # "No input received. Please try again." try: # Prepare conversation context from history messages = [{"role": "system", "content": self.system_prompt}] # Add previous conversations for context for entry in self.conversation_history: role = "user" if entry["role"] == "user" else "assistant" messages.append({"role": role, "content": entry["content"]}) # Add current input messages.append({"role": "user", "content": input_text}) # Call OpenAI API response = openai.ChatCompletion.create( model="gpt-3.5-turbo", messages=messages, max_tokens=500, temperature=0.7 ) response_text = response.choices[0].message["content"].strip() return response_text, None except Exception as e: error_msg = f"เดŽเดฑเตผ: GPT เดฎเต‹เดกเดฒเดฟเตฝ เดจเดฟเดจเตเดจเต เด‰เดคเตเดคเดฐเด‚ เดฒเดญเดฟเด•เตเด•เตเดจเตเดจเดคเดฟเตฝ เดชเตเดฐเดถเตเดจเดฎเตเดฃเตเดŸเดพเดฏเดฟ: {str(e)}" print(f"Error in GPT response: {e}") traceback.print_exc() return error_msg, None def resample_audio(self, audio, orig_sr, target_sr): """Resample audio to match target sample rate only if necessary""" if orig_sr != target_sr: print(f"Resampling audio from {orig_sr}Hz to {target_sr}Hz") return librosa.resample(audio, orig_sr=orig_sr, target_sr=target_sr) return audio def _generate_tts(self, text): """Internal method to generate TTS without threading""" if not text or not text.strip(): print("No text provided for TTS generation") return None # Check cache first if text in self.tts_cache: print("Using cached TTS output") return self.tts_cache[text] try: # Check if we have a saved voice and the TTS model if self.saved_voice is not None and tts_model is not None: sample_rate, audio_data = self.saved_voice # Create a temporary file for the reference audio ref_temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=".wav") ref_temp_file.close() print(f"Saving reference audio to {ref_temp_file.name}") # Save the reference audio data sf.write(ref_temp_file.name, audio_data, sample_rate) # Create a temporary file for the output audio output_temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=".wav") output_temp_file.close() try: # Generate speech using IndicF5 - simplified approach from second file print(f"Generating speech with IndicF5. Text: {text[:30]}...") start_time = time.time() # Use torch.no_grad() to save memory and computation with torch.no_grad(): # Run the inference using the wrapper synth_audio = tts_model_wrapper.generate( text, ref_audio_path=ref_temp_file.name, ref_text=self.saved_voice_text ) end_time = time.time() print(f"Speech generation completed in {end_time - start_time:.2f} seconds") # Process audio for better quality synth_audio = enhance_audio(synth_audio) # Save the synthesized audio sf.write(output_temp_file.name, synth_audio, 24000) # IndicF5 uses 24kHz # Add to cache self.tts_cache[text] = output_temp_file.name print(f"TTS output saved to {output_temp_file.name}") return output_temp_file.name except Exception as e: print(f"Error generating speech: {e}") traceback.print_exc() return None finally: # We don't delete the output file as it's returned to the caller # But clean up reference file try: os.unlink(ref_temp_file.name) except Exception as e: print(f"Error cleaning up reference file: {e}") else: print("No saved voice reference or TTS model not loaded") return None except Exception as e: print(f"Error in TTS processing: {e}") traceback.print_exc() return None def queue_tts_generation(self, text, callback=None): """Queue TTS generation in background thread""" print(f"Queueing TTS generation for text: {text[:30]}...") self.tts_queue.put((text, callback)) def generate_streamed_speech(self, text): """Generate speech in a streaming manner for low latency""" if not self.saved_voice: print("No reference voice saved") return None if not text or not text.strip(): print("No text provided for streaming TTS") return None sample_rate, audio_data = self.saved_voice # Start streaming generation self.streaming_tts.generate( text=text, ref_audio=audio_data, ref_sr=sample_rate, ref_text=self.saved_voice_text ) # Return the path that will be populated return self.streaming_tts.output_file def update_history(self, user_input, ai_response): """Update conversation history""" if user_input and user_input.strip(): self.conversation_history.append({"role": "user", "content": user_input}) if ai_response and ai_response.strip(): self.conversation_history.append({"role": "assistant", "content": ai_response}) # Limit history size if len(self.conversation_history) > 20: self.conversation_history = self.conversation_history[-20:] # Initialize global conversation engine conversation_engine = ConversationEngine() speech_recognizer = SpeechRecognizer() class ConversationEngine: def __init__(self): self.conversation_history = [] self.system_prompt = "You are a helpful assistant that speaks Malayalam fluently. Always respond in Malayalam script with proper formatting." self.saved_voice = None self.saved_voice_text = "" self.tts_cache = {} # Cache for TTS outputs # TTS background processing queue self.tts_queue = queue.Queue() self.tts_thread = threading.Thread(target=self.tts_worker, daemon=True) self.tts_thread.start() # Initialize IndicF5 TTS model if available self.tts_model = None self.device = None try: self.initialize_tts_model() # Test the model if it was loaded successfully if self.tts_model is not None: print("TTS model initialized successfully") except Exception as e: print(f"Error initializing TTS model: {e}") traceback.print_exc() def initialize_tts_model(self): """Initialize the IndicF5 TTS model with optimizations""" try: # Check for HF token in environment and use it if available hf_token = os.getenv("HF_TOKEN") if hf_token: print("Logging into Hugging Face with the provided token.") login(token=hf_token) if torch.cuda.is_available(): self.device = torch.device("cuda") print(f"Using GPU: {torch.cuda.get_device_name(0)}") else: self.device = torch.device("cpu") print("Using CPU") # Enable performance optimizations torch.backends.cudnn.benchmark = True # Load TTS model and move it to the appropriate device (GPU/CPU) print("Loading TTS model from ai4bharat/IndicF5...") repo_id = "ai4bharat/IndicF5" self.tts_model = AutoModel.from_pretrained(repo_id, trust_remote_code=True) self.tts_model = self.tts_model.to(self.device) # Set model to evaluation mode for faster inference self.tts_model.eval() print("TTS model loaded successfully") except Exception as e: print(f"Failed to load TTS model: {e}") self.tts_model = None traceback.print_exc() def tts_worker(self): """Background worker to process TTS requests""" while True: try: # Get text and callback from queue text, callback = self.tts_queue.get() # Generate speech audio_path = self._generate_tts(text) # Execute callback with result if callback: callback(audio_path) # Mark task as done self.tts_queue.task_done() except Exception as e: print(f"Error in TTS worker: {e}") traceback.print_exc() def transcribe_audio(self, audio_data, language="ml-IN"): """Convert audio to text using speech recognition""" if audio_data is None: print("No audio data received") return "No audio detected", "" # Make sure we have audio data in the expected format try: if isinstance(audio_data, tuple) and len(audio_data) == 2: # Expected format: (sample_rate, audio_samples) sample_rate, audio_samples = audio_data else: print(f"Unexpected audio format: {type(audio_data)}") return "Invalid audio format", "" if len(audio_samples) == 0: print("Empty audio samples") return "No speech detected", "" # Save the audio temporarily temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=".wav") temp_file.close() # Save the audio data to the temporary file sf.write(temp_file.name, audio_samples, sample_rate) # Use speech recognition on the file recognizer = sr.Recognizer() with sr.AudioFile(temp_file.name) as source: audio = recognizer.record(source) text = recognizer.recognize_google(audio, language=language) print(f"Recognized: {text}") return text, text except sr.UnknownValueError: print("Speech recognition could not understand audio") return "Could not understand audio", "" except sr.RequestError as e: print(f"Could not request results from Google Speech Recognition service: {e}") return f"Speech recognition service error: {str(e)}", "" except Exception as e: print(f"Error processing audio: {e}") traceback.print_exc() return f"Error processing audio: {str(e)}", "" finally: # Clean up temporary file if 'temp_file' in locals() and os.path.exists(temp_file.name): try: os.unlink(temp_file.name) except Exception as e: print(f"Error deleting temporary file: {e}") def save_reference_voice(self, audio_data, reference_text): """Save the reference voice for future TTS generation""" if audio_data is None or not reference_text.strip(): return "Error: Both reference audio and text are required" self.saved_voice = audio_data self.saved_voice_text = reference_text.strip() # Clear TTS cache when voice changes self.tts_cache.clear() # Debug info sample_rate, audio_samples = audio_data print(f"Saved reference voice: {len(audio_samples)} samples at {sample_rate}Hz") print(f"Reference text: {reference_text}") return f"Voice saved successfully! Reference text: {reference_text}" def process_text_input(self, text): """Process text input from user""" if text and text.strip(): return text, text return "No input provided", "" def generate_response(self, input_text): """Generate AI response using GPT-3.5 Turbo""" if not input_text or not input_text.strip(): return "เด‡เตปเดชเตเดŸเตเดŸเต เดฒเดญเดฟเดšเตเดšเดฟเดฒเตเดฒ. เดตเต€เดฃเตเดŸเตเด‚ เดถเตเดฐเดฎเดฟเด•เตเด•เตเด•.", None # "No input received. Please try again." try: # Prepare conversation context from history messages = [{"role": "system", "content": self.system_prompt}] # Add previous conversations for context for entry in self.conversation_history: role = "user" if entry["role"] == "user" else "assistant" messages.append({"role": role, "content": entry["content"]}) # Add current input messages.append({"role": "user", "content": input_text}) # Call OpenAI API response = openai.ChatCompletion.create( model="gpt-3.5-turbo", messages=messages, max_tokens=500, temperature=0.7 ) response_text = response.choices[0].message.content.strip() return response_text, None except Exception as e: error_msg = f"เดŽเดฑเตผ: GPT เดฎเต‹เดกเดฒเดฟเตฝ เดจเดฟเดจเตเดจเต เด‰เดคเตเดคเดฐเด‚ เดฒเดญเดฟเด•เตเด•เตเดจเตเดจเดคเดฟเตฝ เดชเตเดฐเดถเตเดจเดฎเตเดฃเตเดŸเดพเดฏเดฟ: {str(e)}" print(f"Error in GPT response: {e}") traceback.print_exc() return error_msg, None def resample_audio(self, audio, orig_sr, target_sr): """Resample audio to match target sample rate only if necessary""" if orig_sr != target_sr: print(f"Resampling audio from {orig_sr}Hz to {target_sr}Hz") return librosa.resample(audio, orig_sr=orig_sr, target_sr=target_sr) return audio def _generate_tts(self, text): """Internal method to generate TTS without threading""" if not text or not text.strip(): print("No text provided for TTS generation") return None # Check cache first if text in self.tts_cache: print("Using cached TTS output") return self.tts_cache[text] try: # Check if we have a saved voice and the TTS model if self.saved_voice is not None and self.tts_model is not None: sample_rate, audio_data = self.saved_voice # Create a temporary file for the reference audio ref_temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=".wav") ref_temp_file.close() print(f"Saving reference audio to {ref_temp_file.name}") # Save the reference audio data sf.write(ref_temp_file.name, audio_data, sample_rate) # Create a temporary file for the output audio output_temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=".wav") output_temp_file.close() try: # Generate speech using IndicF5 - simplified approach from second file print(f"Generating speech with IndicF5. Text: {text[:30]}...") start_time = time.time() # Use torch.no_grad() to save memory and computation with torch.no_grad(): # Run the inference - directly use the model as in the second file synth_audio = self.tts_model( text, ref_audio_path=ref_temp_file.name, ref_text=self.saved_voice_text ) end_time = time.time() print(f"Speech generation completed in {(end_time - start_time)} seconds") # Normalize output if needed if synth_audio.dtype == np.int16: synth_audio = synth_audio.astype(np.float32) / 32768.0 # Resample the generated audio to match the reference audio's sample rate synth_audio = self.resample_audio(synth_audio, orig_sr=24000, target_sr=sample_rate) # Save the synthesized audio print(f"Saving synthesized audio to {output_temp_file.name}") sf.write(output_temp_file.name, synth_audio, sample_rate) # Cache the result self.tts_cache[text] = output_temp_file.name print(f"TTS generation successful, output file: {output_temp_file.name}") return output_temp_file.name except Exception as e: print(f"IndicF5 TTS failed with error: {e}") traceback.print_exc() # Fall back to Google TTS return self.fallback_tts(text, output_temp_file.name) finally: # Clean up reference audio file if os.path.exists(ref_temp_file.name): try: os.unlink(ref_temp_file.name) except Exception as e: print(f"Error deleting temporary file: {e}") else: if self.saved_voice is None: print("No saved voice available for TTS") if self.tts_model is None: print("TTS model not initialized") # No saved voice or TTS model, use fallback temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=".wav") temp_file.close() return self.fallback_tts(text, temp_file.name) except Exception as e: print(f"Error in TTS processing: {e}") traceback.print_exc() return None def speak_with_indicf5(self, text, callback=None): """Queue text for TTS generation""" if not text or not text.strip(): if callback: callback(None) return None # Check cache first for immediate response if text in self.tts_cache: print("Using cached TTS output") if callback: callback(self.tts_cache[text]) return self.tts_cache[text] # If no callback provided, generate synchronously if callback is None: return self._generate_tts(text) # Otherwise, queue for async processing self.tts_queue.put((text, callback)) return None def fallback_tts(self, text, output_path): """Fallback to Google TTS if IndicF5 fails""" try: from gtts import gTTS # Determine if text is Malayalam is_malayalam = any('\u0D00' <= c <= '\u0D7F' for c in text) lang = 'ml' if is_malayalam else 'en' print(f"Using fallback Google TTS with language: {lang}") tts = gTTS(text=text, lang=lang, slow=False) tts.save(output_path) # Cache the result self.tts_cache[text] = output_path print(f"Fallback TTS saved to: {output_path}") return output_path except Exception as e: print(f"Fallback TTS also failed: {e}") traceback.print_exc() return None def add_message(self, role, content): """Add a message to the conversation history""" timestamp = datetime.now().strftime("%H:%M:%S") self.conversation_history.append({ "role": role, "content": content, "timestamp": timestamp }) def clear_conversation(self): """Clear the conversation history""" self.conversation_history = [] def cleanup(self): """Clean up resources when shutting down""" print("Cleaning up resources...") # Load example Malayalam voices def load_audio_from_url(url): """Load audio from a URL""" try: response = requests.get(url) if response.status_code == 200: audio_data, sample_rate = sf.read(io.BytesIO(response.content)) return sample_rate, audio_data except Exception as e: print(f"Error loading audio from URL: {e}") return None, None # Malayalam voice examples EXAMPLE_VOICES = [ { "name": "Aparna Voice", "url": "https://raw.githubusercontent.com/Aparna0112/voicerecording-_TTS/main/Aparna%20Voice.wav", "transcript": "เดžเดพเตป เด’เดฐเต เดซเต‹เดฃเดฟเดจเตโ€เดฑเต† เด•เดตเตผ เดจเต‹เด•เตเด•เตเด•เดฏเดพเดฃเต. เดŽเดจเดฟเด•เตเด•เต เดธเตเดฎเดพเตผเดŸเตเดŸเต เดซเต‹เดฃเดฟเดจเต เด•เดตเตผ เดตเต‡เดฃเด‚" }, { "name": "KC Voice", "url": "https://raw.githubusercontent.com/Aparna0112/voicerecording-_TTS/main/KC%20Voice.wav", "transcript": "เดนเดฒเต‹ เด‡เดคเต เด…เดชเดฐเดจเต† เด…เดฒเตเดฒเต‡ เดžเดพเตป เดœเด—เดฆเต€เดชเต เด†เดฃเต เดตเดฟเดณเดฟเด•เตเด•เตเดจเตเดจเดคเต เด‡เดชเตเดชเต‹เตพ เดซเตเดฐเต€เดฏเดพเดฃเต‹ เดธเด‚เดธเดพเดฐเดฟเด•เตเด•เดพเดฎเต‹" } ] # Preload example voices for voice in EXAMPLE_VOICES: sample_rate, audio_data = load_audio_from_url(voice["url"]) if sample_rate is not None: voice["audio"] = (sample_rate, audio_data) print(f"Loaded example voice: {voice['name']}") else: print(f"Failed to load voice: {voice['name']}") def create_chatbot_interface(): """Create a single-page chatbot interface with voice input, output, and voice selection""" # Initialize conversation engine engine = ConversationEngine() # CSS for styling the chat interface css = """ .chatbot-container { display: flex; flex-direction: column; height: 100%; max-width: 800px; margin: 0 auto; } .chat-window { flex-grow: 1; overflow-y: auto; padding: 1rem; background: #f5f7f9; border-radius: 0.5rem; margin-bottom: 1rem; min-height: 400px; } .input-area { display: flex; gap: 0.5rem; padding: 0.5rem; align-items: center; } .message { margin-bottom: 1rem; padding: 0.8rem; border-radius: 0.5rem; position: relative; max-width: 80%; } .user-message { background: #e1f5fe; align-self: flex-end; margin-left: auto; } .bot-message { background: #f0f0f0; align-self: flex-start; } .timestamp { font-size: 0.7rem; color: #888; margin-top: 0.2rem; text-align: right; } .chatbot-header { text-align: center; color: #333; margin-bottom: 1rem; } .chat-controls { display: flex; justify-content: space-between; margin-bottom: 0.5rem; } .voice-selector { background: #f8f9fa; padding: 1rem; border-radius: 0.5rem; margin-bottom: 1rem; } .progress-bar { height: 4px; background-color: #e0e0e0; position: relative; margin: 10px 0; border-radius: 2px; } .progress-bar-fill { height: 100%; background-color: #4CAF50; border-radius: 2px; transition: width 0.3s ease-in-out; } """ with gr.Blocks(css=css, title="Malayalam Voice Chatbot") as interface: gr.Markdown("# ๐Ÿค– Malayalam Voice Chatbot with Voice Selection", elem_classes=["chatbot-header"]) # Create a state variable for TTS progress tts_progress_state = gr.State(0) audio_output_state = gr.State(None) with gr.Row(elem_classes=["chatbot-container"]): with gr.Column(): # Voice selection section - fixed to use Accordion instead of Box with gr.Accordion("๐ŸŽค Voice Selection", open=True): # Select from example voices or record your own voice_selector = gr.Dropdown( choices=[voice["name"] for voice in EXAMPLE_VOICES], value=EXAMPLE_VOICES[0]["name"] if EXAMPLE_VOICES else None, label="Select Voice Example" ) # Display selected voice info voice_info = gr.Textbox( value=EXAMPLE_VOICES[0]["transcript"] if EXAMPLE_VOICES else "", label="Voice Sample Transcript", lines=2, interactive=True ) # Play selected example voice example_audio = gr.Audio( value=None, label="Example Voice", interactive=False ) # Or record your own voice gr.Markdown("### OR Record Your Own Voice") custom_voice = gr.Audio( sources=["microphone", "upload"], type="numpy", label="Record/Upload Your Voice" ) custom_transcript = gr.Textbox( value="", label="Your Voice Transcript (what you said in Malayalam)", lines=2 ) # Button to save the selected/recorded voice save_voice_btn = gr.Button("๐Ÿ’พ Save Voice for Chat", variant="primary") voice_status = gr.Textbox(label="Voice Status", value="No voice saved yet") # Language selector and controls for chat with gr.Row(elem_classes=["chat-controls"]): language_selector = gr.Dropdown( choices=["ml-IN", "en-US", "hi-IN", "ta-IN", "te-IN", "kn-IN"], value="ml-IN", label="Speech Recognition Language" ) clear_btn = gr.Button("๐Ÿงน Clear Chat", scale=0) # Chat display area chatbot = gr.Chatbot( [], elem_id="chatbox", bubble_full_width=False, height=450, elem_classes=["chat-window"] ) # Progress bar for TTS generation with gr.Row(): tts_progress = gr.Slider( minimum=0, maximum=100, value=0, label="TTS Progress", interactive=False ) # Audio output for the bot's response audio_output = gr.Audio( label="Bot's Voice Response", type="filepath", autoplay=True, visible=True ) # Status message for debugging status_msg = gr.Textbox( label="Status", value="Ready", interactive=False ) # Input area with separate components with gr.Row(elem_classes=["input-area"]): audio_msg = gr.Textbox( label="Message", placeholder="Type a message or record audio", lines=1 ) audio_input = gr.Audio( sources=["microphone"], type="numpy", label="Record", elem_classes=["audio-input"] ) submit_btn = gr.Button("๐Ÿš€ Send", variant="primary") # Function to update voice example info def update_voice_example(voice_name): for voice in EXAMPLE_VOICES: if voice["name"] == voice_name and "audio" in voice: return voice["transcript"], voice["audio"] return "", None # Function to save voice for TTS def save_voice_for_tts(example_name, example_audio, custom_audio, example_transcript, custom_transcript): try: # Check if we're using an example voice or custom recorded voice if custom_audio is not None: # Use custom recorded voice if not custom_transcript.strip(): return "Error: Please provide a transcript for your recorded voice" voice_audio = custom_audio transcript = custom_transcript source = "custom recording" elif example_audio is not None: # Use selected example voice voice_audio = example_audio transcript = example_transcript source = f"example: {example_name}" else: return "Error: No voice selected or recorded" # Save the voice in the engine result = engine.save_reference_voice(voice_audio, transcript) return f"Voice saved successfully! Using {source}" except Exception as e: print(f"Error saving voice: {e}") traceback.print_exc() return f"Error saving voice: {str(e)}" # Function to update TTS progress def update_tts_progress(progress): return progress # Audio generated callback def on_tts_generated(audio_path): print(f"TTS generation callback received path: {audio_path}") return audio_path, 100, "Response ready" # audio path, 100% progress, status message # Function to process user input and generate response def process_input(audio, text_input, history, language, progress): try: # Update status status = "Processing input..." # Reset progress bar progress = 0 # Check which input mode we're using if audio is not None: # Audio input transcribed_text, input_text = engine.transcribe_audio(audio, language) if not input_text: status = "Could not understand audio. Please try again." return history, None, status, text_input, progress elif text_input and text_input.strip(): # Text input input_text = text_input.strip() transcribed_text = input_text else: # No valid input status = "No input detected. Please speak or type a message." return history, None, status, text_input, progress # Add user message to conversation history engine.add_message("user", input_text) # Update the Gradio chatbot display immediately with user message updated_history = history + [[transcribed_text, None]] # Update status and progress status = "Generating response..." progress = 30 # Generate response response_text, _ = engine.generate_response(input_text) # Add assistant response to conversation history engine.add_message("assistant", response_text) # Update the Gradio chatbot with the assistant's response updated_history = history + [[transcribed_text, response_text]] # Update status and progress status = "Generating speech..." progress = 60 # Generate speech for response synchronously (for better debugging) audio_path = engine._generate_tts(response_text) if audio_path: status = f"Response ready: {audio_path}" progress = 100 print(f"Audio generated successfully: {audio_path}") else: status = "Failed to generate speech" # Clear the text input return updated_history, audio_path, status, "", progress except Exception as e: # Catch any unexpected errors error_message = f"Error: {str(e)}" print(error_message) traceback.print_exc() return history, None, error_message, text_input, progress # Function to clear chat history def clear_chat(): engine.clear_conversation() return [], None, "Chat history cleared", "", 0 # Connect event handlers # Voice selection handlers voice_selector.change( update_voice_example, inputs=[voice_selector], outputs=[voice_info, example_audio] ) # Save voice button handler save_voice_btn.click( save_voice_for_tts, inputs=[voice_selector, example_audio, custom_voice, voice_info, custom_transcript], outputs=[voice_status] ) # Chat handlers submit_btn.click( process_input, inputs=[audio_input, audio_msg, chatbot, language_selector, tts_progress_state], outputs=[chatbot, audio_output, status_msg, audio_msg, tts_progress] ) # Allow sending by pressing Enter key in the text input audio_msg.submit( process_input, inputs=[audio_input, audio_msg, chatbot, language_selector, tts_progress_state], outputs=[chatbot, audio_output, status_msg, audio_msg, tts_progress] ) # Clear button handler clear_btn.click( clear_chat, inputs=[], outputs=[chatbot, audio_output, status_msg, audio_msg, tts_progress] ) # Setup cleanup on exit def exit_handler(): engine.cleanup() import atexit atexit.register(exit_handler) # Enable queueing for better responsiveness interface.queue() return interface # Start the interface if __name__ == "__main__": print("Starting Malayalam Voice Chatbot with IndicF5 Voice Selection...") interface = create_chatbot_interface() interface.launch(debug=True) # Enable debug mode to see errors in the console