Spaces:
Running
Running
import gradio as gr | |
import requests | |
from datetime import datetime | |
import json | |
from dotenv import load_dotenv | |
import os | |
import tempfile | |
load_dotenv() | |
API_BASE_URL = os.getenv("API_BASE_URL","http://localhost:8000") | |
class AudioState: | |
def __init__(self): | |
self.current_audio_id = None | |
self.current_transcription = None | |
self.audio_data = None | |
self.audio_url = None | |
state = AudioState() | |
def download_audio(url): | |
if not url: | |
return None | |
try: | |
response = requests.get(url) | |
response.raise_for_status() | |
# Create a temporary file | |
temp_dir = tempfile.gettempdir() | |
temp_path = os.path.join(temp_dir, f"temp_audio_{datetime.now().timestamp()}.mp3") | |
# Write the content to the temporary file | |
with open(temp_path, 'wb') as f: | |
f.write(response.content) | |
return temp_path | |
except Exception as e: | |
print(f"Error downloading audio: {str(e)}") | |
return None | |
def upload_audio(audio_path, language_code): | |
if audio_path is None: | |
return None, "Please record or upload an audio file.", None, None, None | |
try: | |
with open(audio_path, 'rb') as audio_file: | |
files = { | |
'file': ('audio.wav', audio_file, 'audio/wav') | |
} | |
data = { | |
'language_code': language_code | |
} | |
response = requests.post(f"{API_BASE_URL}/upload/", files=files, data=data) | |
response.raise_for_status() | |
audio_data = response.json() | |
# Store the audio data in state | |
state.current_audio_id = audio_data.get('id') | |
state.current_transcription = audio_data.get('original_transcribed_text', '') | |
state.audio_data = audio_data | |
state.audio_url = audio_data.get('file_path') | |
# Download the audio file for playback | |
audio_filepath = download_audio(state.audio_url) | |
status = f"""Upload successful! | |
Audio ID: {state.current_audio_id} | |
Transcription completed: Yes""" | |
return ( | |
state.current_transcription, | |
status, | |
state.current_transcription, | |
f"Ready to edit transcription for Audio ID: {state.current_audio_id}", | |
audio_filepath | |
) | |
except requests.exceptions.RequestException as e: | |
error_msg = f"Error uploading audio: {str(e)}" | |
return None, error_msg, None, "Upload failed", None | |
def get_audio_by_id(audio_id): | |
try: | |
response = requests.get(f"{API_BASE_URL}/audio/{audio_id}/") | |
response.raise_for_status() | |
audio_data = response.json() | |
# Update state with fetched data | |
state.current_audio_id = audio_data['id'] | |
state.current_transcription = audio_data.get('original_transcribed_text', '') | |
state.audio_data = audio_data | |
state.audio_url = audio_data.get('file_path') | |
# Download the audio file for playback | |
audio_filepath = download_audio(state.audio_url) | |
return ( | |
audio_data.get('original_transcribed_text', ''), | |
audio_data.get('updated_transcribed_text', ''), | |
f"Loaded Audio ID: {audio_id}", | |
audio_filepath | |
) | |
except requests.exceptions.RequestException as e: | |
return None, None, f"Error loading audio: {str(e)}", None | |
def update_transcription(transcription, audio_id=None): | |
# Use either provided ID or stored ID | |
update_id = audio_id if audio_id else state.current_audio_id | |
if not update_id: | |
return "No audio ID provided or selected." | |
try: | |
response = requests.put( | |
f"{API_BASE_URL}/audio/{update_id}/", | |
json={"updated_transcribed_text": transcription} | |
) | |
response.raise_for_status() | |
updated_data = response.json() | |
return f"Transcription updated successfully for Audio ID: {update_id}!" | |
except requests.exceptions.RequestException as e: | |
return f"Error updating transcription: {str(e)}" | |
def list_audios(): | |
try: | |
response = requests.get(f"{API_BASE_URL}/audio/") | |
response.raise_for_status() | |
audios = response.json() | |
if not audios: | |
return "No audio files found." | |
result = [] | |
for audio in audios: | |
result.append(f"ID: {audio['id']}") | |
result.append(f"Original Transcription: {audio['original_transcribed_text']}") | |
if audio.get('updated_transcribed_text'): | |
result.append(f"Updated Transcription: {audio['updated_transcribed_text']}") | |
result.append("---") | |
return "\n".join(result) | |
except requests.exceptions.RequestException as e: | |
return f"Error listing audios: {str(e)}" | |
# Create the Gradio interface | |
with gr.Blocks(title="Audio Transcription System") as app: | |
gr.Markdown("# Audio Transcription System") | |
with gr.Tab("Record & Transcribe"): | |
with gr.Row(): | |
with gr.Column(): | |
audio_input = gr.Audio( | |
sources=["microphone", "upload"], | |
type="filepath", | |
label="Record or Upload Audio" | |
) | |
language_dropdown = gr.Dropdown( | |
choices=["rw", "en", "es", "fr", "de"], | |
value="rw", | |
label="Language Code" | |
) | |
upload_button = gr.Button("Process Audio", variant="primary") | |
with gr.Column(): | |
status_output = gr.Textbox( | |
label="Status", | |
interactive=False, | |
lines=5 | |
) | |
audio_playback = gr.Audio( | |
label="Audio Playback", | |
interactive=False, | |
type="filepath" | |
) | |
transcription_output = gr.Textbox( | |
label="Original Transcription", | |
interactive=False, | |
lines=3 | |
) | |
edit_transcription = gr.Textbox( | |
label="Edit Transcription", | |
lines=3, | |
interactive=True | |
) | |
edit_status = gr.Textbox( | |
label="Edit Status", | |
interactive=False | |
) | |
update_button = gr.Button("Update Transcription", variant="primary") | |
with gr.Tab("Edit by ID"): | |
with gr.Row(): | |
with gr.Column(): | |
audio_id_input = gr.Number( | |
label="Audio ID", | |
precision=0 | |
) | |
load_button = gr.Button("Load Audio", variant="primary") | |
audio_playback_by_id = gr.Audio( | |
label="Audio Playback", | |
interactive=False, | |
type="filepath" | |
) | |
with gr.Column(): | |
loaded_original = gr.Textbox( | |
label="Original Transcription", | |
interactive=False, | |
lines=3 | |
) | |
loaded_current = gr.Textbox( | |
label="Current Transcription", | |
interactive=False, | |
lines=3 | |
) | |
new_transcription = gr.Textbox( | |
label="New Transcription", | |
lines=3, | |
interactive=True | |
) | |
update_status = gr.Textbox( | |
label="Update Status", | |
interactive=False | |
) | |
update_by_id_button = gr.Button("Update Transcription", variant="primary") | |
with gr.Tab("View All Recordings"): | |
refresh_button = gr.Button("Refresh Audio List") | |
audio_list = gr.Textbox( | |
label="Audio Files", | |
lines=15, | |
interactive=False | |
) | |
# Event handlers | |
upload_button.click( | |
fn=upload_audio, | |
inputs=[audio_input, language_dropdown], | |
outputs=[transcription_output, status_output, edit_transcription, edit_status, audio_playback] | |
) | |
update_button.click( | |
fn=update_transcription, | |
inputs=[edit_transcription], | |
outputs=edit_status | |
) | |
load_button.click( | |
fn=get_audio_by_id, | |
inputs=[audio_id_input], | |
outputs=[loaded_original, loaded_current, update_status, audio_playback_by_id] | |
) | |
update_by_id_button.click( | |
fn=update_transcription, | |
inputs=[new_transcription, audio_id_input], | |
outputs=update_status | |
) | |
refresh_button.click( | |
fn=list_audios, | |
inputs=[], | |
outputs=audio_list | |
) | |
if __name__ == "__main__": | |
app.launch() |