AudioFusion / app.py
zarox's picture
UPDATE
ebca844
raw
history blame
9.7 kB
import os
import secrets
import threading
import gradio as gr
from io import BytesIO
from AudioFusion import Fusion
from telethon.sync import TelegramClient, events, Button
# Telegram credentials come from the environment; each one is None when the
# corresponding variable is not set.
API_ID = os.getenv("API_ID")
API_HASH = os.getenv("API_HASH")
BOT_TOKEN = os.getenv("BOT_TOKEN")
def process_audio(
    input_file,
    effect_8d, pan_boundary, jump_percentage, time_l_to_r, volume_multiplier,
    effect_slowed, speed_multiplier,
    effect_reverb, room_size, damping, width, wet_level, dry_level,
):
    """Apply the selected effects to an uploaded audio file and save the result.

    Effects are applied in a fixed order: 8D, then slowed, then reverb. The
    uploaded file is deleted from disk as soon as it has been loaded.

    Args:
        input_file: Path of the uploaded audio file.
        effect_8d: Apply the 8D effect when truthy.
        pan_boundary, jump_percentage, volume_multiplier: Forwarded unchanged
            to ``Fusion.effect8D``.
        time_l_to_r: Left-to-right time; multiplied by 1000 before being
            forwarded (presumably seconds -> milliseconds; confirm against
            ``Fusion.effect8D``).
        effect_slowed: Apply the slowed effect when truthy.
        speed_multiplier: Forwarded to ``Fusion.effectSlowed``.
        effect_reverb: Apply the reverb effect when truthy.
        room_size, damping, width, wet_level, dry_level: Forwarded to
            ``Fusion.effectReverb``.

    Returns:
        Whatever ``Fusion.saveSound`` returns for the processed sound.

    Raises:
        ValueError: If no input file was supplied.
    """
    # Without this guard an empty upload would reach Fusion.loadSound /
    # os.path.abspath as None and fail with an unrelated, confusing error.
    if not input_file:
        raise ValueError("No audio file was provided. Please upload a file first.")

    # Load the sound, then drop the uploaded copy from disk.
    sound = Fusion.loadSound(input_file)
    os.remove(os.path.abspath(input_file))

    applied = []

    if effect_8d:
        sound = Fusion.effect8D(
            sound, pan_boundary, jump_percentage, time_l_to_r * 1000, volume_multiplier
        )
        applied.append("8d")

    if effect_slowed:
        sound = Fusion.effectSlowed(sound, speed_multiplier)
        applied.append("Slowed")

    if effect_reverb:
        # The last argument is a fresh random hex token per call; its purpose
        # is defined by Fusion.effectReverb (presumably a scratch-file name).
        sound = Fusion.effectReverb(
            sound, room_size, damping, width, wet_level, dry_level,
            str(secrets.token_hex(5)),
        )
        applied.append("Reverb")

    # The output name is derived from the input path plus the applied effects.
    output_file = f"{input_file} {' + '.join(applied)} - By AudioFusion"

    # Save the processed sound and hand the result back to the caller.
    return Fusion.saveSound(sound, output_file)
# Markdown/HTML banner passed to the first gr.Markdown in the interface.
before_text = """<div align="center">
<h1>AudioFusion</h1>
<i>Add a touch of uniqueness with various customizable effects like slowed and reverb.</i>
</div>
<hr>"""

# Markdown footer passed to the last gr.Markdown in the interface.
# The backslashes are doubled so the runtime text still contains a literal
# "\#" (which keeps Markdown from treating the line as a heading) without
# relying on the invalid "\#" escape sequence in a normal string literal.
after_text = """<hr>
PR in [github](https://github.com/MineisZarox/AudioFusion) repository beta branch are always welcome.
<h3>Todo</h3>
\\# Acapella Extractor<br>
\\# Karoke Maker<br>
\\# Bass Booster<br>
\\# Volume Booster<br>
<h3>Inspiration & Credits</h3>
- Special thanks to [Jiaaro](https://github.com/jiaaro) for pydub. AudioFusion is mainly wrapped around pydub
- My Soundscapes of Serenity - [Because](https://t.me/bcuzwhynot)
"""
# NOTE(review): the nesting below is reconstructed from flattened source;
# confirm the layout against the running interface.
with gr.Blocks(title="Audio Fusion") as iface:
    gr.Markdown(before_text)
    input_audio = gr.Audio(label="Upload your music file", type="filepath")

    # Slowed effect: a checkbox plus a settings column that starts hidden.
    with gr.Tab("Slowed Effect"):
        speed_check = gr.Checkbox(label="Apply slowed effect")
        with gr.Column(visible=False) as se_col:
            speed = gr.Slider(
                label="Speed Multiplier",
                minimum=0.1,
                maximum=4,
                step=0.05,
                value=0.90,
            )

    # Reverb effect: a checkbox plus two rows of settings, hidden at first.
    with gr.Tab("Reverb Effect"):
        reverb_check = gr.Checkbox(label="Apply reverb effect")
        with gr.Column(visible=False) as re_col:
            with gr.Row():
                room = gr.Slider(
                    label="Room Size", minimum=0, maximum=1, step=0.01, value=0.8
                )
                damp = gr.Slider(
                    label="Damping", minimum=0, maximum=1, step=0.05, value=1
                )
                width = gr.Slider(
                    label="Width", minimum=0, maximum=1, step=0.05, value=0.5
                )
            with gr.Row():
                wet = gr.Slider(
                    label="Wet Level", minimum=0, maximum=1, step=0.05, value=0.3
                )
                dry = gr.Slider(
                    label="Dry Level", minimum=0, maximum=1, step=0.05, value=0.8
                )

    # 8D effect: a checkbox plus two rows of settings, hidden at first.
    with gr.Tab("8d Effect"):
        dimension_check = gr.Checkbox(label="Apply 8D effect")
        with gr.Column(visible=False) as di_col:
            with gr.Row():
                pan = gr.Slider(label="Pan Boundary", minimum=0, maximum=100, value=90)
                jump = gr.Slider(label="Jump Percentage", minimum=1, maximum=100, value=5)
            with gr.Row():
                time = gr.Slider(label="Time L to R (s)", minimum=1, maximum=30, value=10)
                volx = gr.Slider(label="Volume Multiplier", minimum=1, maximum=20, value=6)

    # ---- Visibility toggles: each settings column follows its checkbox ----
    def di_v(check):
        """Return an update showing the 8D column only when `check` is truthy."""
        return {di_col: gr.Column(visible=True if check else False)}

    def se_v(check):
        """Return an update showing the slowed column only when `check` is truthy."""
        return {se_col: gr.Column(visible=True if check else False)}

    def re_v(check):
        """Return an update showing the reverb column only when `check` is truthy."""
        return {re_col: gr.Column(visible=True if check else False)}

    dimension_check.change(di_v, inputs=[dimension_check], outputs=[di_col])
    speed_check.change(se_v, inputs=[speed_check], outputs=[se_col])
    reverb_check.change(re_v, inputs=[reverb_check], outputs=[re_col])

    # ---- Action buttons, output widget and footer ----
    with gr.Row():
        btnClear = gr.ClearButton(
            components=[dimension_check, speed_check, reverb_check]
        )
        btnRun = gr.Button("Run", size="sm", variant="primary")

    # Order matters: it must line up with process_audio's positional parameters.
    inputs = [
        input_audio,
        dimension_check, pan, jump, time, volx,
        speed_check, speed,
        reverb_check, room, damp, width, wet, dry,
    ]
    output = [gr.Audio(label="Download processed music", type="filepath")]

    gr.Markdown(after_text)

    # The clear button also resets the output widget created above.
    btnClear.add(components=output)
    btnRun.click(
        fn=process_audio,
        inputs=inputs,
        outputs=output,
        api_name="AudioFusion",
    )
# Telethon client shared by every bot handler in this file.
client = TelegramClient('session_name', api_id=API_ID, api_hash=API_HASH)

# Per-user interaction state, keyed by the sender id. The handlers store either
# the uploaded audio (a BytesIO) or the sound produced by the last effect.
states = dict()
@client.on(events.NewMessage(pattern='/start'))
async def start_handler(event):
    """Reply to a /start message with the welcome text."""
    greeting = "Welcome to AudioFusion Bot! Send me an audio file, and I'll apply effects for you."
    await event.reply(greeting)
# Inline keyboard layout: one inner tuple per row, each entry being
# (button label, callback data).
_BUTTON_ROWS = (
    (('Slowed', b'slowed'), ('8D', b'8d')),
    (('Reverb', b'reverb'), ('Reverse', b'reverse')),
    (('Volume', b'volume'), ('Speedup', b'speedup')),
    (('Preview', b'preview'),),
    (('Send', b'send'),),
)

# Keyboard attached to the bot's preview message; the callback data of each
# button is matched by the CallbackQuery handlers in this file.
buttons = [
    [Button.inline(label, data) for label, data in row]
    for row in _BUTTON_ROWS
]
@client.on(events.NewMessage(pattern='/buttons'))
async def buttons_handler(event):
    """Store the replied-to file for the sender and show the effects keyboard.

    The command must be sent as a reply to a message carrying a file;
    otherwise the user is asked to do so and nothing is stored.
    """
    replied = await event.get_reply_message()
    if not (replied and replied.file):
        await event.reply("Please reply to an audio file.")
        return

    # Download into memory and rewind so the next reader starts at byte 0.
    buffer = BytesIO()
    await event.client.download_media(replied, buffer)
    buffer.seek(0)

    # Remember the upload under the sender's id for the callback handlers.
    states[event.sender_id] = buffer

    await client.send_file(
        event.chat_id,
        file="image.jpg",
        caption="Preview the current modification:",
        buttons=buttons,
    )
@client.on(events.CallbackQuery(pattern=b'(slowed|8d|reverb|reverse|trim|volume|speedup)'))
async def audio_effect_handler(event):
    """Apply the effect chosen on the inline keyboard to the sender's audio.

    The sender's state holds either the raw upload (a BytesIO stored by the
    /buttons command) or the sound produced by a previously applied effect.
    Both cases are handled, so effects can be stacked one after another.

    Failures while decoding or processing are reported back to the user as a
    reply instead of being raised.
    """
    user_id = event.sender_id
    current = states.get(user_id)
    if not current:
        await event.answer("No audio file found. Please use /buttons command to upload an audio file.")
        return

    query = event.pattern_match.group(1).decode("UTF-8")

    # Lambdas keep the Fusion lookups lazy, exactly like the original branches.
    effects = {
        'slowed': lambda snd: Fusion.effectSlowed(snd, 0.82),
        'speedup': lambda snd: Fusion.effectSlowed(snd, 1.2),
        '8d': lambda snd: Fusion.effect8D(snd),
        'reverb': lambda snd: Fusion.effectReverb(snd),
        'reverse': lambda snd: snd.reverse(),
    }
    apply_effect = effects.get(query)
    if apply_effect is None:
        # Matched by the pattern ('trim', 'volume') but not implemented yet.
        # Checked before decoding so the stored upload is left untouched.
        return await event.answer("INvalid for now...")

    try:
        if isinstance(current, BytesIO):
            # First effect on this upload: rewind in case an earlier attempt
            # already consumed the buffer, then decode it.
            current.seek(0)
            sound = Fusion.from_file(current, format="mp3")
        else:
            # An effect was already applied; the state is the decoded sound,
            # which must not be passed through from_file again.
            sound = current

        # Update the user's state with the modified sound.
        states[user_id] = apply_effect(sound)
        await event.answer("Effect applied. Click /send to receive the modified audio file.")
    except Fusion.InvalidMusicFileError as e:
        await event.reply(str(e))
    except Exception as e:
        await event.reply(f"An error occurred: {str(e)}")
@client.on(events.CallbackQuery(pattern=b'preview'))
async def preview_handler(event):
    """Attach the sender's current modified audio to the keyboard message.

    The sound is saved through ``Fusion.saveSound``, uploaded by editing the
    message that carries the inline keyboard, and the saved file is removed
    afterwards even if the upload fails.
    """
    user_id = event.sender_id
    sound = states.get(user_id)

    # A BytesIO means the upload has not been through any effect yet.
    # Presumably Fusion.saveSound needs a decoded sound, not a byte buffer,
    # so this is treated the same as having nothing to preview.
    if not sound or isinstance(sound, BytesIO):
        await event.answer("No modified audio file found. Please apply an effect first.")
        return

    output_file = Fusion.saveSound(sound, f"{user_id}_preview")
    try:
        await event.edit("`Uploading...`", buttons=buttons)
        # Edit the message again, this time attaching the saved audio file.
        await event.edit(file=output_file, text="`Preview the current modification:`", buttons=buttons)
    finally:
        # Always remove the saved preview so failed uploads leave no files behind.
        os.remove(output_file)
@client.on(events.CallbackQuery(pattern=b'send'))
async def send_handler(event):
    """Send the sender's modified audio and end their session.

    The sound is saved through ``Fusion.saveSound`` and sent as a reply. The
    saved file is removed even if sending fails. After a successful send the
    sender's state is dropped and the keyboard message is deleted.
    """
    user_id = event.sender_id
    sound = states.get(user_id)

    # A BytesIO means the upload has not been through any effect yet.
    # Presumably Fusion.saveSound needs a decoded sound, not a byte buffer,
    # so this is treated the same as having nothing to send.
    if not sound or isinstance(sound, BytesIO):
        await event.answer("No modified audio file found. Please apply an effect first.")
        return

    output_file = Fusion.saveSound(sound, f"{user_id}_modified_audio")
    try:
        await event.reply(file=output_file)
    finally:
        # Always remove the saved file so failed sends leave no files behind.
        os.remove(output_file)

    # pop() instead of del: a second click on Send while the first upload is
    # still in flight must not raise KeyError.
    states.pop(user_id, None)
    await event.delete()
async def initiation():
    """Send the startup notice, with a single URL button, to the hard-coded chat."""
    keyboard = [(Button.url("Execal", "https://t.me/execal"),)]
    await client.send_message(
        -1001662130485,
        "**Hugging is Running.**",
        buttons=keyboard,
    )
if __name__ == '__main__':
    # Log in with the bot token and post the startup notice first.
    client.start(bot_token=BOT_TOKEN)
    client.loop.run_until_complete(initiation())

    # The Gradio interface is launched on its own thread (default launch
    # arguments) while the Telegram client keeps the main thread.
    ui_thread = threading.Thread(target=iface.launch)
    ui_thread.start()
    print("Bot started succefully")

    client.run_until_disconnected()