Spaces:
Sleeping
Sleeping
import asyncio | |
import gradio as gr | |
import os | |
from agent import AudioAgent | |
# Global agent instance; rebuilt by update_agent() every time user_input() runs.
agent = None
# Global demo instance; assigned in the __main__ block and read by
# get_share_url() to build file URLs.
demo = None
def get_share_url(path):
    """Return a URL under which the file at ``path`` can be fetched.

    Resolution order:

    1. The ``AGENT_URL`` environment variable, when set and non-empty.
    2. ``share_url`` of the module-level ``demo`` object, when it has one.
    3. ``path`` itself, unchanged.

    Args:
        path: Path of the file to expose.

    Returns:
        ``<base>/gradio_api/file=<path>`` when a base URL is available,
        otherwise ``path`` unchanged.
    """
    base_url = os.environ.get("AGENT_URL")
    if not base_url:
        # ``demo`` may exist without a usable share URL (the attribute can be
        # missing or None); formatting it blindly would yield a URL that
        # starts with the literal text "None".
        base_url = getattr(demo, "share_url", None) if demo else None
    if not base_url:
        return path
    # Drop trailing slashes so the join never produces "//gradio_api".
    return f"{base_url.rstrip('/')}/gradio_api/file={path}"
def update_agent(model_name, temperature, api_key):
    """Rebuild the module-level ``agent`` from the supplied settings.

    Args:
        model_name: Model identifier forwarded to ``AudioAgent``.
        temperature: Sampling temperature; converted with ``float()``.
        api_key: API key forwarded to ``AudioAgent``.

    Returns:
        ``(True, None)`` when the agent was created, otherwise
        ``(False, message)`` where ``message`` is ``str()`` of the exception
        raised while converting the temperature or building the agent.
    """
    global agent
    try:
        settings = {
            "model_name": model_name,
            "temperature": float(temperature),
            "api_key": api_key,
        }
        agent = AudioAgent(**settings)
    except Exception as exc:
        # Failures are reported to the caller instead of being raised.
        return False, str(exc)
    return True, None
def user_input(user_message, audio_files, history, custom_history, model_name, temperature, api_key):
    """Record a user turn (text plus uploaded audio) in both histories.

    The agent is rebuilt from the current settings first. ``history`` and
    ``custom_history`` are appended to in place; only ``custom_history``
    entries carry the ``input_files`` URL list.

    Returns:
        ``("", audio_files, history, custom_history)``: an empty string for
        the textbox followed by the (possibly updated) inputs.

    Raises:
        gr.Error: If the agent could not be created from the settings.
    """
    configured, config_error = update_agent(model_name, temperature, api_key)
    if not configured:
        raise gr.Error(config_error)

    # Nothing typed and nothing uploaded: leave both histories untouched.
    has_text = bool(user_message.strip())
    if not (has_text or audio_files):
        return "", audio_files, history, custom_history

    # An upload is either an object exposing ``.name`` or something whose
    # string form is used as the path.
    audio_file_urls = [
        get_share_url(upload.name if hasattr(upload, 'name') else str(upload))
        for upload in (audio_files or [])
    ]

    # Chat-facing history entry.
    history.append({"role": "user", "content": user_message})
    # Custom history entry, which also remembers the file URLs.
    custom_history.append({
        "role": "user",
        "content": user_message,
        "input_files": audio_file_urls,
    })
    return "", audio_files, history, custom_history
async def bot_response(history, audio_file_urls, custom_history):
    """Generate the assistant reply for the latest user turn.

    Args:
        history: Chat messages (``{"role", "content"}`` dicts); the reply is
            appended in place.
        audio_file_urls: Extra audio URLs; only consulted when deciding
            whether an empty message should get the default prompt.
        custom_history: Parallel history whose user entries carry
            ``input_files``; the reply is appended in place together with
            ``output_files``.

    Returns:
        ``(history, output_audio_files)``. When there is no pending user turn
        the histories are returned untouched with an empty file list.

    Raises:
        gr.Error: If no agent is configured, or if the agent call fails (the
            pending user turn is removed from both histories first).
    """
    if not agent:
        raise gr.Error("Please configure the agent first")
    if not history or history[-1]["role"] != "user":
        return history, []
    # The two histories can fall out of step (for example after an earlier
    # failure); never index into an empty list or replay an assistant entry
    # as if it were the user's request.
    if not custom_history or custom_history[-1].get("role") != "user":
        return history, []

    # Get the user message and input files
    pending_turn = custom_history[-1]
    user_message = pending_turn["content"]
    input_files = pending_turn.get("input_files", [])

    # If the message is empty but audio was supplied, provide a default
    # message. The files recorded on the turn itself are checked as well as
    # ``audio_file_urls``, since the latter may be empty even though the user
    # uploaded files.
    if not user_message.strip() and (input_files or audio_file_urls):
        user_message = "Please process these audio files"

    try:
        # Use the agent's run_agent method with the history before this turn
        result = await agent.run_agent(user_message, input_files, custom_history[:-1])
        # Extract the final response and audio files from the result
        final_response = result["final_response"]
        output_audio_files = result["output_audio_files"]
        # Add assistant response to history
        history.append({
            "role": "assistant",
            "content": final_response,
        })
        # Update custom history, which also tracks the produced files
        custom_history.append({
            "role": "assistant",
            "content": final_response,
            "output_files": output_audio_files,
        })
        return history, output_audio_files
    except Exception as e:
        # Drop the pending user turn so a failed request is not left behind.
        history.pop()
        custom_history.pop()
        raise gr.Error(str(e)) from e
def bot_response_sync(history, audio_file_urls, custom_history):
    """Run :func:`bot_response` to completion from synchronous code.

    ``asyncio.run`` creates a fresh event loop, runs the coroutine, cleans up
    leftover tasks and async generators, and closes the loop again. Unlike a
    hand-managed loop, it does not leave a closed loop installed as the
    calling thread's current event loop.

    Args:
        history: Forwarded to ``bot_response``.
        audio_file_urls: Forwarded to ``bot_response``.
        custom_history: Forwarded to ``bot_response``.

    Returns:
        Whatever ``bot_response`` returns.
    """
    return asyncio.run(bot_response(history, audio_file_urls, custom_history))
def create_interface():
    """Build the Gradio Blocks UI for the audio agent.

    Layout: a chat column (chatbot plus message box) next to a settings
    column (model, temperature, API key, upload and download file widgets),
    followed by static help text. Submitting the message box first records
    the user turn and then, if that succeeded, asks the agent for a reply.

    Returns:
        The constructed ``gr.Blocks`` instance (not yet launched).
    """
    with gr.Blocks(
        title="Audio Agent - Professional Audio Processing",
        theme=gr.themes.Default(),
    ) as interface:
        gr.Markdown("""
        # Audio Agent - Your AI Audio Assistant
        Upload your audio files and tell me what you need. I'll handle the rest!
        """)

        # Hidden state to store audio file URLs and custom history
        audio_urls_state = gr.State([])
        custom_history_state = gr.State([])

        with gr.Row():
            with gr.Column(scale=4):
                chatbot = gr.Chatbot(
                    type="messages",
                    height=500,
                    show_copy_button=True,
                    show_share_button=False
                )
                msg = gr.Textbox(
                    label="Describe what you want to do?",
                    # Stray doubled closing quote removed from the example.
                    placeholder="e.g., 'Remove filler words and improve audio quality'",
                    lines=3,
                    submit_btn=True
                )

            with gr.Column(scale=1):
                # Model Configuration
                with gr.Group():
                    model_name = gr.Dropdown(
                        choices=["gpt-4.1", "gpt-4.1-mini", "gpt-4o", "o3"],
                        value="gpt-4.1",
                        label="Model",
                        info="Select the model to use"
                    )
                    temperature = gr.Slider(
                        minimum=0.0,
                        maximum=1.0,
                        value=0.3,
                        step=0.1,
                        label="Temperature",
                        info="Higher values make output more random"
                    )
                    api_key = gr.Textbox(
                        label="OpenAI API Key",
                        placeholder="sk-...",
                        type="password",
                        info="Your OpenAI API key"
                    )

                # Set temperature to 1.0 (and lock the slider) when the o3
                # model is selected; unlock it again for any other model.
                def update_temperature(model):
                    if model == "o3":
                        return gr.update(value=1.0, interactive=False)
                    return gr.update(interactive=True)

                model_name.change(
                    update_temperature,
                    inputs=[model_name],
                    outputs=[temperature]
                )

                with gr.Group():
                    audio_files = gr.File(
                        file_count="multiple",
                        file_types=["audio"],
                        label="Upload Audio Files to Process",
                        height=150
                    )
                    output_audio_files = gr.File(
                        file_count="multiple",
                        file_types=["audio"],
                        label="Download Generated Audio",
                        height=150,
                        interactive=False,
                        visible=False  # Start hidden
                    )

        # Handle user input and bot response
        def handle_submit(message, files, history, custom_history, model, temp, key):
            # Returns (textbox value, files, chat history, custom history).
            return user_input(message, files, history, custom_history, model, temp, key)

        def handle_bot_response(history, audio_urls, custom_history):
            updated_history, output_files = bot_response_sync(history, audio_urls, custom_history)
            output_visible = bool(output_files)  # True if there are files, else False
            return updated_history, gr.update(value=output_files, visible=output_visible), custom_history

        # ``.success`` (rather than ``.then``) keeps the bot step from running
        # when recording the user turn failed, e.g. because the agent could
        # not be configured.
        msg.submit(
            handle_submit,
            [msg, audio_files, chatbot, custom_history_state, model_name, temperature, api_key],
            [msg, audio_files, chatbot, custom_history_state],
            queue=False
        ).success(
            handle_bot_response,
            [chatbot, audio_urls_state, custom_history_state],
            [chatbot, output_audio_files, custom_history_state]
        )

        gr.Markdown("""
        ---
        """)

        with gr.Row():
            gr.Markdown("""
            ## 🎚️ What I Can Do For You
            **Audio Manipulation:**
            - Merge multiple audio files into one continuous track
            - Cut or trim specific sections from any file
            - Adjust volume levels (increase or decrease)
            - Normalize audio levels for consistency
            - Apply fade-in or fade-out effects for smooth transitions (Mono channel only)
            - Change playback speed (faster or slower, with pitch change)
            - Reverse audio for creative effects
            - Remove silence from beginning or end of files
            **Analysis & Transcription:** (English only)
            - Transcribe speech in audio to text
            - Analyze audio properties (duration, sample rate, etc.)
            """)
            gr.Markdown("""
            ## 💡 Example Requests
            - *"Merge these two audio files and add a fade-in effect"*
            - *"Remove the silence at the beginning of this recording"*
            - *"Transcribe the speech in this audio file"*
            - *"Increase the volume of the first track and normalize both files"*
            - *"Cut out the middle section from 1:30 to 2:45"*
            - *"Make this audio play 1.5x faster"*
            - *"Apply a fade-out effect to the end of this track"*
            """)

    return interface
if __name__ == "__main__":
    # Bind the module-level ``demo`` so get_share_url() can read its
    # ``share_url`` when building file links.
    demo = create_interface()
    demo.launch()