AudioFusion / app.py
zarox's picture
UPDATE
ffe661e
raw
history blame
11.2 kB
import os
import secrets
import threading
import gradio as gr
from AudioFusion import Fusion
from telethon.sync import TelegramClient, events, Button
API_ID = os.environ.get("API_ID")
API_HASH = os.environ.get("API_HASH")
BOT_TOKEN = os.environ.get("BOT_TOKEN")
def process_audio(input_file,
effect_8d, pan_boundary, jump_percentage, time_l_to_r, volume_multiplier,
effect_slowed, speed_multiplier,
effect_reverb, room_size, damping, width, wet_level, dry_level
):
# Load the sound file
sound = Fusion.loadSound(input_file)
os.remove(os.path.abspath(input_file))
effects_str = []
# Apply effects based on user choices
if effect_8d:
sound = Fusion.effect8D(sound, pan_boundary, jump_percentage, time_l_to_r*1000, volume_multiplier)
effects_str.append("8d")
if effect_slowed:
sound = Fusion.effectSlowed(sound, speed_multiplier)
effects_str.append("Slowed")
if effect_reverb:
sound = Fusion.effectReverb(sound, room_size, damping, width, wet_level, dry_level, str(secrets.token_hex(5)))
effects_str.append("Reverb")
output_file = f"{input_file} {' + '.join(effects_str)} - {'By AudioFusion'}"
# Save the processed sound and return the output file
output = Fusion.saveSound(sound, output_file)
return output
before_text = """<div align="center">
<h1>AudioFusion</h1>
<i>Add a touch of uniqueness with various customizable effects like slowed and reverb.</i>
</div>
<hr>"""
after_text = """<hr>
PR in [github](https://github.com/MineisZarox/AudioFusion) repository beta branch are always welcome.
<h3>Todo</h3>
\# Acapella Extractor<br>
\# Karoke Maker<br>
\# Bass Booster<br>
\# Volume Booster<br>
<h3>Inspiration & Credits</h3>
- Special thanks to [Jiaaro](https://github.com/jiaaro) for pydub. AudioFusion is mainly wrapped around pydub
- My Soundscapes of Serenity - [Because](https://t.me/bcuzwhynot)
"""
with gr.Blocks(title="Audio Fusion") as iface:
gr.Markdown(before_text)
input_audio = gr.Audio(label="Upload your music file", type="filepath")
# SLowed Effect and its arguments
with gr.Tab("Slowed Effect"):
speed_check = gr.Checkbox(label="Apply slowed effect")
with gr.Column(visible=False) as se_col:
speed = gr.Slider(label="Speed Multiplier", minimum=0.1, maximum=4, step=0.05, value=0.90)
# Reverb Effect and its argument
with gr.Tab("Reverb Effect"):
reverb_check = gr.Checkbox(label="Apply reverb effect")
with gr.Column(visible=False) as re_col:
with gr.Row():
room = gr.Slider(label="Room Size", minimum=0, maximum=1, step=0.01, value=0.8)
damp = gr.Slider(label="Damping", minimum=0, maximum=1, step=0.05, value=1)
width = gr.Slider(label="Width", minimum=0, maximum=1, step=0.05, value=0.5)
with gr.Row():
wet = gr.Slider(label="Wet Level", minimum=0, maximum=1, step=0.05, value=0.3)
dry = gr.Slider(label="Dry Level", minimum=0, maximum=1, step=0.05, value=0.8)
# 8d Effect and its arguments
with gr.Tab("8d Effect"):
dimension_check = gr.Checkbox(label="Apply 8D effect")
with gr.Column(visible=False) as di_col:
with gr.Row():
pan = gr.Slider(label="Pan Boundary", minimum=0, maximum=100, value=90)
jump = gr.Slider(label="Jump Percentage", minimum=1, maximum=100, value=5)
with gr.Row():
time = gr.Slider(label="Time L to R (s)", minimum=1, maximum=30, value=10)
volx = gr.Slider(label="Volume Multiplier", minimum=1, maximum=20, value=6)
# =====================================================
def di_v(check):
if check:
return {di_col: gr.Column(visible=True)}
else:
return {di_col: gr.Column(visible=False)}
def se_v(check):
if check:
return {se_col: gr.Column(visible=True)}
else:
return {se_col: gr.Column(visible=False)}
def re_v(check):
if check:
return {re_col: gr.Column(visible=True)}
else:
return {re_col: gr.Column(visible=False)}
dimension_check.change(di_v, inputs=[dimension_check], outputs=[di_col])
speed_check.change(se_v, inputs=[speed_check], outputs=[se_col])
reverb_check.change(re_v, inputs=[reverb_check], outputs=[re_col])
# =====================================================
with gr.Row():
btnClear = gr.ClearButton(components=[dimension_check, speed_check, reverb_check])
btnRun = gr.Button("Run", size="sm", variant="primary")
inputs = [input_audio, dimension_check, pan, jump, time, volx, speed_check, speed, reverb_check, room, damp, width, wet, dry]
output = [gr.Audio(label="Download processed music", type="filepath")]
gr.Markdown(after_text)
btnClear.add(components=output)
btnRun.click(fn=process_audio, inputs=inputs, outputs=output, api_name="AudioFusion")
client = TelegramClient('session_name', API_ID, API_HASH)
# Define the available commands
commands = {
'/start': 'Welcome to Audio Bot! Send me an audio file to get started.',
'/help': 'List all available commands.',
'/trim': 'Trim the audio. Usage: /trim start_time end_time',
'/volume': 'Adjust volume. Usage: /volume level',
'/reverse': 'Reverse the audio.',
'/reverb': 'Apply reverb effect to the audio. Usage: /reverb roomSize damping width wetLevel dryLevel',
'/speedup': 'Increase the speed of the audio.',
'/slowdown': 'Decrease the speed of the audio.',
'/effect8D': 'Apply 8D effect to the audio. Usage: /effect8D panBoundary jumpPercentage timeLtoR volumeMultiplier',
'/save': 'Save the modified audio. Usage: /save outputFileName',
}
@client.on(events.NewMessage(pattern='/start'))
async def start_handler(event):
await event.respond(commands['/start'])
@client.on(events.NewMessage(pattern='/help'))
async def help_handler(event):
help_text = '\n'.join(f'{cmd}: {desc}' for cmd, desc in commands.items())
await event.respond(help_text)
@client.on(events.NewMessage(incoming=True))
async def audio_handler(event):
reply = await event.get_reply_message()
if event.message.media and event.message.media.document.mime_type.startswith('audio') and event.is_private:
msg = await event.reply("`Downloading...`")
# Download the audio file
file_path = await client.download_media(event.message, f"downloads/{event.sender.id}_audio.mp3")
return await msg.edit("`Downloaded successfully...`")
elif event.is_group and event.text.startswith("/edit") and reply:
if reply.media and reply.message.media.document.mime_type.startswith('audio'):
msg = await event.reply("`Downloading...`")
file_path = await reply.download_media(f"downloads/{event.sender.id}_audio.mp3")
# try:
# # Load the audio file using your AudioFusion class
# audio = Fusion.loadSound(file_path)
# except Fusion.InvalidMusicFileError as e:
# await event.respond(f'Error: {e}')
# return os.remove(file_path)
return await msg.edit("`Downloaded successfully...`")
else:
return await event.respond("`Please send an audio file...`")
else:
if event.is_private: return await event.respond("`Please send an audio file...`")
@client.on(events.NewMessage(incoming=True))
async def audio_editor(event):
file_path = f"downloads/{event.sender.id}_audio.mp3"
if not os.path.exists(file_path): return
try:
# Load the audio file using your AudioFusion class
audio = Fusion.loadSound(file_path)
except Fusion.InvalidMusicFileError as e:
await event.reply(f'Error: {e}')
return os.remove(file_path)
# Process the user's command
if event.raw_text.startswith('/trim'):
# Parse parameters and apply the trim effect
# Example usage: /trim 5000 10000
parameters = event.raw_text.split()[1:]
start_time, end_time = map(int, parameters)
audio = audio[start_time:end_time]
await event.reply("`Following effect added to your audio file...`")
elif event.raw_text.startswith('/volume'):
# Parse parameters and adjust volume
# Example usage: /volume 3
parameters = event.raw_text.split()[1:]
volume_level = int(parameters[0])
audio = audio + volume_level # Adjust volume using Pydub's volume adjustment
await event.reply("`Following effect added to your audio file...`")
elif event.raw_text.startswith('/reverse'):
# Reverse the audio
audio = audio.reverse()
await event.reply("`Following effect added to your audio file...`")
elif event.raw_text.startswith('/speedup'):
# Increase the speed of the audio
audio = Fusion.effectSlowed(audio, speedMultiplier=0.8)
await event.reply("`Following effect added to your audio file...`")
elif event.raw_text.startswith('/slowdown'):
# Decrease the speed of the audio
audio = Fusion.effectSlowed(audio, speedMultiplier=1.2)
await event.reply("`Following effect added to your audio file...`")
elif event.raw_text.startswith('/effect8D'):
# Parse parameters and apply the effect
# Example usage: /effect8D 100 5 10000 6
parameters = event.raw_text.split()[1:]
audio = Fusion.effect8D(audio, *map(int, parameters))
await event.reply("`Following effect added to your audio file...`")
elif event.raw_text.startswith('/effectReverb'):
# Parse parameters and apply the effect
# Example usage: /effectReverb 0.8 1 0.5 0.3 0.8
parameters = event.raw_text.split()[1:]
audio = Fusion.effectReverb(audio, *map(float, parameters))
await event.reply("`Following effect added to your audio file...`")
elif event.raw_text.startswith('/save'):
# Parse parameters and save the modified audio
# Example usage: /save outputFileName
parameters = event.raw_text.split()[1:]
output_file_name = parameters[0]
output_file_path = Fusion.saveSound(audio, output_file_name)
# Send the modified audio file to the user
await client.send_file(event.chat_id, output_file_path, caption=f'Modified audio: {output_file_name}.mp3')
# Clean up the temporary files
os.remove(output_file_path)
elif event.raw_text.startswith('/delete'):
os.remove(file_path)
async def initiation():
await client.send_message(-1001662130485, "**Hugging is Running.**", buttons=[(Button.url("Execal", "https://t.me/execal"),)],)
if __name__ == '__main__':
client.start(bot_token=BOT_TOKEN)
client.loop.run_until_complete(initiation())
threading.Thread(target=iface.launch).start() #(share=False)
print("Bot started succefully")
client.run_until_disconnected()