# MAGIC VoiceChat — Gradio voice conversation demo.
# (Removed non-Python page-scrape residue: Space header, commit hashes, line-number dump.)
import asyncio
import os
import re
import tempfile

import edge_tts
import gradio as gr
import nest_asyncio
import requests
from openai import OpenAI
from pydub import AudioSegment
from pydub.utils import which

# Patch the current event loop so asyncio.run() can be called even when a loop
# is already running (e.g. inside Gradio / Jupyter).
nest_asyncio.apply()
# Synchronous OpenAI-compatible client pointed at the TELA text-to-text gateway.
# NOTE(review): the API key is hard-coded — confirm whether it should come from
# an environment variable instead.
sync_client = OpenAI(
base_url="https://t2t.fanheroapi.com/v1",
api_key="tela"
)
# Ensuring pydub can locate ffmpeg
AudioSegment.converter = which("ffmpeg")
# TELA endpoint for text-to-text generation
TELA_API_URL = "https://t2t.fanheroapi.com/v1/chat/completions"
# Headers for API request
headers = {
"Content-Type": "application/json",
"Accept": "application/json"
}
# TELA endpoint for speech-to-text generation
TELA_TRANSCRIPT_AUDIO_URL = "http://104.171.203.212:8000/speech-to-text/"
# System prompt sent with every chat completion (Portuguese, by design —
# the assistant replies in a friendly, concise, conversational tone).
system_instruction = """
Responda e mantenha a conversa de forma amigavel, concisa, clara e aberta.
Evite qualquer desnecessaria introducao.
Responda em um tom normal, de conversacao e sempre amigavel e suportivo.
"""
# Function to convert audio to mp3 using pydub
def convert_to_mp3(audio_file_path):
    """Convert an audio file of any pydub-supported format to mp3.

    Args:
        audio_file_path: Path to the source audio file.

    Returns:
        Path of the newly created temporary .mp3 file, or None on failure.
        The caller is responsible for deleting the returned file.
    """
    temp_mp3 = tempfile.NamedTemporaryFile(suffix=".mp3", delete=False)
    # Close the handle immediately: on Windows an open NamedTemporaryFile
    # cannot be reopened by ffmpeg for writing.
    temp_mp3.close()
    try:
        audio = AudioSegment.from_file(audio_file_path)
        audio.export(temp_mp3.name, format="mp3")
        return temp_mp3.name
    except Exception as e:
        print(f"Error converting audio: {e}")
        # Don't leak the temp file when conversion fails.
        try:
            os.remove(temp_mp3.name)
        except OSError:
            pass
        return None
# Function to send audio to the speech-to-text endpoint
def transcript(audio_file_path):
    """Upload an audio file to the TELA speech-to-text endpoint.

    The input is first re-encoded to mp3; the temporary mp3 is always
    removed before returning.

    Args:
        audio_file_path: Path to the recorded audio, or None.

    Returns:
        The endpoint's JSON payload on success, otherwise a dict of the
        form {"data": "failed", "error": <reason>}.
    """
    if audio_file_path is None:
        return {"data": "failed", "error": "No audio file provided."}
    mp3_file_path = convert_to_mp3(audio_file_path)
    if not mp3_file_path:
        return {"data": "failed", "error": "Failed to convert audio to mp3."}
    try:
        print(f"Transcription API URL: {TELA_TRANSCRIPT_AUDIO_URL}")
        # `requests` is imported at module level; the file handle must stay
        # open for the duration of the upload.
        with open(mp3_file_path, 'rb') as f:
            response = requests.post(TELA_TRANSCRIPT_AUDIO_URL, files={'file': f})
        print(f"Response Status: {response.status_code}")
        print(f"Response Text: {response.text}")
        if response.status_code == 200:
            return response.json()
        return {"data": "failed", "error": f"Error {response.status_code}: {response.text}"}
    except Exception as e:
        return {"data": "failed", "error": str(e)}
    finally:
        # Best-effort cleanup of the intermediate mp3.
        if mp3_file_path and os.path.exists(mp3_file_path):
            try:
                os.remove(mp3_file_path)
            except OSError as e:
                print(f"Error deleting temporary file: {e}")
# Function to extract user input from transcription
def extract_user_input(transcription_response):
    """Join all transcript segment texts from an STT response into one string.

    Args:
        transcription_response: JSON dict from the speech-to-text endpoint;
            segments are expected under the 'result' key.

    Returns:
        The concatenated, stripped transcript, or "" if a segment lacks 'text'.
    """
    try:
        pieces = []
        for segment in transcription_response.get('result', []):
            pieces.append(segment['text'])
    except KeyError:
        return ""
    return "".join(pieces).strip()
# Function to format the AI response
def format_generated_response(response):
    """Extract the assistant message from a chat-completion payload and clean it.

    Strips anything in angle brackets and any line starting mid-text with '#'
    up to its newline, then trims surrounding whitespace.

    Args:
        response: Chat-completion response dict, or None.

    Returns:
        The cleaned text, or an error string when the payload is malformed.
    """
    if response is None:
        return "Error: No valid response received."
    try:
        raw_text = response['choices'][0]['message']['content']
        # Drop <tag>-style markup, then heading-like '#...' lines.
        no_tags = re.sub(r'<.*?>', '', raw_text)
        no_headings = re.sub(r'#.*?\n', '', no_tags)
    except (KeyError, IndexError) as e:
        return f"Error: Missing key or index {e} in response."
    return no_headings.strip()
# Function to generate speech using edge_tts
def generate_speech(text):
    """Synthesize `text` to a .wav file with Microsoft Edge TTS.

    Args:
        text: The text to speak (Brazilian Portuguese voice).

    Returns:
        Path of the generated temporary audio file, or None on failure.
        The caller owns (and should eventually delete) the file.
    """
    tts_file = tempfile.NamedTemporaryFile(delete=False, suffix=".wav")
    # Close the handle so edge_tts can (re)open the path for writing,
    # which is required on Windows.
    tts_file.close()

    async def _generate_tts():
        tts = edge_tts.Communicate(text, voice="pt-BR-AntonioNeural")
        await tts.save(tts_file.name)

    try:
        # nest_asyncio.apply() at module import makes asyncio.run() safe even
        # when an event loop is already running (e.g. under Gradio).
        asyncio.run(_generate_tts())
        print(f"TTS audio saved to: {tts_file.name}")
        return tts_file.name
    except Exception as e:
        print(f"Error generating TTS: {e}")
        # Don't leak the temp file when synthesis fails.
        try:
            os.remove(tts_file.name)
        except OSError:
            pass
        return None
# Main chatbot conversation function
def chatbot_conversation(audio_file_path):
    """Run one voice-chat turn: transcribe, generate a reply, synthesize speech.

    Fixes over the previous version: removed references to undefined names
    (`history`, `messages`, `system_message`, `formatted_output`), removed the
    `yield` that silently turned this into a generator (making every
    `return value, value` unreachable for Gradio), and deleted the large block
    of commented-out dead code.

    Args:
        audio_file_path: Path to the user's recorded audio (from Gradio).

    Returns:
        A (text, audio_path) tuple matching the two Gradio outputs; the audio
        path is None when speech synthesis fails.
    """
    try:
        transcription = transcript(audio_file_path)
        user_input = extract_user_input(transcription)
        if not user_input:
            return "I could not generate the text. Please try again.", None

        # Stream the completion and accumulate tokens into one string.
        response = ""
        for chunk in sync_client.chat.completions.create(
            model="tela-gpt4o",
            messages=[
                {"role": "system", "content": system_instruction},
                {"role": "user", "content": user_input},
            ],
            stream=True,
            max_tokens=1024,   # keep replies concise
            temperature=0,     # deterministic output
            response_format={"type": "text"}
        ):
            token = chunk.choices[0].delta.content
            if token:
                response += token

        if not response:
            return "I could not generate the text. Please try again.", None

        # Reuse the shared cleaner by wrapping the text in the payload shape
        # it expects.
        formatted_output = format_generated_response(
            {"choices": [{"message": {"content": response}}]}
        )
        tts_file_name = generate_speech(formatted_output)
        if tts_file_name:
            return formatted_output, tts_file_name
        return "I could not synthesize the audio. Please try again.", None
    except Exception as e:
        print(f"Error: {e}")
        return "I could not understand you. Please try again.", None
# Gradio interface setup
# Gradio interface setup.
# NOTE(review): inputs uses streaming=True together with live=True, so the
# callback may fire repeatedly while recording — confirm this is intended,
# since chatbot_conversation performs a full transcription round-trip per call.
gr.Interface(
fn=chatbot_conversation,
inputs=gr.Audio(label="User", type="filepath", streaming=True, container=True),
outputs=[
gr.Textbox(label="Transcription"),
gr.Audio(type="filepath", autoplay=True, label="MAGIC Chat")
],
title="MAGIC VoiceChat",
description="A simple example of audio conversational AI",
theme="sudeepshouche/minimalist",
live=True
).launch()