File size: 8,053 Bytes
167bfa7
 
fc90c72
167bfa7
 
 
 
 
 
24accb9
167bfa7
 
fc90c72
7dddd00
167bfa7
 
 
 
cd58f6d
167bfa7
 
fc90c72
167bfa7
 
 
 
 
 
 
 
 
 
24accb9
167bfa7
 
 
 
24accb9
167bfa7
 
24accb9
167bfa7
 
 
24accb9
167bfa7
24accb9
167bfa7
 
 
 
24accb9
167bfa7
 
 
24accb9
 
167bfa7
 
 
 
24accb9
 
167bfa7
 
24accb9
167bfa7
 
24accb9
167bfa7
 
 
24accb9
167bfa7
 
 
 
 
24accb9
167bfa7
24accb9
167bfa7
 
24accb9
167bfa7
 
 
24accb9
167bfa7
24accb9
 
167bfa7
 
202a516
 
 
 
 
 
 
 
 
 
 
 
 
 
167bfa7
 
24accb9
167bfa7
 
 
 
 
 
 
24accb9
167bfa7
 
24accb9
167bfa7
 
 
24accb9
167bfa7
 
 
 
 
24accb9
7cdad84
 
24accb9
 
 
 
 
7cdad84
24accb9
 
 
 
 
 
7cdad84
24accb9
 
 
 
 
167bfa7
 
7cdad84
24accb9
 
84837eb
24accb9
 
 
 
 
 
 
 
7cdad84
 
 
 
24accb9
 
7cdad84
 
24accb9
7cdad84
167bfa7
 
 
 
 
7cdad84
167bfa7
 
24accb9
7cdad84
 
167bfa7
24accb9
7cdad84
167bfa7
24accb9
7cdad84
167bfa7
 
24accb9
7cdad84
167bfa7
fc90c72
167bfa7
4b4096d
167bfa7
 
 
 
 
 
 
e5c7b79
167bfa7
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
import os
import re
import gradio as gr
import tempfile
from pydub import AudioSegment
from pydub.utils import which
import edge_tts
import asyncio
import nest_asyncio
import requests
nest_asyncio.apply()
from openai import OpenAI

OPENAI_API_KEY = "-"
sync_client = OpenAI(
    base_url="https://t2t.fanheroapi.com/v1",
    api_key="tela"
)

# Ensuring pydub can locate ffmpeg
AudioSegment.converter = which("ffmpeg")

# TELA endpoint for speech-to-text generation
TELA_TRANSCRIPT_AUDIO_URL = "http://104.171.203.212:8000/speech-to-text/"

system_instruction = """
Responda e mantenha a conversa de forma amigavel, concisa, clara e aberta. 
Evite qualquer desnecessaria introducao. 
Responda em um tom normal, de conversacao e sempre amigavel e suportivo.
"""

def convert_to_mp3(audio_file_path):
    print("[DEBUG] Starting audio conversion to mp3.")
    temp_mp3 = tempfile.NamedTemporaryFile(suffix=".mp3", delete=False)
    try:
        audio = AudioSegment.from_file(audio_file_path)
        audio.export(temp_mp3.name, format="mp3")
        print(f"[DEBUG] Successfully converted to mp3: {temp_mp3.name}")
        return temp_mp3.name
    except Exception as e:
        print(f"[ERROR] Error converting audio: {e}")
        return None

def transcript(audio_file_path):
    print("[DEBUG] Starting transcription process.")
    if audio_file_path is None:
        print("[ERROR] No audio file provided.")
        return {"data": "failed", "error": "No audio file provided."}

    mp3_file_path = convert_to_mp3(audio_file_path)
    if not mp3_file_path:
        print("[ERROR] Failed to convert audio to mp3.")
        return {"data": "failed", "error": "Failed to convert audio to mp3."}

    try:
        print("[DEBUG] Sending mp3 to transcription endpoint.")
        print(f"[DEBUG] Transcription API URL: {TELA_TRANSCRIPT_AUDIO_URL}")
        with open(mp3_file_path, 'rb') as f:
            files = {'file': f}
            response = requests.post(TELA_TRANSCRIPT_AUDIO_URL, files=files)

        print(f"[DEBUG] Response Status Code: {response.status_code}")
        print(f"[DEBUG] Response Text: {response.text}")

        if response.status_code == 200:
            print("[DEBUG] Successfully received transcription.")
            return response.json()
        else:
            print(f"[ERROR] Unexpected status code {response.status_code}: {response.text}")
            return {"data": "failed", "error": f"Error {response.status_code}: {response.text}"}

    except Exception as e:
        print(f"[ERROR] Exception during transcription: {e}")
        return {"data": "failed", "error": str(e)}
    finally:
        if mp3_file_path and os.path.exists(mp3_file_path):
            try:
                os.remove(mp3_file_path)
                print("[DEBUG] Temporary mp3 file deleted.")
            except OSError as e:
                print(f"[ERROR] Error deleting temporary file: {e}")

def extract_user_input(transcription_response):
    print("[DEBUG] Extracting user input from transcription response.")
    try:
        transcript_segments = transcription_response.get('result', [])
        user_input = "".join([segment['text'] for segment in transcript_segments])
        print(f"[DEBUG] Extracted user input: {user_input.strip()}")
        return user_input.strip()
    except KeyError as e:
        print(f"[ERROR] KeyError in transcription response: {e}")
        return ""

#def format_generated_response(response):
#    print("[DEBUG] Formatting the generated response.")
#    if response is None:
#        print("[ERROR] No response to format.")
#        return "Error: No valid response received."
#    try:
#        generated_text = response['choices'][0]['message']['content']
#        partial_text = re.sub(r'<.*?>', '', generated_text)
#        cleaned_text = re.sub(r'#.*?\n', '', partial_text)
#        print(f"[DEBUG] Formatted response: {cleaned_text.strip()}")
#        return cleaned_text.strip()
#    except (KeyError, IndexError) as e:
#        print(f"[ERROR] Error formatting response: {e}")
#        return f"Error: Missing key or index {e} in response."

def generate_speech(text):
    print("[DEBUG] Generating speech from text.")
    tts_file = tempfile.NamedTemporaryFile(delete=False, suffix=".wav")
    async def generate_tts():
        tts = edge_tts.Communicate(text, voice="pt-BR-AntonioNeural")
        await tts.save(tts_file.name)

    try:
        asyncio.run(generate_tts())
        print(f"[DEBUG] TTS audio saved to: {tts_file.name}")
        return tts_file.name
    except Exception as e:
        print(f"[ERROR] Error generating TTS: {e}")
        return None

def chatbot_conversation(audio_file_path):
    print("[DEBUG] Starting chatbot conversation.")
    try:
        transcription = transcript(audio_file_path)
        user_input = extract_user_input(transcription)

        if not user_input:
            print("[ERROR] No user input extracted from transcription.")
            yield "I could not generate the text. Please try again.", None
            return

        system_message = system_instruction
        history = []  # If history is meant to persist, consider storing it externally
        messages = []

        # Reconstruct history if needed (currently empty)
        for val in history:
            if val[0]:
                messages.append({"role": "user", "content": val[0]})
            if val[1]:
                messages.append({"role": "assistant", "content": val[1]})

        # Current user input
        messages.append({"role": "user", "content": user_input})
        messages.insert(0, {"role": "system", "content": system_message})

        print("[DEBUG] Sending request to sync_client for chat completion.")
        print(f"[DEBUG] Messages: {messages}")

        response = ""
        # Stream partial responses
        try:
            for message in sync_client.chat.completions.create(
                model="marco-o1",
                messages=messages,
                stream=True,
                max_tokens=1024,
                temperature=0,
                response_format={"type": "text"}
            ):
                token = message.choices[0].delta.content
                response += token
                # Yield partial text, no audio yet
                # The first output is the transcription (assistant message),
                # second output is audio, which we pass as None for now
                yield (response, None)
        except Exception as e:
            print(f"[ERROR] Error during streaming response: {e}")
            yield ("I could not understand you. Please try again.", None)
            return

        # Now that we have the full response, generate TTS
        if response:
            history.append([
                {"role": "user", "content": user_input},
                {"role": "assistant", "content": response}
            ])
            print("[DEBUG] Generating TTS for full response.")
            tts_file_name = generate_speech(response)
            if tts_file_name:
                print("[DEBUG] Returning final response and TTS file.")
                # Now yield again with final text and audio
                yield (response, tts_file_name)
            else:
                print("[ERROR] Failed to generate TTS.")
                yield (response, None)
        else:
            print("[ERROR] No response generated.")
            yield ("I could not synthesize the audio. Please try again.", None)

    except Exception as e:
        print(f"[ERROR] Exception in chatbot_conversation: {e}")
        yield ("I could not understand you. Please try again.", None)

gr.Interface(
    fn=chatbot_conversation,
    inputs=gr.Audio(label="User", type="filepath", streaming=False, container=True),
    outputs=[
        gr.Textbox(label="Transcription"),
        gr.Audio(type="filepath", autoplay=True, label="MAGIC Chat")
    ],
    title="MAGIC VoiceChat",
    description="A simple example of audio conversational AI",
    theme="sudeepshouche/minimalist",
    live=True
).launch()