import os
import re
import gradio as gr
import tempfile
from pydub import AudioSegment
from pydub.utils import which
import edge_tts
import asyncio
import nest_asyncio
import requests
nest_asyncio.apply()
from openai import OpenAI
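
# Overview: MAGIC VoiceChat is a Gradio demo that (1) converts the user's
# recording to mp3 with pydub/ffmpeg, (2) transcribes it through the TELA
# speech-to-text endpoint, (3) streams a reply from an OpenAI-compatible chat
# API ("tela-gpt4o"), and (4) speaks the reply back with edge-tts
# (voice pt-BR-AntonioNeural).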

OPENAI_API_KEY = "-"  # defined but unused; the client below passes its own api_key
sync_client = OpenAI(
    base_url="https://t2t.fanheroapi.com/v1",
    api_key="tela"
)
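# Chat completions go through this OpenAI-compatible endpoint; the model name
# requested later in chatbot_conversation is "tela-gpt4o".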

# Ensuring pydub can locate ffmpeg
AudioSegment.converter = which("ffmpeg")

# TELA endpoint for speech-to-text transcription
TELA_TRANSCRIPT_AUDIO_URL = "http://104.171.203.212:8000/speech-to-text/"
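# The endpoint is called with a multipart "file" field (see transcript() below).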

system_instruction = """
Responda e mantenha a conversa de forma amigável, concisa, clara e aberta.
Evite qualquer introdução desnecessária.
Responda em um tom normal, de conversação, e sempre amigável e suportivo.
"""
# English gloss of the system prompt: "Reply and keep the conversation friendly,
# concise, clear and open. Avoid any unnecessary introduction. Answer in a
# normal, conversational tone, always friendly and supportive."

def convert_to_mp3(audio_file_path):
    print("[DEBUG] Starting audio conversion to mp3.")
    temp_mp3 = tempfile.NamedTemporaryFile(suffix=".mp3", delete=False)
    temp_mp3.close()  # pydub exports to the path, so the open handle is not needed
    try:
        audio = AudioSegment.from_file(audio_file_path)
        audio.export(temp_mp3.name, format="mp3")
        print(f"[DEBUG] Successfully converted to mp3: {temp_mp3.name}")
        return temp_mp3.name
    except Exception as e:
        print(f"[ERROR] Error converting audio: {e}")
        return None
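
# Example usage (a sketch; any input format ffmpeg can decode should work):
#   mp3_path = convert_to_mp3("/path/to/recording.wav")  # -> temp .mp3 path, or None on failure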

def transcript(audio_file_path):
    print("[DEBUG] Starting transcription process.")
    if audio_file_path is None:
        print("[ERROR] No audio file provided.")
        return {"data": "failed", "error": "No audio file provided."}

    mp3_file_path = convert_to_mp3(audio_file_path)
    if not mp3_file_path:
        print("[ERROR] Failed to convert audio to mp3.")
        return {"data": "failed", "error": "Failed to convert audio to mp3."}

    try:
        print("[DEBUG] Sending mp3 to transcription endpoint.")
        print(f"[DEBUG] Transcription API URL: {TELA_TRANSCRIPT_AUDIO_URL}")
        with open(mp3_file_path, 'rb') as f:
            files = {'file': f}
            response = requests.post(TELA_TRANSCRIPT_AUDIO_URL, files=files)

        print(f"[DEBUG] Response Status Code: {response.status_code}")
        print(f"[DEBUG] Response Text: {response.text}")

        if response.status_code == 200:
            print("[DEBUG] Successfully received transcription.")
            return response.json()
        else:
            print(f"[ERROR] Unexpected status code {response.status_code}: {response.text}")
            return {"data": "failed", "error": f"Error {response.status_code}: {response.text}"}

    except Exception as e:
        print(f"[ERROR] Exception during transcription: {e}")
        return {"data": "failed", "error": str(e)}
    finally:
        if mp3_file_path and os.path.exists(mp3_file_path):
            try:
                os.remove(mp3_file_path)
                print("[DEBUG] Temporary mp3 file deleted.")
            except OSError as e:
                print(f"[ERROR] Error deleting temporary file: {e}")

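# Expected transcription payload, inferred from the parsing below (the exact
# schema of the TELA endpoint is an assumption):
#   {"result": [{"text": "primeira parte "}, {"text": "segunda parte"}]}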
def extract_user_input(transcription_response):
    print("[DEBUG] Extracting user input from transcription response.")
    try:
        transcript_segments = transcription_response.get('result', [])
        user_input = "".join([segment['text'] for segment in transcript_segments])
        print(f"[DEBUG] Extracted user input: {user_input.strip()}")
        return user_input.strip()
    except KeyError as e:
        print(f"[ERROR] KeyError in transcription response: {e}")
        return ""

#def format_generated_response(response):
#    print("[DEBUG] Formatting the generated response.")
#    if response is None:
#        print("[ERROR] No response to format.")
#        return "Error: No valid response received."
#    try:
#        generated_text = response['choices'][0]['message']['content']
#        partial_text = re.sub(r'<.*?>', '', generated_text)
#        cleaned_text = re.sub(r'#.*?\n', '', partial_text)
#        print(f"[DEBUG] Formatted response: {cleaned_text.strip()}")
#        return cleaned_text.strip()
#    except (KeyError, IndexError) as e:
#        print(f"[ERROR] Error formatting response: {e}")
#        return f"Error: Missing key or index {e} in response."

def generate_speech(text):
    print("[DEBUG] Generating speech from text.")
    # edge-tts produces MP3 data, so use an .mp3 suffix for the temp file
    tts_file = tempfile.NamedTemporaryFile(delete=False, suffix=".mp3")
    tts_file.close()  # edge-tts writes to the path, not the open handle
    async def generate_tts():
        tts = edge_tts.Communicate(text, voice="pt-BR-AntonioNeural")
        await tts.save(tts_file.name)

    try:
        asyncio.run(generate_tts())
        print(f"[DEBUG] TTS audio saved to: {tts_file.name}")
        return tts_file.name
    except Exception as e:
        print(f"[ERROR] Error generating TTS: {e}")
        return None
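
# Example usage (a sketch; requires network access for edge-tts):
#   audio_path = generate_speech("Olá, tudo bem?")  # -> temp audio file path, or None on failure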

def chatbot_conversation(audio_file_path):
    print("[DEBUG] Starting chatbot conversation.")
    try:
        transcription = transcript(audio_file_path)
        user_input = extract_user_input(transcription)

        if not user_input:
            print("[ERROR] No user input extracted from transcription.")
            # In a generator, "return value" never reaches Gradio; yield the error instead.
            yield "I could not generate the text. Please try again.", None
            return

        # Ensure we have a system_message and history variables
        system_message = system_instruction
        history = []  # If history is meant to persist, consider storing it externally
        messages = []

        # If you had previous conversation history, you could reconstruct messages here.
        for val in history:
            if val[0]:
                messages.append({"role": "user", "content": val[0]})
            if val[1]:
                messages.append({"role": "assistant", "content": val[1]})

        # Include the current user input
        messages.append({"role": "user", "content": user_input})
        messages.insert(0, {"role": "system", "content": system_message})

        print("[DEBUG] Sending request to sync_client for chat completion.")
        print(f"[DEBUG] Messages: {messages}")

        response = ""
        # Streaming response from the API. Because this function is a generator,
        # every yield must supply both interface outputs (text, audio path).
        try:
            for message in sync_client.chat.completions.create(
                model="tela-gpt4o",
                messages=messages,
                stream=True,
                max_tokens=1024,
                temperature=0,
                response_format={"type": "text"}
            ):
                token = message.choices[0].delta.content
                if token is None:  # the final streamed chunk carries no content
                    continue
                response += token
                # Optional: print tokens as they arrive for debugging
                print(f"[DEBUG] Partial response token received: {token}")
                yield response, None
        except Exception as e:
            print(f"[ERROR] Error during streaming response: {e}")
            yield "I could not understand you. Please try again.", None
            return

        #formatted_output = format_generated_response(
        #    {"choices": [{"message": {"content": response}}]}
        #)

        if response:
            # Append the conversation turn to history, matching the
            # [user, assistant] shape read by the loop above
            history.append([user_input, response])
            print("[DEBUG] Generating TTS for response.")
            tts_file_name = generate_speech(response)
            if tts_file_name:
                print("[DEBUG] Returning final response and TTS file.")
                yield response, tts_file_name
            else:
                print("[ERROR] Failed to generate TTS.")
                yield response, None
        else:
            print("[ERROR] No response generated.")
            yield "I could not synthesize the audio. Please try again.", None

    except Exception as e:
        print(f"[ERROR] Exception in chatbot_conversation: {e}")
        yield "I could not understand you. Please try again.", None

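# Note: chatbot_conversation is a generator (it streams partial text), so Gradio's
# queue must be enabled for streaming output. Recent Gradio versions enable the
# queue by default; on older versions you may need .queue() before .launch().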
gr.Interface(
    fn=chatbot_conversation,
    inputs=gr.Audio(label="User", type="filepath", streaming=False, container=True),
    outputs=[
        gr.Textbox(label="Response"),
        gr.Audio(type="filepath", autoplay=True, label="MAGIC Chat")
    ],
    title="MAGIC VoiceChat",
    description="A simple example of audio conversational AI",
    theme="sudeepshouche/minimalist",
).launch()