|
import gradio as gr |
|
import torch |
|
import numpy as np |
|
import librosa |
|
import soundfile as sf |
|
from transformers import pipeline, AutoTokenizer, AutoModelForCausalLM |
|
import warnings |
|
import json |
|
import time |
|
from datetime import datetime |
|
import os |
|
|
|
|
|
try: |
|
from TTS.api import TTS |
|
TTS_AVAILABLE = True |
|
except ImportError: |
|
print("β οΈ TTS not available, using text-only mode") |
|
TTS_AVAILABLE = False |
|
|
|
warnings.filterwarnings("ignore") |
|
|
|
|
|
asr_pipe = None |
|
qwen_model = None |
|
qwen_tokenizer = None |
|
tts_model = None |
|
|
|
|
class ConversationManager: |
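    """Rolling window of recent user/AI exchanges plus the current emotion."""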
|
def __init__(self, max_exchanges=5): |
|
self.history = [] |
|
self.max_exchanges = max_exchanges |
|
self.current_emotion = "neutral" |
|
|
|
def add_exchange(self, user_input, ai_response, emotion="neutral"): |
|
self.history.append({ |
|
"timestamp": datetime.now().isoformat(), |
|
"user": user_input, |
|
"ai": ai_response, |
|
"emotion": emotion |
|
}) |
|
|
|
if len(self.history) > self.max_exchanges: |
|
self.history = self.history[-self.max_exchanges:] |
|
|
|
def get_context(self): |
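        """Format the most recent exchanges for inclusion in the system prompt."""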
|
context = "" |
|
for exchange in self.history[-3:]: |
|
context += f"User: {exchange['user']}\nAI: {exchange['ai']}\n" |
|
return context |
|
|
|
def clear(self): |
|
self.history = [] |
|
self.current_emotion = "neutral" |
|
|
|
def load_models(): |
|
"""Load all models with proper error handling""" |
|
global asr_pipe, qwen_model, qwen_tokenizer, tts_model |
|
|
|
print("π Loading models...") |
|
|
|
|
|
print("π€ Loading Whisper for ASR...") |
|
try: |
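        # whisper-base keeps latency low; larger checkpoints transcribe more accurately at higher cost.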
|
asr_pipe = pipeline( |
|
"automatic-speech-recognition", |
|
model="openai/whisper-base", |
|
torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32, |
|
device=0 if torch.cuda.is_available() else -1 |
|
) |
|
print("β
Whisper ASR loaded successfully!") |
|
except Exception as e: |
|
print(f"β Error loading Whisper: {e}") |
|
return False |
|
|
|
|
|
print("π§ Loading Qwen2.5-1.5B for conversation...") |
|
try: |
|
model_name = "Qwen/Qwen2.5-1.5B-Instruct" |
|
qwen_tokenizer = AutoTokenizer.from_pretrained( |
|
model_name, |
|
trust_remote_code=True |
|
) |
|
qwen_model = AutoModelForCausalLM.from_pretrained( |
|
model_name, |
|
torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32, |
|
device_map="auto" if torch.cuda.is_available() else None, |
|
trust_remote_code=True |
|
) |
|
print("β
Qwen loaded successfully!") |
|
except Exception as e: |
|
print(f"β Error loading Qwen: {e}") |
|
return False |
|
|
|
|
|
print("ποΈ Loading TTS model...") |
|
if TTS_AVAILABLE: |
|
try: |
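            # LJSpeech Tacotron2-DDC: a single-speaker English model that outputs 22.05 kHz audio.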
|
|
|
tts_model = TTS(model_name="tts_models/en/ljspeech/tacotron2-DDC", progress_bar=False) |
|
if torch.cuda.is_available(): |
|
tts_model = tts_model.to("cuda") |
|
print("β
TTS loaded successfully!") |
|
except Exception as e: |
|
print(f"β οΈ TTS failed to load: {e}") |
|
tts_model = None |
|
else: |
|
print("β οΈ TTS not available, using text-only mode") |
|
tts_model = None |
|
|
|
return True |
|
|
|
def detect_emotion_from_text(text): |
|
"""Simple emotion detection from text""" |
|
text_lower = text.lower() |
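
    # Keyword matching is a deliberately simple heuristic; a trained sentiment
    # classifier would be more robust but adds a dependency.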
|
|
|
|
|
if any(word in text_lower for word in ['happy', 'great', 'awesome', 'wonderful', 'excited', 'laugh', 'amazing', 'fantastic']): |
|
return 'happy' |
|
elif any(word in text_lower for word in ['sad', 'upset', 'disappointed', 'cry', 'terrible', 'awful', 'depressed']): |
|
return 'sad' |
|
elif any(word in text_lower for word in ['angry', 'mad', 'furious', 'annoyed', 'frustrated', 'hate']): |
|
return 'angry' |
|
elif any(word in text_lower for word in ['wow', 'incredible', 'surprised', 'unbelievable', 'shocking']): |
|
return 'surprised' |
|
else: |
|
return 'neutral' |
|
|
|
def speech_to_text_with_emotion(audio_input): |
|
"""Convert speech to text and detect emotion""" |
|
try: |
|
if audio_input is None: |
|
return "", "neutral" |
|
|
|
|
|
if isinstance(audio_input, tuple): |
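            # Gradio's "numpy" audio type yields a (sample_rate, samples) tuple.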
|
sample_rate, audio_data = audio_input |
|
|
|
if audio_data.dtype != np.float32: |
|
audio_data = audio_data.astype(np.float32) |
|
if len(audio_data.shape) > 1: |
|
audio_data = audio_data.mean(axis=1) |
|
else: |
|
audio_data = audio_input |
|
sample_rate = 16000 |
|
|
|
|
|
if len(audio_data) > 0: |
|
max_val = np.max(np.abs(audio_data)) |
|
if max_val > 0: |
|
audio_data = audio_data / max_val |
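
        # Whisper expects 16 kHz mono audio, so resample when the source rate differs.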
|
|
|
|
|
if sample_rate != 16000: |
|
audio_data = librosa.resample(audio_data, orig_sr=sample_rate, target_sr=16000) |
|
|
|
|
|
        # The ASR pipeline takes a dict of raw samples plus their sampling rate.
        result = asr_pipe({"raw": audio_data, "sampling_rate": 16000})
|
transcription = result['text'].strip() |
|
|
|
|
|
emotion = detect_emotion_from_text(transcription) |
|
|
|
return transcription, emotion |
|
|
|
except Exception as e: |
|
print(f"Error in STT: {e}") |
|
return "Sorry, I couldn't understand that.", "neutral" |
|
|
|
def generate_contextual_response(user_input, emotion, conversation_manager): |
|
"""Generate contextual response using Qwen""" |
|
try: |
|
context = conversation_manager.get_context() |
|
|
|
|
|
emotional_prompts = { |
|
"happy": "Respond with enthusiasm and joy. Use positive language and show excitement.", |
|
"sad": "Respond with empathy and comfort. Be gentle, understanding, and supportive.", |
|
"angry": "Respond calmly and try to help. Be patient and de-escalate the situation.", |
|
"surprised": "Share in the surprise and show curiosity. Be engaging and interested.", |
|
"neutral": "Respond naturally and conversationally. Be helpful and friendly." |
|
} |
|
|
|
system_prompt = f"""You are Maya, a friendly and emotionally intelligent AI assistant. |
|
{emotional_prompts.get(emotion, emotional_prompts['neutral'])} |
|
|
|
Previous conversation context: |
|
{context} |
|
|
|
Current user emotion: {emotion} |
|
|
|
Guidelines: |
|
- Keep responses concise (1-2 sentences maximum) |
|
- Match the user's emotional tone appropriately |
|
- Be natural and conversational |
|
- Show empathy and understanding |
|
- Provide helpful responses |
|
""" |
|
|
|
messages = [ |
|
{"role": "system", "content": system_prompt}, |
|
{"role": "user", "content": user_input} |
|
] |
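
        # apply_chat_template renders the message list into Qwen's chat prompt format.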
|
|
|
|
|
text = qwen_tokenizer.apply_chat_template( |
|
messages, |
|
tokenize=False, |
|
add_generation_prompt=True |
|
) |
|
|
|
model_inputs = qwen_tokenizer([text], return_tensors="pt") |
|
if torch.cuda.is_available(): |
|
model_inputs = model_inputs.to(qwen_model.device) |
|
|
|
with torch.no_grad(): |
|
generated_ids = qwen_model.generate( |
|
model_inputs.input_ids, |
|
max_new_tokens=80, |
|
do_sample=True, |
|
temperature=0.7, |
|
top_p=0.9, |
|
pad_token_id=qwen_tokenizer.eos_token_id |
|
) |
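
        # Strip the prompt tokens so only the newly generated reply is decoded.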
|
|
|
generated_ids = [ |
|
output_ids[len(input_ids):] for input_ids, output_ids in zip(model_inputs.input_ids, generated_ids) |
|
] |
|
|
|
response = qwen_tokenizer.batch_decode(generated_ids, skip_special_tokens=True)[0] |
|
|
|
return response.strip() |
|
|
|
except Exception as e: |
|
print(f"Error in response generation: {e}") |
|
return "I'm sorry, I'm having trouble processing that right now. Could you please try again?" |
|
|
|
def text_to_speech_emotional(text, emotion="neutral"): |
|
"""Convert text to speech with emotional context""" |
|
try: |
|
if tts_model is None: |
|
print(f"π Maya says ({emotion}): {text}") |
|
return None |
|
|
|
|
|
if torch.cuda.is_available(): |
|
torch.cuda.empty_cache() |
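
        # NOTE: plain Tacotron2 has no emotion conditioning; these bracketed cues are
        # likely to be read aloud verbatim rather than alter the speaking style.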
|
|
|
|
|
emotional_prefixes = { |
|
"happy": "[Speaking with joy] ", |
|
"sad": "[Speaking gently] ", |
|
"angry": "[Speaking calmly] ", |
|
"surprised": "[Speaking with excitement] ", |
|
"neutral": "" |
|
} |
|
|
|
enhanced_text = f"{emotional_prefixes.get(emotion, '')}{text}" |
|
|
|
print(f"Generating TTS for: {enhanced_text}") |
|
|
|
|
|
audio_output = tts_model.tts(text=enhanced_text) |
|
|
|
|
|
if isinstance(audio_output, list): |
|
audio_output = np.array(audio_output, dtype=np.float32) |
|
elif torch.is_tensor(audio_output): |
|
audio_output = audio_output.cpu().numpy().astype(np.float32) |
|
|
|
|
|
if len(audio_output) > 0: |
|
max_val = np.max(np.abs(audio_output)) |
|
if max_val > 1.0: |
|
audio_output = audio_output / max_val * 0.95 |
|
|
|
return (22050, audio_output) |
|
|
|
except Exception as e: |
|
print(f"Error in TTS: {e}") |
|
print(f"π Maya says ({emotion}): {text}") |
|
return None |
|
|
|
|
|
conv_manager = ConversationManager() |
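# Module-level conversation state: one shared session, which suits this single-user demo.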
|
|
|
def start_call(): |
|
"""Initialize call and return greeting""" |
|
conv_manager.clear() |
|
greeting_text = "Hello! I'm Maya, your AI assistant. How can I help you today?" |
|
greeting_audio = text_to_speech_emotional(greeting_text, "happy") |
|
|
|
    return greeting_audio, greeting_text, "Call started! 📞 Ready to chat!"
|
|
|
def process_conversation(audio_input): |
|
"""Main conversation processing pipeline""" |
|
if audio_input is None: |
|
return None, "Please record some audio first.", "", "β No audio input received." |
|
|
|
try: |
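        # Pipeline: transcribe -> detect emotion -> generate reply -> synthesize -> log.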
|
|
|
user_text, emotion = speech_to_text_with_emotion(audio_input) |
|
|
|
if not user_text or user_text.strip() == "": |
|
return None, "I didn't catch that. Could you please repeat?", "", "β No speech detected." |
|
|
|
|
|
ai_response = generate_contextual_response(user_text, emotion, conv_manager) |
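
        # Speak the reply with the detected emotion so voice and wording match.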
|
|
|
|
|
response_audio = text_to_speech_emotional(ai_response, emotion) |
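
        # Record the exchange so follow-up turns keep conversational context.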
|
|
|
|
|
conv_manager.add_exchange(user_text, ai_response, emotion) |
|
|
|
status = f"β
Processed successfully! | Emotion: {emotion} | Exchange: {len(conv_manager.history)}/5" |
|
|
|
return response_audio, ai_response, user_text, status |
|
|
|
except Exception as e: |
|
error_msg = f"β Error processing conversation: {str(e)}" |
|
return None, "I'm sorry, I encountered an error. Please try again.", "", error_msg |
|
|
|
def get_conversation_history(): |
|
"""Return formatted conversation history""" |
|
if not conv_manager.history: |
|
return "No conversation history yet. Start a call to begin chatting!" |
|
|
|
history_text = "π **Conversation History:**\n\n" |
|
for i, exchange in enumerate(conv_manager.history, 1): |
|
timestamp = exchange['timestamp'][:19].replace('T', ' ') |
|
history_text += f"**Exchange {i}** ({timestamp}) - Emotion: {exchange['emotion']}\n" |
|
history_text += f"π€ **You:** {exchange['user']}\n" |
|
history_text += f"π€ **Maya:** {exchange['ai']}\n\n" |
|
|
|
return history_text |
|
|
|
def end_call(): |
|
"""End call and clear conversation""" |
|
farewell_text = "Thank you for talking with me! Have a wonderful day!" |
|
farewell_audio = text_to_speech_emotional(farewell_text, "happy") |
|
conv_manager.clear() |
|
|
|
    return farewell_audio, farewell_text, "Call ended. 📞❌ Thanks for chatting!"
|
|
|
def create_interface(): |
|
"""Create the Gradio interface""" |
|
with gr.Blocks( |
|
title="Maya AI - Speech-to-Speech Assistant", |
|
theme=gr.themes.Soft(), |
|
css=""" |
|
.main-header { |
|
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); |
|
border-radius: 15px; |
|
padding: 20px; |
|
text-align: center; |
|
margin-bottom: 20px; |
|
} |
|
.call-button { background: linear-gradient(45deg, #FF6B6B, #4ECDC4) !important; } |
|
.process-button { background: linear-gradient(45deg, #45B7D1, #96CEB4) !important; } |
|
.end-button { background: linear-gradient(45deg, #FFA07A, #FF6347) !important; } |
|
""" |
|
) as demo: |
|
|
|
gr.HTML(""" |
|
<div class="main-header"> |
|
<h1 style="color: white; margin: 0; font-size: 2.5em;">ποΈ Maya AI</h1> |
|
<p style="color: white; margin: 10px 0; font-size: 1.2em;">Advanced Speech-to-Speech Conversational AI</p> |
|
<p style="color: #E8E8E8; margin: 0;">Natural β’ Emotional β’ Contextual</p> |
|
</div> |
|
""") |
|
|
|
with gr.Row(): |
|
with gr.Column(scale=1): |
|
|
|
gr.HTML("<h3>π Call Controls</h3>") |
|
start_btn = gr.Button("π Start Call", elem_classes="call-button", size="lg") |
|
end_btn = gr.Button("πβ End Call", elem_classes="end-button", size="lg") |
|
|
|
|
|
gr.HTML("<h3>π€ Voice Input</h3>") |
|
audio_input = gr.Audio( |
|
label="Record Your Message", |
|
sources=["microphone"], |
|
type="numpy" |
|
) |
|
|
|
                process_btn = gr.Button("🎯 Process Message", elem_classes="process-button", variant="primary", size="lg")
|
|
|
|
|
status_display = gr.Textbox( |
|
label="π Status", |
|
interactive=False, |
|
lines=2, |
|
value="Ready to start! Click 'Start Call' to begin." |
|
) |
|
|
|
with gr.Column(scale=2): |
|
|
|
gr.HTML("<h3>π Maya's Response</h3>") |
|
response_audio = gr.Audio( |
|
label="Maya's Voice Response", |
|
type="numpy", |
|
interactive=False |
|
) |
|
|
|
|
|
with gr.Row(): |
|
with gr.Column(): |
|
user_text_display = gr.Textbox( |
|
label="π€ What You Said", |
|
interactive=False, |
|
lines=3, |
|
placeholder="Your speech will appear here..." |
|
) |
|
|
|
with gr.Column(): |
|
ai_text_display = gr.Textbox( |
|
label="π€ Maya's Response", |
|
interactive=False, |
|
lines=3, |
|
placeholder="Maya's response will appear here..." |
|
) |
|
|
|
|
|
with gr.Row(): |
|
with gr.Column(): |
|
gr.HTML("<h3>π Conversation History</h3>") |
|
history_btn = gr.Button("π Show History", variant="secondary") |
|
history_display = gr.Markdown( |
|
value="No conversation history yet. Start a call to begin chatting!", |
|
label="Conversation Log" |
|
) |
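
        # Wire the UI controls to the pipeline functions.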
|
|
|
|
|
start_btn.click( |
|
fn=start_call, |
|
outputs=[response_audio, ai_text_display, status_display] |
|
) |
|
|
|
process_btn.click( |
|
fn=process_conversation, |
|
inputs=[audio_input], |
|
outputs=[response_audio, ai_text_display, user_text_display, status_display] |
|
) |
|
|
|
end_btn.click( |
|
fn=end_call, |
|
outputs=[response_audio, ai_text_display, status_display] |
|
) |
|
|
|
history_btn.click( |
|
fn=get_conversation_history, |
|
outputs=[history_display] |
|
) |
|
|
|
|
|
gr.HTML(""" |
|
<div style="margin-top: 20px; padding: 20px; background: #f8f9fa; border-radius: 10px; border-left: 5px solid #007bff;"> |
|
            <h3>💡 How to Use Maya AI:</h3>

            <ol>

            <li><strong>Start Call:</strong> Click "📞 Start Call" to initialize Maya</li>

            <li><strong>Record:</strong> Use the microphone to record your message</li>

            <li><strong>Process:</strong> Click "🎯 Process Message" to get Maya's response</li>

            <li><strong>Listen:</strong> Maya will respond with natural, emotional speech</li>

            <li><strong>Continue:</strong> Keep chatting (up to 5 exchanges with context)</li>

            <li><strong>End:</strong> Click "📞❌ End Call" when finished</li>
|
</ol> |
|
|
|
            <h4>🌟 Features:</h4>

            <ul>

            <li>🎤 <strong>Speech Recognition:</strong> Powered by Whisper</li>

            <li>🧠 <strong>Smart Responses:</strong> Using Qwen2.5-1.5B</li>

            <li>😊 <strong>Emotion Detection:</strong> Automatic emotion recognition</li>

            <li>🔊 <strong>Natural Speech:</strong> High-quality TTS with emotions</li>

            <li>💭 <strong>Context Memory:</strong> Remembers conversation flow</li>
|
</ul> |
|
</div> |
|
""") |
|
|
|
return demo |
|
|
|
if __name__ == "__main__": |
|
print("π Initializing Maya AI System...") |
|
print("π§ Checking GPU availability...") |
|
|
|
if torch.cuda.is_available(): |
|
print(f"β
GPU detected: {torch.cuda.get_device_name()}") |
|
print(f"πΎ GPU Memory: {torch.cuda.get_device_properties(0).total_memory / 1e9:.1f} GB") |
|
else: |
|
print("β οΈ No GPU detected, using CPU") |
|
|
|
if load_models(): |
|
print("β
All models loaded successfully!") |
|
print("π Launching Maya AI Interface...") |
|
|
|
demo = create_interface() |
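
        # share=True and server_name="0.0.0.0" expose the app beyond localhost;
        # drop them for private local use.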
|
demo.launch( |
|
server_name="0.0.0.0", |
|
server_port=7860, |
|
share=True, |
|
show_error=True, |
|
debug=False |
|
) |
|
else: |
|
print("β Failed to load models. Please check the logs above for details.") |
|
|