# audio-to-text / app.py
# (Hugging Face Space blob header: rodrigomasini — "Update app.py",
#  commit 167bfa7 verified, 7.39 kB — converted to a comment so the file parses)
import asyncio
import os
import re
import tempfile

import edge_tts
import gradio as gr
import nest_asyncio
import requests
from openai import OpenAI
from pydub import AudioSegment
from pydub.utils import which

# Patch the running event loop so asyncio.run() works inside Gradio/Jupyter.
nest_asyncio.apply()
# Synchronous OpenAI-compatible client pointed at the TELA text-to-text gateway.
# NOTE(review): api_key="tela" looks like a placeholder/shared key — confirm.
sync_client = OpenAI(
    base_url="https://t2t.fanheroapi.com/v1",
    api_key="tela"
)
# Ensuring pydub can locate ffmpeg (None if ffmpeg is not on PATH).
AudioSegment.converter = which("ffmpeg")
# TELA endpoint for text-to-text generation (raw HTTP URL; the OpenAI client
# above is the path actually used by chatbot_conversation).
TELA_API_URL = "https://t2t.fanheroapi.com/v1/chat/completions"
# Headers for JSON API requests.
headers = {
    "Content-Type": "application/json",
    "Accept": "application/json"
}
# TELA endpoint for speech-to-text generation (multipart file upload).
TELA_TRANSCRIPT_AUDIO_URL = "http://104.171.203.212:8000/speech-to-text/"
# System prompt (Portuguese): "keep the conversation friendly, concise, clear
# and open; avoid unnecessary introductions; reply in a normal, conversational,
# always friendly and supportive tone."
system_instruction = """
Responda e mantenha a conversa de forma amigavel, concisa, clara e aberta.
Evite qualquer desnecessaria introducao.
Responda em um tom normal, de conversacao e sempre amigavel e suportivo.
"""
# Function to convert audio to mp3 using pydub
def convert_to_mp3(audio_file_path):
    """Convert an arbitrary audio file to mp3 via pydub/ffmpeg.

    Parameters
    ----------
    audio_file_path : str
        Path to the input audio (any format ffmpeg understands).

    Returns
    -------
    str | None
        Path to a temporary .mp3 file on success, or None on failure.
        The caller is responsible for deleting the returned file.
    """
    # delete=False so the file outlives this function; close the handle
    # immediately so pydub can reopen the path for writing (required on Windows).
    temp_mp3 = tempfile.NamedTemporaryFile(suffix=".mp3", delete=False)
    temp_mp3.close()
    try:
        audio = AudioSegment.from_file(audio_file_path)
        audio.export(temp_mp3.name, format="mp3")
        return temp_mp3.name
    except Exception as e:
        print(f"Error converting audio: {e}")
        # Don't leak the temp file when conversion fails.
        try:
            os.remove(temp_mp3.name)
        except OSError:
            pass
        return None
# Function to send audio to the speech-to-text endpoint
def transcript(audio_file_path):
    """Transcribe an audio file via the TELA speech-to-text service.

    The input is first normalized to mp3 (the upload format used here),
    posted as multipart form data, and the temporary mp3 is always removed.

    Parameters
    ----------
    audio_file_path : str | None
        Path to the recorded audio, as provided by the Gradio Audio widget.

    Returns
    -------
    dict
        The service's JSON payload on success, otherwise
        {"data": "failed", "error": <message>}.
    """
    if audio_file_path is None:
        return {"data": "failed", "error": "No audio file provided."}
    mp3_file_path = convert_to_mp3(audio_file_path)
    if not mp3_file_path:
        return {"data": "failed", "error": "Failed to convert audio to mp3."}
    try:
        print(f"Transcription API URL: {TELA_TRANSCRIPT_AUDIO_URL}")
        with open(mp3_file_path, 'rb') as f:
            files = {'file': f}
            # Timeout so a hung service doesn't block the UI forever.
            response = requests.post(TELA_TRANSCRIPT_AUDIO_URL, files=files, timeout=60)
        print(f"Response Status: {response.status_code}")
        print(f"Response Text: {response.text}")
        if response.status_code == 200:
            return response.json()
        else:
            return {"data": "failed", "error": f"Error {response.status_code}: {response.text}"}
    except Exception as e:
        return {"data": "failed", "error": str(e)}
    finally:
        # Best-effort cleanup of the temporary mp3.
        if mp3_file_path and os.path.exists(mp3_file_path):
            try:
                os.remove(mp3_file_path)
            except OSError as e:
                print(f"Error deleting temporary file: {e}")
# Function to extract user input from transcription
def extract_user_input(transcription_response):
    """Join the text of all transcript segments into one user utterance.

    Parameters
    ----------
    transcription_response : dict | None
        Response from `transcript()`; segments are expected under the
        'result' key as dicts with a 'text' field.

    Returns
    -------
    str
        Concatenated, stripped transcript text, or "" when the response
        is missing, malformed, or contains no segments.
    """
    try:
        transcript_segments = transcription_response.get('result', [])
        user_input = "".join(segment['text'] for segment in transcript_segments)
        return user_input.strip()
    # TypeError/AttributeError cover None or non-dict responses, which the
    # original KeyError-only handler let propagate.
    except (KeyError, TypeError, AttributeError):
        return ""
# Function to format the AI response
def format_generated_response(response):
    """Extract the assistant message from a chat-completion dict and clean it.

    Strips anything between angle brackets and any '#'-prefixed heading lines,
    then trims surrounding whitespace. Returns an "Error: ..." string when the
    response is None or lacks the expected structure.
    """
    if response is None:
        return "Error: No valid response received."
    try:
        raw_text = response['choices'][0]['message']['content']
    except (KeyError, IndexError) as e:
        return f"Error: Missing key or index {e} in response."
    # Drop <...> tag-like spans, then drop '#'-headed lines entirely.
    without_tags = re.sub(r'<.*?>', '', raw_text)
    without_headings = re.sub(r'#.*?\n', '', without_tags)
    return without_headings.strip()
# Function to generate speech using edge_tts
def generate_speech(text):
    """Synthesize `text` to an audio file with edge-tts (pt-BR voice).

    Parameters
    ----------
    text : str
        Text to speak.

    Returns
    -------
    str | None
        Path to the generated audio file, or None on failure.
        The caller is responsible for the file's lifetime.
    """
    # Close the handle right away: edge_tts writes to the path itself, and an
    # open handle would block that on Windows (and previously leaked).
    tts_file = tempfile.NamedTemporaryFile(delete=False, suffix=".wav")
    tts_file.close()

    async def generate_tts():
        # NOTE(review): edge-tts emits mp3-encoded data despite the .wav
        # suffix — most players cope, but confirm downstream consumers.
        tts = edge_tts.Communicate(text, voice="pt-BR-AntonioNeural")
        await tts.save(tts_file.name)

    try:
        # nest_asyncio.apply() at module level makes asyncio.run() safe even
        # inside an already-running event loop (Gradio).
        asyncio.run(generate_tts())
        print(f"TTS audio saved to: {tts_file.name}")
        return tts_file.name
    except Exception as e:
        print(f"Error generating TTS: {e}")
        # Don't leak the unused temp file on failure.
        try:
            os.remove(tts_file.name)
        except OSError:
            pass
        return None
# Main chatbot conversation function
def chatbot_conversation(audio_file_path):
    """One full voice-chat round trip: audio in -> (text, audio) out.

    Pipeline: transcribe the recording, send the transcript to the TELA
    chat model (streamed, accumulated into one string), synthesize the
    reply with edge-tts, and return both text and the audio path.

    Parameters
    ----------
    audio_file_path : str | None
        Path to the user's recording from the Gradio Audio widget.

    Returns
    -------
    tuple[str, str | None]
        (reply text or an English error message, path to TTS audio or None).
    """
    # Fixes vs. the previous revision: it referenced undefined names
    # (`history`, `messages`, `system_message`, `formatted_output`), never
    # added the user's utterance to the request, and mixed `yield` with
    # `return (a, b)` — a generator's return value is discarded, so the two
    # Gradio outputs were never produced.
    try:
        transcription = transcript(audio_file_path)
        user_input = extract_user_input(transcription)
        if not user_input:
            return "I could not generate the text. Please try again.", None

        # Stream the completion and accumulate the tokens into one string.
        response = ""
        for chunk in sync_client.chat.completions.create(
            model="tela-gpt4o",
            messages=[
                {"role": "system", "content": system_instruction},
                {"role": "user", "content": user_input},
            ],
            stream=True,
            max_tokens=1024,   # keep the reply concise
            temperature=0,     # deterministic output
            response_format={"type": "text"}
        ):
            token = chunk.choices[0].delta.content
            if token:  # the final stream chunk carries delta.content == None
                response += token

        if not response:
            return "I could not synthesize the audio. Please try again.", None

        # May return None when TTS fails; the text reply is still shown.
        tts_file_name = generate_speech(response)
        return response, tts_file_name
    except Exception as e:
        # Top-level boundary: log and degrade to a friendly error message.
        print(f"Error: {e}")
        return "I could not understand you. Please try again.", None
# Gradio UI: one audio input drives the full voice-chat pipeline; outputs are
# the reply text and an autoplaying audio response.
# NOTE(review): streaming=True together with live=True makes Gradio invoke the
# callback repeatedly while recording, and each call runs a full STT+LLM+TTS
# round trip — confirm a single on-stop invocation isn't what was intended.
gr.Interface(
    fn=chatbot_conversation,
    inputs=gr.Audio(label="User", type="filepath", streaming=True, container=True),
    outputs=[
        gr.Textbox(label="Transcription"),
        gr.Audio(type="filepath", autoplay=True, label="MAGIC Chat")
    ],
    title="MAGIC VoiceChat",
    description="A simple example of audio conversational AI",
    theme="sudeepshouche/minimalist",
    live=True
).launch()