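"""
MAGIC VoiceChat: a simple example of audio conversational AI built with Gradio.

Pipeline: record audio -> convert it to MP3 with pydub -> transcribe it with a
TELA speech-to-text endpoint -> generate a reply with the tela-gpt4o chat
model -> synthesize the reply with edge-tts -> play it back to the user.
"""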
import os
import re
import requests
import gradio as gr
import tempfile
from pydub import AudioSegment
from pydub.utils import which
import edge_tts
import asyncio
import nest_asyncio
nest_asyncio.apply()
from openai import OpenAI

sync_client = OpenAI(
    base_url="https://t2t.fanheroapi.com/v1",
    api_key="tela"
)

# Ensuring pydub can locate ffmpeg
AudioSegment.converter = which("ffmpeg")

# TELA endpoint for text-to-text generation
TELA_API_URL = "https://t2t.fanheroapi.com/v1/chat/completions"

# Headers for API request
headers = {
    "Content-Type": "application/json",
    "Accept": "application/json"
}

# TELA endpoint for speech-to-text generation
TELA_TRANSCRIPT_AUDIO_URL = "http://104.171.203.212:8000/speech-to-text/"

system_instruction = """
Responda e mantenha a conversa de forma amigavel, concisa, clara e aberta. 
Evite qualquer desnecessaria introducao. 
Responda em um tom normal, de conversacao e sempre amigavel e suportivo.
"""

# Function to convert audio to mp3 using pydub
def convert_to_mp3(audio_file_path):
    temp_mp3 = tempfile.NamedTemporaryFile(suffix=".mp3", delete=False)
    try:
        audio = AudioSegment.from_file(audio_file_path)
        audio.export(temp_mp3.name, format="mp3")
        return temp_mp3.name
    except Exception as e:
        print(f"Error converting audio: {e}")
        return None

# Function to send audio to the speech-to-text endpoint
def transcript(audio_file_path):
    if audio_file_path is None:
        return {"data": "failed", "error": "No audio file provided."}

    mp3_file_path = convert_to_mp3(audio_file_path)
    if not mp3_file_path:
        return {"data": "failed", "error": "Failed to convert audio to mp3."}

    try:
        print(f"Transcription API URL: {TELA_TRANSCRIPT_AUDIO_URL}")
        with open(mp3_file_path, 'rb') as f:
            files = {'file': f}
            response = requests.post(TELA_TRANSCRIPT_AUDIO_URL, files=files)

        print(f"Response Status: {response.status_code}")
        print(f"Response Text: {response.text}")

        if response.status_code == 200:
            return response.json()
        else:
            return {"data": "failed", "error": f"Error {response.status_code}: {response.text}"}

    except Exception as e:
        return {"data": "failed", "error": str(e)}
    finally:
        if mp3_file_path and os.path.exists(mp3_file_path):
            try:
                os.remove(mp3_file_path)
            except OSError as e:
                print(f"Error deleting temporary file: {e}")

# Function to extract user input from transcription
def extract_user_input(transcription_response):
    try:
        transcript_segments = transcription_response.get('result', [])
        user_input = "".join([segment['text'] for segment in transcript_segments])
        return user_input.strip()
    except KeyError:
        return ""


# Function to format the AI response
def format_generated_response(response):
    if response is None:
        return "Error: No valid response received."
    try:
        # Extract the generated text from the response
        generated_text = response['choices'][0]['message']['content']
        partial_text = re.sub(r'<.*?>', '', generated_text)
        cleaned_text = re.sub(r'#.*?\n', '', partial_text)
        return cleaned_text.strip()
    except (KeyError, IndexError) as e:
        return f"Error: Missing key or index {e} in response."

# Function to generate speech using edge_tts
def generate_speech(text):
    # edge_tts writes MP3 audio by default, so use a matching file suffix
    tts_file = tempfile.NamedTemporaryFile(delete=False, suffix=".mp3")
    tts_file.close()  # release the handle; edge_tts writes to the path
    async def generate_tts():
        tts = edge_tts.Communicate(text, voice="pt-BR-AntonioNeural")
        await tts.save(tts_file.name)

    try:
        asyncio.run(generate_tts())
        print(f"TTS audio saved to: {tts_file.name}")
        return tts_file.name
    except Exception as e:
        print(f"Error generating TTS: {e}")
        return None
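
# Other Brazilian Portuguese voices (for example "pt-BR-FranciscaNeural") can be
# swapped in via the voice parameter; `await edge_tts.list_voices()` enumerates
# the available voices.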

# Main chatbot conversation function: audio in -> transcription -> LLM reply -> TTS out
def chatbot_conversation(audio_file_path):
    try:
        transcription = transcript(audio_file_path)
        user_input = extract_user_input(transcription)

        if not user_input:
            yield "I could not transcribe the audio. Please try again.", None
            return

        # The interface only passes the current audio clip, so each turn is
        # sent to the model without prior conversation history.
        response = ""
        for chunk in sync_client.chat.completions.create(
            model="tela-gpt4o",
            messages=[
                {"role": "system", "content": system_instruction},
                {"role": "user", "content": user_input},
            ],
            stream=True,
            max_tokens=1024,                  # keep the reply concise
            temperature=0,                    # deterministic output
            response_format={"type": "text"}
        ):
            token = chunk.choices[0].delta.content if chunk.choices else None
            if token:
                response += token
                # Stream the partial reply to the textbox as tokens arrive
                yield response, None

        if response:
            tts_file_name = generate_speech(response)
            yield response, tts_file_name
        else:
            yield "I could not synthesize the audio. Please try again.", None

    except Exception as e:
        print(f"Error: {e}")
        yield "I could not understand you. Please try again.", None

# Gradio interface setup
gr.Interface(
    fn=chatbot_conversation,
    inputs=gr.Audio(label="User", type="filepath", streaming=True, container=True),
    outputs=[
        gr.Textbox(label="Transcription"),
        gr.Audio(type="filepath", autoplay=True, label="MAGIC Chat")
    ],
    title="MAGIC VoiceChat",
    description="A simple example of audio conversational AI",
    theme="sudeepshouche/minimalist",
    live=True
).launch()
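
# Note: on Gradio versions where queuing is not enabled by default, streaming
# (generator) outputs may require enabling it explicitly, for example
# gr.Interface(...).queue().launch().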