Spaces:
Runtime error
Runtime error
import os | |
import re | |
import gradio as gr | |
import tempfile | |
from pydub import AudioSegment | |
from pydub.utils import which | |
import edge_tts | |
import asyncio | |
import nest_asyncio | |
nest_asyncio.apply() | |
from openai import OpenAI | |
# Shared OpenAI-compatible client configured for the TELA text-to-text API.
sync_client = OpenAI(api_key="tela", base_url="https://t2t.fanheroapi.com/v1")
# Use whatever ffmpeg executable `which` resolves as pydub's converter.
AudioSegment.converter = which("ffmpeg")

# TELA endpoint for text-to-text generation (chat completions).
TELA_API_URL = "https://t2t.fanheroapi.com/v1/chat/completions"

# JSON headers for API requests.
headers = {"Content-Type": "application/json", "Accept": "application/json"}

# TELA endpoint for speech-to-text transcription of uploaded audio.
TELA_TRANSCRIPT_AUDIO_URL = "http://104.171.203.212:8000/speech-to-text/"
# Portuguese instruction text for the assistant: stay friendly, concise, clear
# and open; skip unnecessary introductions; answer in a normal, conversational
# and supportive tone. The text starts and ends with a newline.
system_instruction = "\n".join(
    (
        "",
        "Responda e mantenha a conversa de forma amigavel, concisa, clara e aberta.",
        "Evite qualquer desnecessaria introducao.",
        "Responda em um tom normal, de conversacao e sempre amigavel e suportivo.",
        "",
    )
)
def convert_to_mp3(audio_file_path):
    """Convert an audio file to MP3 with pydub and return the new file's path.

    Args:
        audio_file_path: Path of the source audio file, in any format that
            ``AudioSegment.from_file`` can read.

    Returns:
        Path of a newly created temporary ``.mp3`` file, or ``None`` when the
        conversion fails. The caller is responsible for deleting the file.
    """
    # Only a unique path is needed here; close the handle straight away so it
    # is not leaked and the exporter can reopen the file by name.
    with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as temp_mp3:
        mp3_path = temp_mp3.name

    try:
        audio = AudioSegment.from_file(audio_file_path)
        audio.export(mp3_path, format="mp3")
        return mp3_path
    except Exception as e:
        print(f"Error converting audio: {e}")
        # Do not leave the empty placeholder file behind on failure.
        try:
            os.remove(mp3_path)
        except OSError as cleanup_error:
            print(f"Error deleting temporary file: {cleanup_error}")
        return None
def transcript(audio_file_path):
    """Send an audio file to the speech-to-text endpoint and return its reply.

    The audio is first converted to MP3, then uploaded as multipart form data
    under the ``file`` field. The temporary MP3 is removed afterwards whether
    or not the request succeeded.

    Args:
        audio_file_path: Path of the recorded audio, or ``None``.

    Returns:
        The decoded JSON body when the endpoint answers with HTTP 200;
        otherwise a dict of the form ``{"data": "failed", "error": <message>}``.
    """
    if audio_file_path is None:
        return {"data": "failed", "error": "No audio file provided."}

    mp3_file_path = convert_to_mp3(audio_file_path)
    if not mp3_file_path:
        return {"data": "failed", "error": "Failed to convert audio to mp3."}

    try:
        # The module's import section never imports `requests`, so import it
        # here; an ImportError is reported through the normal failure result.
        import requests

        print(f"Transcription API URL: {TELA_TRANSCRIPT_AUDIO_URL}")
        with open(mp3_file_path, 'rb') as f:
            # (connect, read) timeouts so a stalled server cannot hang the UI.
            response = requests.post(
                TELA_TRANSCRIPT_AUDIO_URL,
                files={'file': f},
                timeout=(10, 120),
            )
        print(f"Response Status: {response.status_code}")
        print(f"Response Text: {response.text}")
        if response.status_code == 200:
            return response.json()
        return {"data": "failed", "error": f"Error {response.status_code}: {response.text}"}
    except Exception as e:
        return {"data": "failed", "error": str(e)}
    finally:
        if os.path.exists(mp3_file_path):
            try:
                os.remove(mp3_file_path)
            except OSError as e:
                print(f"Error deleting temporary file: {e}")
def extract_user_input(transcription_response):
    """Join the text of all transcription segments into one string.

    Args:
        transcription_response: Decoded transcription reply. Its ``"result"``
            entry is read as a list of segments, each with a ``"text"`` key.

    Returns:
        The concatenated segment texts with surrounding whitespace stripped,
        or ``""`` when the response is missing, malformed, or has no segments.
    """
    try:
        # `or []` also covers an explicit `"result": None`.
        segments = transcription_response.get('result') or []
        return "".join(segment['text'] for segment in segments).strip()
    except (AttributeError, KeyError, TypeError):
        # Non-dict response, segment without "text", or non-string text.
        return ""
def format_generated_response(response):
    """Pull the assistant text out of a chat-completion reply and tidy it.

    Reads ``response['choices'][0]['message']['content']``, removes anything
    shaped like ``<...>`` and every run from a ``#`` up to and including the
    next newline, then strips surrounding whitespace.

    Returns an ``"Error: ..."`` string when ``response`` is ``None`` or the
    expected key/index is missing.
    """
    if response is None:
        return "Error: No valid response received."

    try:
        content = response['choices'][0]['message']['content']
    except (KeyError, IndexError) as e:
        return f"Error: Missing key or index {e} in response."

    # First drop tag-like spans, then "#..." runs through the end of their line.
    without_tags = re.sub(r'<.*?>', '', content)
    without_hash_lines = re.sub(r'#.*?\n', '', without_tags)
    return without_hash_lines.strip()
def generate_speech(text):
    """Synthesize ``text`` to speech with edge_tts and return the audio path.

    Args:
        text: Text to speak, using the ``pt-BR-AntonioNeural`` voice.

    Returns:
        Path of a newly created temporary audio file, or ``None`` when
        ``text`` is empty/blank or synthesis fails. The caller owns the file.
    """
    if not isinstance(text, str) or not text.strip():
        return None

    # edge_tts produces MP3 data, so give the file a matching extension.
    # Close the handle immediately: only the path is needed, and keeping it
    # open would leak a descriptor per call.
    with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as tts_file:
        tts_path = tts_file.name

    async def generate_tts():
        tts = edge_tts.Communicate(text, voice="pt-BR-AntonioNeural")
        await tts.save(tts_path)

    try:
        asyncio.run(generate_tts())
    except Exception as e:
        print(f"Error generating TTS: {e}")
        # Remove the empty placeholder so failed calls do not pile up files.
        try:
            os.remove(tts_path)
        except OSError as cleanup_error:
            print(f"Error deleting temporary file: {cleanup_error}")
        return None

    print(f"TTS audio saved to: {tts_path}")
    return tts_path
def _build_chat_messages(user_input, history):
    """Build the chat message list: system prompt, prior turns, new user text."""
    messages = [{"role": "system", "content": system_instruction}]
    for turn in history:
        # Each turn is a (user_text, assistant_text) pair; skip empty halves.
        if turn[0]:
            messages.append({"role": "user", "content": turn[0]})
        if turn[1]:
            messages.append({"role": "assistant", "content": turn[1]})
    messages.append({"role": "user", "content": user_input})
    return messages


def chatbot_conversation(audio_file_path, history=None):
    """Run one voice-chat turn: transcribe, stream the reply, synthesize speech.

    This is a generator. Every item it yields is a ``(text, audio_path)``
    tuple matching the two Gradio outputs (textbox, audio player). While the
    model reply is streaming, ``audio_path`` is ``None``; the final item
    carries the path returned by ``generate_speech`` (or ``None`` if synthesis
    failed). Failures are yielded as a user-facing message with ``None`` audio.

    Args:
        audio_file_path: Path of the recorded user audio.
        history: Optional list of ``(user_text, assistant_text)`` pairs from
            earlier turns. When given, the new turn is appended to it.

    Yields:
        ``(text, audio_path)`` tuples as described above.
    """
    try:
        transcription = transcript(audio_file_path)
        user_input = extract_user_input(transcription)
        if not user_input:
            yield "I could not generate the text. Please try again.", None
            return

        if history is None:
            history = []

        stream = sync_client.chat.completions.create(
            model="tela-gpt4o",
            messages=_build_chat_messages(user_input, history),
            stream=True,
            max_tokens=1024,  # Keeps responses concise
            temperature=0,  # Deterministic output
            response_format={"type": "text"},
        )

        response = ""
        for chunk in stream:
            if not chunk.choices:
                continue
            token = chunk.choices[0].delta.content
            # Some chunks carry no text (content is None or empty).
            if token:
                response += token
                yield response, None

        if not response:
            yield "I could not synthesize the audio. Please try again.", None
            return

        history.append((user_input, response))
        yield response, generate_speech(response)
    except Exception as e:
        # Top-level UI boundary: report the problem instead of crashing.
        print(f"Error: {e}")
        yield "I could not understand you. Please try again.", None
# Gradio interface setup: one audio input handled by chatbot_conversation,
# with a text box and an autoplaying audio player as outputs.
user_audio = gr.Audio(label="User", type="filepath", streaming=True, container=True)
reply_text = gr.Textbox(label="Transcription")
reply_audio = gr.Audio(type="filepath", autoplay=True, label="MAGIC Chat")

demo = gr.Interface(
    fn=chatbot_conversation,
    inputs=user_audio,
    outputs=[reply_text, reply_audio],
    title="MAGIC VoiceChat",
    description="A simple example of audio conversational AI",
    theme="sudeepshouche/minimalist",
    live=True,
)
demo.launch()