# Assistant / app.py
# jsbeaudry's picture
# Update app.py
# a5c19ce verified
import asyncio
import torch
import librosa
import numpy as np
import soundfile as sf
from transformers import (
AutoProcessor, AutoModelForSpeechSeq2Seq,
AutoModelForCausalLM, AutoTokenizer,
pipeline, SpeechT5Processor, SpeechT5ForTextToSpeech, SpeechT5HifiGan
)
from datasets import load_dataset
import logging
from typing import Optional, Dict, Any
import time
from pathlib import Path
from kokoro import KPipeline
from IPython.display import display, Audio
import gradio as gr
import asyncio
import os
# Set up logging: INFO-level basic configuration plus a module-level logger
# that every function in this file writes to.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Default system prompt; it is the initial value of the "System Prompt"
# textbox in the Gradio UI and can be edited by the user before each run.
system_prompt_0 = """You are a highly trained U.S. Tax Assistant AI, designed to help individuals and small businesses understand, plan, and file their taxes according to federal and state tax laws. You explain complex tax concepts in simple, accurate, and actionable terms, using IRS guidelines, up-to-date tax code knowledge, and best practices for compliance and savings. You act as an explainer, educator, and assistant—not a certified tax preparer or legal advisor. Be short in your answer, less than 100 chars"""
class AsyncAIConversation:
    """Voice assistant pipeline: speech-to-text -> LLM -> text-to-speech.

    ``__init__`` only selects the torch device; call ``initialize_models()``
    once before using any of the processing methods.

    NOTE: STT and LLM inference run directly on the event loop (only audio
    loading and TTS synthesis are pushed to the default executor), so they
    block other coroutines while they run.
    """

    # Token id the LLM output is scanned for as the end-of-thinking marker
    # (``</think>``).
    # NOTE(review): tied to the tokenizer loaded in _init_llm_model — confirm
    # the id if the model is changed.
    _THINK_END_TOKEN_ID = 151668

    # Sample rate (Hz) used when writing the synthesized speech to disk.
    _TTS_SAMPLE_RATE = 24000

    def __init__(self):
        self.stt_processor = None
        self.stt_model = None
        self.llm_tokenizer = None
        self.llm_model = None
        self.tts_synthesizer = None
        self.speaker_embedding = None
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        logger.info(f"Using device: {self.device}")

    async def initialize_models(self):
        """Load the STT, LLM and TTS models, one after the other.

        Raises:
            Exception: whatever the failing loader raised (it is logged and
                re-raised by the corresponding ``_init_*`` method).
        """
        logger.info("Initializing models...")
        await self._init_stt_model()
        await self._init_llm_model()
        await self._init_tts_model()
        logger.info("All models initialized successfully!")

    async def _init_stt_model(self):
        """Load the speech-to-text processor and model onto ``self.device``."""
        logger.info("Loading STT model...")
        try:
            stt_model_id = "unsloth/whisper-small"
            # Alternative checkpoint: unsloth/whisper-large-v3-turbo
            self.stt_processor = AutoProcessor.from_pretrained(stt_model_id)
            self.stt_model = AutoModelForSpeechSeq2Seq.from_pretrained(stt_model_id)
            self.stt_model.to(self.device)
            logger.info("STT model loaded successfully")
        except Exception as e:
            logger.error(f"Error loading STT model: {e}")
            raise

    async def _init_llm_model(self):
        """Load the chat LLM and its tokenizer (device chosen by ``device_map``)."""
        logger.info("Loading LLM model...")
        try:
            model_name = "unsloth/Qwen3-0.6B"
            # Alternative checkpoint: unsloth/Qwen3-0.6B-unsloth-bnb-4bit
            self.llm_tokenizer = AutoTokenizer.from_pretrained(model_name)
            self.llm_model = AutoModelForCausalLM.from_pretrained(
                model_name,
                torch_dtype="auto",
                device_map="auto",
            )
            logger.info("LLM model loaded successfully")
        except Exception as e:
            logger.error(f"Error loading LLM model: {e}")
            raise

    async def _init_tts_model(self):
        """Create the Kokoro text-to-speech pipeline."""
        logger.info("Loading TTS model...")
        try:
            self.tts_synthesizer = KPipeline(lang_code='a')
            logger.info("TTS model loaded successfully")
        except Exception as e:
            logger.error(f"Error loading TTS model: {e}")
            raise

    @classmethod
    def _think_end_index(cls, output_ids):
        """Return the index just past the last end-of-thinking token, or 0 if absent."""
        try:
            return len(output_ids) - output_ids[::-1].index(cls._THINK_END_TOKEN_ID)
        except ValueError:
            return 0

    @staticmethod
    def _merge_audio_chunks(chunks):
        """Join synthesized audio chunks into one NumPy array (None if there are none).

        ``None`` entries are skipped. Objects exposing ``detach`` (torch
        tensors) are moved to CPU and converted first.
        Assumes every chunk is mono audio with matching trailing shape —
        TODO confirm against the TTS backend.
        """
        arrays = []
        for chunk in chunks:
            if chunk is None:
                continue
            if hasattr(chunk, "detach"):
                chunk = chunk.detach().cpu().numpy()
            arrays.append(np.asarray(chunk))
        if not arrays:
            return None
        if len(arrays) == 1:
            return arrays[0]
        return np.concatenate(arrays)

    async def speech_to_text(self, audio_file_path: str) -> str:
        """Transcribe an audio file.

        Args:
            audio_file_path: Path of the audio file; it is loaded and
                resampled to 16 kHz.

        Returns:
            The transcription, or ``""`` when nothing was decoded or any
            error occurred (errors are logged, not raised).
        """
        logger.info(f"Processing audio file: {audio_file_path}")
        try:
            # Decoding/resampling is file I/O heavy: keep it off the event loop.
            def load_audio():
                return librosa.load(audio_file_path, sr=16000)

            loop = asyncio.get_running_loop()
            speech_array, sampling_rate = await loop.run_in_executor(None, load_audio)

            input_features = self.stt_processor(
                speech_array,
                sampling_rate=sampling_rate,
                return_tensors="pt",
            ).input_features.to(self.device)

            with torch.no_grad():
                predicted_ids = self.stt_model.generate(input_features)

            transcription = self.stt_processor.batch_decode(
                predicted_ids, skip_special_tokens=True
            )
            result = transcription[0] if transcription else ""
            logger.info(f"STT result: {result}")
            return result
        except Exception as e:
            logger.error(f"Error in speech_to_text: {e}")
            return ""

    async def process_with_llm(self, text: str, system_prompt: Optional[str] = None) -> Dict[str, str]:
        """Generate the assistant's answer for ``text``.

        Args:
            text: User message.
            system_prompt: Optional system message; skipped when falsy.

        Returns:
            ``{"thinking": ..., "response": ...}``. Thinking is disabled in
            the chat template, so ``"thinking"`` is normally empty. On any
            error a fixed apology is returned as the response.
        """
        logger.info(f"Processing text with LLM: {text[:50]}...")
        try:
            messages = [{"role": "user", "content": text}]
            if system_prompt:
                messages.insert(0, {"role": "system", "content": system_prompt})

            formatted_text = self.llm_tokenizer.apply_chat_template(
                messages,
                tokenize=False,
                add_generation_prompt=True,
                enable_thinking=False,
            )
            model_inputs = self.llm_tokenizer(
                [formatted_text], return_tensors="pt"
            ).to(self.llm_model.device)

            with torch.no_grad():
                generated_ids = self.llm_model.generate(
                    **model_inputs,
                    max_new_tokens=512,
                    temperature=0.7,
                    do_sample=True,
                    pad_token_id=self.llm_tokenizer.eos_token_id,
                )

            # Keep only the newly generated tokens (drop the prompt).
            output_ids = generated_ids[0][len(model_inputs.input_ids[0]):].tolist()

            # Everything up to the last end-of-thinking token is "thinking".
            index = self._think_end_index(output_ids)
            thinking_content = self.llm_tokenizer.decode(
                output_ids[:index], skip_special_tokens=True
            ).strip("\n")
            content = self.llm_tokenizer.decode(
                output_ids[index:], skip_special_tokens=True
            ).strip("\n")

            result = {
                "thinking": thinking_content,
                "response": content,
            }
            logger.info(f"LLM response generated: {content[:50]}...")
            return result
        except Exception as e:
            logger.error(f"Error in process_with_llm: {e}")
            return {"thinking": "", "response": "Sorry, I encountered an error processing your request."}

    async def text_to_speech(self, text: str, output_path: str = "response.wav") -> str:
        """Synthesize ``text`` and write it to ``output_path`` as a WAV file.

        All chunks produced by the TTS pipeline are concatenated, so text the
        pipeline splits into several segments is spoken in full.

        Args:
            text: Text to speak.
            output_path: Destination file. The default is a fixed name, so
                concurrent callers should pass their own path.

        Returns:
            ``output_path`` on success, ``""`` on any error (logged).
        """
        logger.info(f"Converting text to speech: {text[:50]}...")
        try:
            # Synthesis is slow: run it in the default executor.
            def generate_speech():
                chunks = [
                    audio
                    for _graphemes, _phonemes, audio in self.tts_synthesizer(text, voice='af_heart')
                ]
                return self._merge_audio_chunks(chunks)

            loop = asyncio.get_running_loop()
            audio_data = await loop.run_in_executor(None, generate_speech)
            if audio_data is None:
                raise ValueError("Failed to generate audio")

            sf.write(output_path, audio_data, samplerate=self._TTS_SAMPLE_RATE)
            logger.info(f"Audio saved to: {output_path}")
            return output_path
        except Exception as e:
            logger.error(f"Error in text_to_speech: {e}")
            return ""

    async def process_conversation(self, audio_file_path: str, system_prompt: Optional[str] = None) -> Dict[str, Any]:
        """Run the full pipeline (STT -> LLM -> TTS) for one audio file.

        Each call writes its spoken answer to its own temporary ``.wav`` file
        so that concurrent conversations never overwrite each other's audio.

        Args:
            audio_file_path: Path of the recorded question.
            system_prompt: Optional system message for the LLM.

        Returns:
            On success a dict with ``input_audio``, ``transcribed_text``,
            ``thinking``, ``response_text``, ``output_audio`` (``""`` if TTS
            failed) and ``processing_times`` (seconds for ``stt``, ``llm``,
            ``tts``, ``total``). On failure ``{"error": <message>}``.
        """
        import tempfile

        start_time = time.time()
        logger.info("Starting conversation processing...")
        try:
            # Step 1: speech to text
            stt_start = time.time()
            transcribed_text = await self.speech_to_text(audio_file_path)
            stt_time = time.time() - stt_start
            if not transcribed_text:
                return {"error": "Failed to transcribe audio"}

            # Step 2: LLM answer
            llm_start = time.time()
            llm_result = await self.process_with_llm(transcribed_text, system_prompt)
            llm_time = time.time() - llm_start

            # Step 3: text to speech, into a per-conversation output file
            tts_start = time.time()
            with tempfile.NamedTemporaryFile(
                prefix="response_", suffix=".wav", delete=False
            ) as placeholder:
                tts_target = placeholder.name
            audio_output_path = await self.text_to_speech(llm_result["response"], tts_target)
            if not audio_output_path:
                # TTS failed: do not leave the empty placeholder behind.
                try:
                    os.remove(tts_target)
                except OSError:
                    pass
            tts_time = time.time() - tts_start

            total_time = time.time() - start_time
            result = {
                "input_audio": audio_file_path,
                "transcribed_text": transcribed_text,
                "thinking": llm_result["thinking"],
                "response_text": llm_result["response"],
                "output_audio": audio_output_path,
                "processing_times": {
                    "stt": stt_time,
                    "llm": llm_time,
                    "tts": tts_time,
                    "total": total_time,
                },
            }
            logger.info(f"Conversation processed successfully in {total_time:.2f} seconds")
            return result
        except Exception as e:
            logger.error(f"Error in process_conversation: {e}")
            return {"error": str(e)}

    async def batch_process(self, audio_files: list, system_prompt: Optional[str] = None) -> list:
        """Run ``process_conversation`` for every file via ``asyncio.gather``.

        Args:
            audio_files: Paths of the audio files to process.
            system_prompt: Optional system message applied to every file.

        Returns:
            One entry per input file, in order; because
            ``return_exceptions=True`` an entry may be an exception object
            instead of a result dict.
        """
        logger.info(f"Processing {len(audio_files)} audio files...")
        tasks = [
            self.process_conversation(audio_file, system_prompt)
            for audio_file in audio_files
        ]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        logger.info("Batch processing completed")
        return results
# Module-level conversation system shared by demo_conversation() and the
# Gradio handler. Constructing it only selects the torch device; the models
# themselves are loaded later by initialize_models().
ai_conversation = AsyncAIConversation()
# Usage example and demo functions
async def demo_conversation():
    """Prepare the shared conversation system by loading all of its models."""
    # STT, LLM and TTS are loaded by the shared module-level instance.
    conversation = ai_conversation
    await conversation.initialize_models()
# Create the async function wrapper for Gradio
async def process_audio_gradio(audio_file, system_prompt_input):
    """Gradio click handler: run one conversation and format the outputs.

    Args:
        audio_file: File path supplied by the audio input, or ``None`` when
            nothing was uploaded.
        system_prompt_input: System prompt text from the UI.

    Returns:
        A 4-tuple ``(status, response_text, response_audio, processing_times)``.
        ``response_audio`` is a file path, or ``None`` when there is no audio
        to show (missing input, pipeline error, or failed speech synthesis) —
        an empty string is never returned there because the audio output
        would treat it as a file path.
    """
    if audio_file is None:
        return "Please upload an audio file.", "", None, None

    try:
        result = await ai_conversation.process_conversation(
            audio_file_path=audio_file,
            system_prompt=system_prompt_input,
        )
        if "error" in result:
            return f"Error: {result['error']}", "", None, None
        return (
            f"Transcribed: {result['transcribed_text']}\nThinking: {result['thinking']}",
            result['response_text'],
            # The pipeline reports a failed synthesis as ""; map it to None.
            result['output_audio'] or None,
            result['processing_times'],
        )
    except Exception as e:
        return f"An unexpected error occurred: {e}", "", None, None
# Define the Gradio interface: one page with an input column (audio, button,
# system prompt) and an output column (status, reply text, reply audio, timings).
with gr.Blocks() as demo:
    gr.Markdown("# Asynchronous AI Conversation System")
    gr.Markdown("Upload an audio file and provide a system prompt to get a response.")
    # Inputs. type="filepath" makes the handler receive a path string.
    with gr.Column():
        audio_input = gr.Audio(label="Upload Audio File", type="filepath")
        process_button = gr.Button("Process Conversation")
        # Pre-filled with the default system prompt; editable per request.
        system_prompt_input = gr.Textbox(label="System Prompt", value=system_prompt_0)
    # Outputs, in the same order as the tuple returned by process_audio_gradio.
    with gr.Column():
        status_output = gr.Textbox(label="Status/Transcription/Thinking", interactive=False)
        response_text_output = gr.Textbox(label="AI Response Text", interactive=False)
        response_audio_output = gr.Audio(label="AI Response Audio", interactive=False)
        processing_times_output = gr.JSON(label="Processing Times")
    # Link button click to the async function
    process_button.click(
        fn=process_audio_gradio,
        inputs=[audio_input, system_prompt_input],
        outputs=[status_output, response_text_output, response_audio_output, processing_times_output]
    )
if __name__ == "__main__":
    # Load every model up front on a short-lived event loop, so the first
    # request does not have to wait for model initialization.
    asyncio.run(demo_conversation())
    # Gradio's launch() runs its own event loop; the models must already be
    # initialized on ai_conversation before the UI starts serving requests.
    demo.launch(debug=False, share=True)