# MAGIC VoiceChat — Gradio app: speech-to-text -> LLM chat -> text-to-speech.
# (Removed stray "Spaces: / Runtime error" status text pasted from the
# HuggingFace Spaces UI; it is not Python and broke the file.)
import os
import re
import asyncio
import tempfile

import edge_tts
import gradio as gr
import nest_asyncio
import requests
from openai import OpenAI
from pydub import AudioSegment
from pydub.utils import which

# Allow asyncio.run() inside an already-running event loop (Gradio/Jupyter).
nest_asyncio.apply()

# NOTE(review): this constant is not used below (the client authenticates with
# api_key="tela"); read it from the environment instead of hard-coding a
# secret in source.
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "-")

# OpenAI-compatible chat-completions client pointed at the TELA gateway.
sync_client = OpenAI(
    base_url="https://t2t.fanheroapi.com/v1",
    api_key="tela",
)

# Ensure pydub can locate ffmpeg on PATH.
AudioSegment.converter = which("ffmpeg")

# TELA endpoint for speech-to-text generation.
TELA_TRANSCRIPT_AUDIO_URL = "http://104.171.203.212:8000/speech-to-text/"

# System prompt (Portuguese): keep replies friendly, concise, clear and open.
system_instruction = """
Responda e mantenha a conversa de forma amigavel, concisa, clara e aberta.
Evite qualquer desnecessaria introducao.
Responda em um tom normal, de conversacao e sempre amigavel e suportivo.
"""
def convert_to_mp3(audio_file_path):
    """Convert any audio file readable by pydub/ffmpeg to MP3.

    Args:
        audio_file_path: Path to the source audio file.

    Returns:
        Path of a new temporary ``.mp3`` file, or ``None`` on failure.
        The caller is responsible for deleting the returned file.
    """
    print("[DEBUG] Starting audio conversion to mp3.")
    temp_mp3 = tempfile.NamedTemporaryFile(suffix=".mp3", delete=False)
    # Close the handle right away: pydub writes by path, and an open handle
    # would block the export on Windows.
    temp_mp3.close()
    try:
        audio = AudioSegment.from_file(audio_file_path)
        audio.export(temp_mp3.name, format="mp3")
        print(f"[DEBUG] Successfully converted to mp3: {temp_mp3.name}")
        return temp_mp3.name
    except Exception as e:
        print(f"[ERROR] Error converting audio: {e}")
        # Don't leak the temp file when conversion fails.
        try:
            os.remove(temp_mp3.name)
        except OSError:
            pass
        return None
def transcript(audio_file_path):
    """Transcribe an audio file via the TELA speech-to-text endpoint.

    Converts the input to MP3 first, POSTs it, and always cleans up the
    temporary MP3.

    Args:
        audio_file_path: Path to the recorded audio, or ``None``.

    Returns:
        The endpoint's JSON response on success, otherwise a dict of the form
        ``{"data": "failed", "error": "..."}``.
    """
    print("[DEBUG] Starting transcription process.")
    if audio_file_path is None:
        print("[ERROR] No audio file provided.")
        return {"data": "failed", "error": "No audio file provided."}
    mp3_file_path = convert_to_mp3(audio_file_path)
    if not mp3_file_path:
        print("[ERROR] Failed to convert audio to mp3.")
        return {"data": "failed", "error": "Failed to convert audio to mp3."}
    try:
        print("[DEBUG] Sending mp3 to transcription endpoint.")
        print(f"[DEBUG] Transcription API URL: {TELA_TRANSCRIPT_AUDIO_URL}")
        with open(mp3_file_path, 'rb') as f:
            files = {'file': f}
            # A bounded timeout keeps the UI from hanging if the endpoint stalls.
            response = requests.post(TELA_TRANSCRIPT_AUDIO_URL, files=files, timeout=60)
        print(f"[DEBUG] Response Status Code: {response.status_code}")
        print(f"[DEBUG] Response Text: {response.text}")
        if response.status_code == 200:
            print("[DEBUG] Successfully received transcription.")
            return response.json()
        else:
            print(f"[ERROR] Unexpected status code {response.status_code}: {response.text}")
            return {"data": "failed", "error": f"Error {response.status_code}: {response.text}"}
    except Exception as e:
        print(f"[ERROR] Exception during transcription: {e}")
        return {"data": "failed", "error": str(e)}
    finally:
        # Always remove the temporary MP3, whatever the request outcome.
        if mp3_file_path and os.path.exists(mp3_file_path):
            try:
                os.remove(mp3_file_path)
                print("[DEBUG] Temporary mp3 file deleted.")
            except OSError as e:
                print(f"[ERROR] Error deleting temporary file: {e}")
def extract_user_input(transcription_response):
    """Concatenate the text of all segments in a transcription response.

    Args:
        transcription_response: Dict whose ``"result"`` key (if present) is a
            list of segment dicts each carrying a ``"text"`` field.

    Returns:
        The joined, stripped transcript, or ``""`` when a segment is missing
        its ``"text"`` key or there are no segments.
    """
    print("[DEBUG] Extracting user input from transcription response.")
    try:
        transcript_segments = transcription_response.get('result', [])
        user_input = "".join(segment['text'] for segment in transcript_segments)
        print(f"[DEBUG] Extracted user input: {user_input.strip()}")
        return user_input.strip()
    except KeyError as e:
        print(f"[ERROR] KeyError in transcription response: {e}")
        return ""
# Unused — kept for reference only. Strips HTML-like tags and "#..." lines
# from a chat-completion response payload.
#def format_generated_response(response):
#    print("[DEBUG] Formatting the generated response.")
#    if response is None:
#        print("[ERROR] No response to format.")
#        return "Error: No valid response received."
#    try:
#        generated_text = response['choices'][0]['message']['content']
#        partial_text = re.sub(r'<.*?>', '', generated_text)
#        cleaned_text = re.sub(r'#.*?\n', '', partial_text)
#        print(f"[DEBUG] Formatted response: {cleaned_text.strip()}")
#        return cleaned_text.strip()
#    except (KeyError, IndexError) as e:
#        print(f"[ERROR] Error formatting response: {e}")
#        return f"Error: Missing key or index {e} in response."
def generate_speech(text):
    """Synthesize Brazilian-Portuguese speech for *text* with edge-tts.

    Args:
        text: The text to speak.

    Returns:
        Path to the generated audio file, or ``None`` on failure.
        The caller is responsible for the file's lifetime.
    """
    print("[DEBUG] Generating speech from text.")
    # edge-tts emits MP3 by default, so name the temp file accordingly
    # (the original ".wav" suffix mislabeled the content). Close the handle
    # immediately so edge-tts can write to the path.
    tts_file = tempfile.NamedTemporaryFile(delete=False, suffix=".mp3")
    tts_file.close()

    async def _synthesize():
        communicate = edge_tts.Communicate(text, voice="pt-BR-AntonioNeural")
        await communicate.save(tts_file.name)

    try:
        # nest_asyncio.apply() at module level lets this run even inside an
        # already-running event loop.
        asyncio.run(_synthesize())
        print(f"[DEBUG] TTS audio saved to: {tts_file.name}")
        return tts_file.name
    except Exception as e:
        print(f"[ERROR] Error generating TTS: {e}")
        return None
def chatbot_conversation(audio_file_path):
    """One full voice-chat turn: transcribe -> stream LLM reply -> synthesize TTS.

    This is a generator (Gradio streaming handler). It yields
    ``(text, audio_path)`` tuples matching the interface's two outputs:
    partial ``(text, None)`` updates while tokens stream in, then one final
    tuple carrying the synthesized audio file.

    Fixes vs. the previous version: plain ``return value`` statements inside a
    generator are swallowed by StopIteration and never reached Gradio, and
    ``formatted_output`` was an undefined name — both replaced with yields of
    ``response``. The final stream delta can carry ``content=None``, which is
    now skipped instead of crashing string concatenation.

    Args:
        audio_file_path: Path to the recorded user audio (from gr.Audio).

    Yields:
        Tuples of (response text so far, TTS audio path or None).
    """
    print("[DEBUG] Starting chatbot conversation.")
    try:
        transcription = transcript(audio_file_path)
        user_input = extract_user_input(transcription)
        if not user_input:
            print("[ERROR] No user input extracted from transcription.")
            yield "I could not generate the text. Please try again.", None
            return

        # Build the chat payload: system prompt, prior turns, current input.
        # History is per-call for now; persist it externally to keep context
        # across turns.
        history = []
        messages = [{"role": "system", "content": system_instruction}]
        for user_turn, assistant_turn in history:
            if user_turn:
                messages.append({"role": "user", "content": user_turn})
            if assistant_turn:
                messages.append({"role": "assistant", "content": assistant_turn})
        messages.append({"role": "user", "content": user_input})

        print("[DEBUG] Sending request to sync_client for chat completion.")
        print(f"[DEBUG] Messages: {messages}")

        response = ""
        try:
            # Stream tokens from the API, surfacing partial text as it arrives.
            for chunk in sync_client.chat.completions.create(
                model="tela-gpt4o",
                messages=messages,
                stream=True,
                max_tokens=1024,
                temperature=0,
                response_format={"type": "text"},
            ):
                token = chunk.choices[0].delta.content
                if token:  # the terminating delta has content=None
                    response += token
                    print(f"[DEBUG] Partial response token received: {token}")
                    yield response, None
        except Exception as e:
            print(f"[ERROR] Error during streaming response: {e}")
            yield "I could not understand you. Please try again.", None
            return

        if not response:
            print("[ERROR] No response generated.")
            yield "I could not synthesize the audio. Please try again.", None
            return

        # Record the turn as a (user, assistant) pair — the same shape the
        # history-reading loop above expects.
        history.append([user_input, response])

        print("[DEBUG] Generating TTS for response.")
        tts_file_name = generate_speech(response)
        if tts_file_name:
            print("[DEBUG] Returning final response and TTS file.")
        else:
            print("[ERROR] Failed to generate TTS.")
        yield response, tts_file_name
    except Exception as e:
        print(f"[ERROR] Exception in chatbot_conversation: {e}")
        yield "I could not understand you. Please try again.", None
# Wire the streaming conversation handler to a simple record -> (text, audio)
# interface and start the app.
gr.Interface(
    fn=chatbot_conversation,
    inputs=gr.Audio(label="User", type="filepath", streaming=False, container=True),
    outputs=[
        gr.Textbox(label="Transcription"),
        gr.Audio(type="filepath", autoplay=True, label="MAGIC Chat"),
    ],
    title="MAGIC VoiceChat",
    description="A simple example of audio conversational AI",
    theme="sudeepshouche/minimalist",
).launch()