Spaces:
Sleeping
Sleeping
File size: 13,550 Bytes
35e66cc 28b8f48 35e66cc 28b8f48 35e66cc 28b8f48 35e66cc 28b8f48 35e66cc 28b8f48 3ffb6e9 0b9e85c 35e66cc 28b8f48 35e66cc 28b8f48 35e66cc 28b8f48 35e66cc 28b8f48 35e66cc 28b8f48 35e66cc 28b8f48 35e66cc 28b8f48 35e66cc 28b8f48 35e66cc 28b8f48 35e66cc 28b8f48 35e66cc 28b8f48 35e66cc 28b8f48 35e66cc 28b8f48 35e66cc 28b8f48 35e66cc 28b8f48 35e66cc 28b8f48 35e66cc 28b8f48 35e66cc 28b8f48 35e66cc 28b8f48 35e66cc 28b8f48 35e66cc 28b8f48 35e66cc 28b8f48 35e66cc 28b8f48 35e66cc 28b8f48 35e66cc 28b8f48 35e66cc 28b8f48 35e66cc 28b8f48 35e66cc 28b8f48 35e66cc 28b8f48 35e66cc 28b8f48 35e66cc 28b8f48 35e66cc 28b8f48 35e66cc 28b8f48 35e66cc 28b8f48 35e66cc 28b8f48 35e66cc 28b8f48 35e66cc 28b8f48 35e66cc 28b8f48 35e66cc 28b8f48 35e66cc 28b8f48 35e66cc 28b8f48 35e66cc 28b8f48 35e66cc 28b8f48 35e66cc 28b8f48 35e66cc 28b8f48 35e66cc 28b8f48 35e66cc 28b8f48 6dff5db 28b8f48 6dff5db 35e66cc 28b8f48 35e66cc 28b8f48 35e66cc 28b8f48 35e66cc 28b8f48 35e66cc 28b8f48 35e66cc 28b8f48 35e66cc 28b8f48 35e66cc e09173a 0245c16 e09173a 0245c16 35e66cc 28b8f48 35e66cc 28b8f48 35e66cc 28b8f48 99a4d6a e69b450 99a4d6a e69b450 28b8f48 a5c19ce 28b8f48 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 |
import asyncio
import torch
import librosa
import numpy as np
import soundfile as sf
from transformers import (
AutoProcessor, AutoModelForSpeechSeq2Seq,
AutoModelForCausalLM, AutoTokenizer,
pipeline, SpeechT5Processor, SpeechT5ForTextToSpeech, SpeechT5HifiGan
)
from datasets import load_dataset
import logging
from typing import Optional, Dict, Any
import time
from pathlib import Path
from kokoro import KPipeline
from IPython.display import display, Audio
import gradio as gr
import asyncio
import os
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
system_prompt_0 = """You are a highly trained U.S. Tax Assistant AI, designed to help individuals and small businesses understand, plan, and file their taxes according to federal and state tax laws. You explain complex tax concepts in simple, accurate, and actionable terms, using IRS guidelines, up-to-date tax code knowledge, and best practices for compliance and savings. You act as an explainer, educator, and assistant—not a certified tax preparer or legal advisor. Be short in your answer, less than 100 chars"""
class AsyncAIConversation:
def __init__(self):
self.stt_processor = None
self.stt_model = None
self.llm_tokenizer = None
self.llm_model = None
self.tts_synthesizer = None
self.speaker_embedding = None
self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
logger.info(f"Using device: {self.device}")
async def initialize_models(self):
"""Initialize all models asynchronously"""
logger.info("Initializing models...")
# Initialize STT model
await self._init_stt_model()
# Initialize LLM model
await self._init_llm_model()
# Initialize TTS model
await self._init_tts_model()
logger.info("All models initialized successfully!")
async def _init_stt_model(self):
"""Initialize Speech-to-Text model"""
logger.info("Loading STT model...")
try:
stt_model_id = "unsloth/whisper-small"
#unsloth/whisper-large-v3-turbo
self.stt_processor = AutoProcessor.from_pretrained(stt_model_id)
self.stt_model = AutoModelForSpeechSeq2Seq.from_pretrained(stt_model_id)
self.stt_model.to(self.device)
logger.info("STT model loaded successfully")
except Exception as e:
logger.error(f"Error loading STT model: {e}")
raise
async def _init_llm_model(self):
"""Initialize Large Language Model"""
logger.info("Loading LLM model...")
try:
model_name = "unsloth/Qwen3-0.6B"
#unsloth/Qwen3-0.6B-unsloth-bnb-4bit
self.llm_tokenizer = AutoTokenizer.from_pretrained(model_name)
self.llm_model = AutoModelForCausalLM.from_pretrained(
model_name,
torch_dtype="auto",
device_map="auto"
)
logger.info("LLM model loaded successfully")
except Exception as e:
logger.error(f"Error loading LLM model: {e}")
raise
async def _init_tts_model(self):
"""Initialize Text-to-Speech model"""
logger.info("Loading TTS model...")
try:
# Initialize Kokoro TTS pipeline
self.tts_synthesizer = KPipeline(lang_code='a')
logger.info("TTS model loaded successfully")
except Exception as e:
logger.error(f"Error loading TTS model: {e}")
raise
async def speech_to_text(self, audio_file_path: str) -> str:
"""Convert speech to text asynchronously"""
logger.info(f"Processing audio file: {audio_file_path}")
try:
# Load audio in a separate thread to avoid blocking
def load_audio():
return librosa.load(audio_file_path, sr=16000)
loop = asyncio.get_event_loop()
speech_array, sampling_rate = await loop.run_in_executor(None, load_audio)
# Convert to tensor
speech_array_pt = torch.from_numpy(speech_array).unsqueeze(0).to(self.device)
# Process input features
input_features = self.stt_processor(
speech_array,
sampling_rate=sampling_rate,
return_tensors="pt"
).input_features.to(self.device)
# Generate predictions
with torch.no_grad():
predicted_ids = self.stt_model.generate(input_features)
# Decode predictions
transcription = self.stt_processor.batch_decode(predicted_ids, skip_special_tokens=True)
result = transcription[0] if transcription else ""
logger.info(f"STT result: {result}")
return result
except Exception as e:
logger.error(f"Error in speech_to_text: {e}")
return ""
async def process_with_llm(self, text: str, system_prompt: Optional[str] = None) -> Dict[str, str]:
"""Process text with LLM and return both thinking and content"""
logger.info(f"Processing text with LLM: {text[:50]}...")
try:
# Prepare messages
messages = [
{"role": "user", "content": text}
]
if system_prompt:
messages.insert(0, {"role": "system", "content": system_prompt})
# Apply chat template
formatted_text = self.llm_tokenizer.apply_chat_template(
messages,
tokenize=False,
add_generation_prompt=True,
enable_thinking=False
)
# Tokenize
model_inputs = self.llm_tokenizer([formatted_text], return_tensors="pt").to(self.llm_model.device)
# Generate response
with torch.no_grad():
generated_ids = self.llm_model.generate(
**model_inputs,
max_new_tokens=512,
temperature=0.7,
do_sample=True,
pad_token_id=self.llm_tokenizer.eos_token_id
)
# Extract new tokens
output_ids = generated_ids[0][len(model_inputs.input_ids[0]):].tolist()
# Parse thinking content
try:
# Find the end of thinking token (</think>)
index = len(output_ids) - output_ids[::-1].index(151668)
except ValueError:
index = 0
thinking_content = self.llm_tokenizer.decode(output_ids[:index], skip_special_tokens=True).strip("\n")
content = self.llm_tokenizer.decode(output_ids[index:], skip_special_tokens=True).strip("\n")
result = {
"thinking": thinking_content,
"response": content
}
logger.info(f"LLM response generated: {content[:50]}...")
return result
except Exception as e:
logger.error(f"Error in process_with_llm: {e}")
return {"thinking": "", "response": "Sorry, I encountered an error processing your request."}
async def text_to_speech(self, text: str, output_path: str = "response.wav") -> str:
"""Convert text to speech asynchronously"""
logger.info(f"Converting text to speech: {text[:50]}...")
try:
# Generate speech in a separate thread to avoid blocking
def generate_speech():
# Generate audio using Kokoro TTS
generator = self.tts_synthesizer(text, voice='af_heart')
# Get the first generated audio chunk
for i, (gs, ps, audio) in enumerate(generator):
if i == 0: # Use the first chunk
return audio
return None
loop = asyncio.get_event_loop()
audio_data = await loop.run_in_executor(None, generate_speech)
if audio_data is None:
raise ValueError("Failed to generate audio")
# Save audio file with Kokoro's default sample rate (24000 Hz)
sf.write(output_path, audio_data, samplerate=24000)
logger.info(f"Audio saved to: {output_path}")
return output_path
except Exception as e:
logger.error(f"Error in text_to_speech: {e}")
return ""
async def process_conversation(self, audio_file_path: str, system_prompt: Optional[str] = None) -> Dict[str, Any]:
"""Complete conversation pipeline: STT -> LLM -> TTS"""
start_time = time.time()
logger.info("Starting conversation processing...")
try:
# Step 1: Speech to Text
stt_start = time.time()
transcribed_text = await self.speech_to_text(audio_file_path)
stt_time = time.time() - stt_start
if not transcribed_text:
return {"error": "Failed to transcribe audio"}
# Step 2: Process with LLM
llm_start = time.time()
llm_result = await self.process_with_llm(transcribed_text, system_prompt)
llm_time = time.time() - llm_start
# Step 3: Text to Speech
tts_start = time.time()
audio_output_path = await self.text_to_speech(llm_result["response"])
tts_time = time.time() - tts_start
total_time = time.time() - start_time
result = {
"input_audio": audio_file_path,
"transcribed_text": transcribed_text,
"thinking": llm_result["thinking"],
"response_text": llm_result["response"],
"output_audio": audio_output_path,
"processing_times": {
"stt": stt_time,
"llm": llm_time,
"tts": tts_time,
"total": total_time
}
}
logger.info(f"Conversation processed successfully in {total_time:.2f} seconds")
return result
except Exception as e:
logger.error(f"Error in process_conversation: {e}")
return {"error": str(e)}
async def batch_process(self, audio_files: list, system_prompt: Optional[str] = None) -> list:
"""Process multiple audio files concurrently"""
logger.info(f"Processing {len(audio_files)} audio files...")
# Create tasks for concurrent processing
tasks = [
self.process_conversation(audio_file, system_prompt)
for audio_file in audio_files
]
# Process all files concurrently
results = await asyncio.gather(*tasks, return_exceptions=True)
logger.info("Batch processing completed")
return results
# Initialize the conversation system
ai_conversation = AsyncAIConversation()
# Usage example and demo functions
async def demo_conversation():
"""Demonstration of the conversation system"""
# Initialize all models
await ai_conversation.initialize_models()
# Create the async function wrapper for Gradio
async def process_audio_gradio(audio_file, system_prompt_input):
"""Processes audio file and system prompt for Gradio interface."""
if audio_file is None:
return "Please upload an audio file.", "", "", None
# Gradio provides the file path
audio_path = audio_file
# Process the conversation using the initialized ai_conversation instance
try:
result = await ai_conversation.process_conversation(
audio_file_path=audio_path,
system_prompt=system_prompt_input
)
if "error" in result:
return f"Error: {result['error']}", "", "", None
else:
return (
f"Transcribed: {result['transcribed_text']}\nThinking: {result['thinking']}",
result['response_text'],
result['output_audio'],
result['processing_times']
)
except Exception as e:
return f"An unexpected error occurred: {e}", "", "", None
# Define the Gradio interface
with gr.Blocks() as demo:
gr.Markdown("# Asynchronous AI Conversation System")
gr.Markdown("Upload an audio file and provide a system prompt to get a response.")
with gr.Column():
audio_input = gr.Audio(label="Upload Audio File", type="filepath")
process_button = gr.Button("Process Conversation")
system_prompt_input = gr.Textbox(label="System Prompt", value=system_prompt_0)
with gr.Column():
status_output = gr.Textbox(label="Status/Transcription/Thinking", interactive=False)
response_text_output = gr.Textbox(label="AI Response Text", interactive=False)
response_audio_output = gr.Audio(label="AI Response Audio", interactive=False)
processing_times_output = gr.JSON(label="Processing Times")
# Link button click to the async function
process_button.click(
fn=process_audio_gradio,
inputs=[audio_input, system_prompt_input],
outputs=[status_output, response_text_output, response_audio_output, processing_times_output]
)
if __name__ == "__main__":
def initiate():
asyncio.run(demo_conversation())
initiate()
# Gradio launch itself runs an event loop.
# Ensure ai_conversation is initialized in the notebook before this cell is run. s
demo.launch(debug=False, share=True) |