# Spaces: Running  (status text that appears to come from the hosting page, not from the program)
import os
import secrets
import threading

import gradio as gr
from AudioFusion import Fusion
from telethon.sync import TelegramClient, events, Button
# Telegram credentials are read from the process environment; each value is
# None when its variable is not set.
API_ID = os.getenv("API_ID")
API_HASH = os.getenv("API_HASH")
BOT_TOKEN = os.getenv("BOT_TOKEN")
def process_audio(
    input_file,
    effect_8d, pan_boundary, jump_percentage, time_l_to_r, volume_multiplier,
    effect_slowed, speed_multiplier,
    effect_reverb, room_size, damping, width, wet_level, dry_level,
):
    """Apply the ticked effects to an uploaded file and save the result.

    Args:
        input_file: Path of the uploaded audio. The file is deleted as soon
            as it has been loaded.
        effect_8d: Truthy to apply the 8D effect with ``pan_boundary``,
            ``jump_percentage``, ``time_l_to_r`` (multiplied by 1000 before
            it is handed to ``Fusion.effect8D``) and ``volume_multiplier``.
        effect_slowed: Truthy to apply the slowed effect with
            ``speed_multiplier``.
        effect_reverb: Truthy to apply reverb with ``room_size``,
            ``damping``, ``width``, ``wet_level`` and ``dry_level``.

    Returns:
        Whatever ``Fusion.saveSound`` returns for the processed sound.
    """
    sound = Fusion.loadSound(input_file)
    # The upload is not needed once it has been loaded.
    os.remove(os.path.abspath(input_file))

    applied = []

    # Effects run in a fixed order: 8D, then slowed, then reverb.
    if effect_8d:
        time_ms = time_l_to_r * 1000
        sound = Fusion.effect8D(
            sound, pan_boundary, jump_percentage, time_ms, volume_multiplier
        )
        applied.append("8d")

    if effect_slowed:
        sound = Fusion.effectSlowed(sound, speed_multiplier)
        applied.append("Slowed")

    if effect_reverb:
        # A random token is passed as the last argument on every call.
        scratch_name = secrets.token_hex(5)
        sound = Fusion.effectReverb(
            sound, room_size, damping, width, wet_level, dry_level, scratch_name
        )
        applied.append("Reverb")

    # The output name is the input path followed by the applied effect names.
    effect_label = " + ".join(applied)
    output_file = f"{input_file} {effect_label} - By AudioFusion"
    return Fusion.saveSound(sound, output_file)
# Markdown/HTML shown above the controls of the Gradio page.
before_text = """<div align="center">
<h1>AudioFusion</h1>
<i>Add a touch of uniqueness with various customizable effects like slowed and reverb.</i>
</div>
<hr>"""

# Markdown/HTML shown below the controls. The backslashes are doubled so the
# rendered text still contains a literal "\#" without relying on an invalid
# escape sequence, which Python 3.12+ reports as a SyntaxWarning.
after_text = """<hr>
PR in [github](https://github.com/MineisZarox/AudioFusion) repository beta branch are always welcome.
<h3>Todo</h3>
\\# Acapella Extractor<br>
\\# Karoke Maker<br>
\\# Bass Booster<br>
\\# Volume Booster<br>
<h3>Inspiration & Credits</h3>
- Special thanks to [Jiaaro](https://github.com/jiaaro) for pydub. AudioFusion is mainly wrapped around pydub
- My Soundscapes of Serenity - [Because](https://t.me/bcuzwhynot)
"""
# Gradio page: one upload widget, one tab per effect, then Clear/Run buttons.
# Each tab keeps its sliders in a column that starts hidden and is toggled by
# the tab's checkbox.
with gr.Blocks(title="Audio Fusion") as iface:
    gr.Markdown(before_text)
    input_audio = gr.Audio(label="Upload your music file", type="filepath")

    # Slowed effect and its single argument.
    with gr.Tab("Slowed Effect"):
        speed_check = gr.Checkbox(label="Apply slowed effect")
        with gr.Column(visible=False) as se_col:
            speed = gr.Slider(
                label="Speed Multiplier",
                minimum=0.1,
                maximum=4,
                step=0.05,
                value=0.90,
            )

    # Reverb effect and its arguments.
    with gr.Tab("Reverb Effect"):
        reverb_check = gr.Checkbox(label="Apply reverb effect")
        with gr.Column(visible=False) as re_col:
            with gr.Row():
                room = gr.Slider(
                    label="Room Size", minimum=0, maximum=1, step=0.01, value=0.8
                )
                damp = gr.Slider(
                    label="Damping", minimum=0, maximum=1, step=0.05, value=1
                )
                width = gr.Slider(
                    label="Width", minimum=0, maximum=1, step=0.05, value=0.5
                )
            with gr.Row():
                wet = gr.Slider(
                    label="Wet Level", minimum=0, maximum=1, step=0.05, value=0.3
                )
                dry = gr.Slider(
                    label="Dry Level", minimum=0, maximum=1, step=0.05, value=0.8
                )

    # 8D effect and its arguments.
    with gr.Tab("8d Effect"):
        dimension_check = gr.Checkbox(label="Apply 8D effect")
        with gr.Column(visible=False) as di_col:
            with gr.Row():
                pan = gr.Slider(label="Pan Boundary", minimum=0, maximum=100, value=90)
                jump = gr.Slider(
                    label="Jump Percentage", minimum=1, maximum=100, value=5
                )
            with gr.Row():
                time = gr.Slider(
                    label="Time L to R (s)", minimum=1, maximum=30, value=10
                )
                volx = gr.Slider(
                    label="Volume Multiplier", minimum=1, maximum=20, value=6
                )

    # Checkbox callbacks: each returns an update for its own settings column.
    def di_v(check):
        """Show the 8D settings column only while its checkbox is ticked."""
        return {di_col: gr.Column(visible=bool(check))}

    def se_v(check):
        """Show the slowed settings column only while its checkbox is ticked."""
        return {se_col: gr.Column(visible=bool(check))}

    def re_v(check):
        """Show the reverb settings column only while its checkbox is ticked."""
        return {re_col: gr.Column(visible=bool(check))}

    dimension_check.change(di_v, inputs=[dimension_check], outputs=[di_col])
    speed_check.change(se_v, inputs=[speed_check], outputs=[se_col])
    reverb_check.change(re_v, inputs=[reverb_check], outputs=[re_col])

    with gr.Row():
        btnClear = gr.ClearButton(
            components=[dimension_check, speed_check, reverb_check]
        )
        btnRun = gr.Button("Run", size="sm", variant="primary")

    # The order of this list is the positional argument order of process_audio.
    inputs = [
        input_audio,
        dimension_check, pan, jump, time, volx,
        speed_check, speed,
        reverb_check, room, damp, width, wet, dry,
    ]
    output = [gr.Audio(label="Download processed music", type="filepath")]

    gr.Markdown(after_text)

    # The result player is added to the components the Clear button resets.
    btnClear.add(components=output)
    btnRun.click(
        fn=process_audio, inputs=inputs, outputs=output, api_name="AudioFusion"
    )
# Telegram client shared by every handler in this file.
# NOTE(review): no `client.on(...)` / `client.add_event_handler(...)` call is
# visible in this view - confirm the handlers are registered somewhere.
client = TelegramClient('session_name', API_ID, API_HASH)

# Command -> help text. help_handler prints every entry and start_handler
# sends the '/start' entry. '/edit' and '/delete' are listed because the
# handlers act on them.
# NOTE(review): audio_editor matches the prefix '/effectReverb'; confirm that
# the '/reverb' spelling advertised here is accepted as well.
commands = {
    '/start': 'Welcome to Audio Bot! Send me an audio file to get started.',
    '/help': 'List all available commands.',
    '/edit': 'In a group, reply to an audio message with /edit to load it.',
    '/trim': 'Trim the audio. Usage: /trim start_time end_time',
    '/volume': 'Adjust volume. Usage: /volume level',
    '/reverse': 'Reverse the audio.',
    '/reverb': 'Apply reverb effect to the audio. Usage: /reverb roomSize damping width wetLevel dryLevel',
    '/speedup': 'Increase the speed of the audio.',
    '/slowdown': 'Decrease the speed of the audio.',
    '/effect8D': 'Apply 8D effect to the audio. Usage: /effect8D panBoundary jumpPercentage timeLtoR volumeMultiplier',
    '/save': 'Save the modified audio. Usage: /save outputFileName',
    '/delete': 'Delete the audio you uploaded.',
}
async def start_handler(event):
    """Send the ``/start`` welcome text from ``commands`` to the chat."""
    welcome = commands['/start']
    await event.respond(welcome)
async def help_handler(event):
    """Send every entry of ``commands`` as one ``command: description`` line each."""
    lines = [f'{cmd}: {desc}' for cmd, desc in commands.items()]
    await event.respond('\n'.join(lines))
def _has_audio_document(message):
    """Return True when *message* carries a document whose MIME type starts with "audio".

    Messages without media, and media objects that have no ``document``
    attribute, are reported as not-audio instead of raising ``AttributeError``.
    """
    document = getattr(getattr(message, "media", None), "document", None)
    mime_type = getattr(document, "mime_type", None) or ""
    return mime_type.startswith('audio')


async def audio_handler(event):
    """Download the audio a user wants to edit to ``downloads/<sender id>_audio.mp3``.

    * Private chat: the incoming message itself must carry audio; anything
      else is answered with a request to send an audio file.
    * Group chat: only a message starting with ``/edit`` that replies to
      another message is considered; the replied-to message must carry audio.
    * Every other event is ignored.

    Returns:
        The message produced by the final ``edit``/``respond`` call, or None
        when the event is ignored.
    """
    # sender_id is used rather than sender.id, which fails when the sender
    # entity is not available on the event.
    target = f"downloads/{event.sender_id}_audio.mp3"

    if event.is_private:
        if not _has_audio_document(event.message):
            return await event.respond("`Please send an audio file...`")
        msg = await event.reply("`Downloading...`")
        await event.message.download_media(target)
        return await msg.edit("`Downloaded successfully...`")

    if not (event.is_group and (event.text or "").startswith("/edit")):
        return None

    # Look up the replied-to message only for the one case that needs it.
    reply = await event.get_reply_message()
    if not reply:
        return None
    # `reply` is the message object itself; its `.message` attribute is the
    # text, so the media has to be read from `reply` directly.
    if not _has_audio_document(reply):
        return await event.respond("`Please send an audio file...`")

    msg = await event.reply("`Downloading...`")
    await reply.download_media(target)
    return await msg.edit("`Downloaded successfully...`")
def _parse_numbers(parameters, cast, min_count, max_count):
    """Convert command arguments with *cast*; return None when they are unusable.

    "Unusable" means the number of arguments is outside
    ``min_count..max_count`` or *cast* rejects a value with ``ValueError``.
    """
    if not min_count <= len(parameters) <= max_count:
        return None
    try:
        return [cast(value) for value in parameters]
    except ValueError:
        return None


async def audio_editor(event):
    """Apply one editing command to the sender's stored audio.

    The audio is read from ``downloads/<sender id>_audio.mp3``; nothing
    happens when that file does not exist. Effect commands (``/trim``,
    ``/volume``, ``/reverse``, ``/speedup``, ``/slowdown``, ``/effect8D``,
    ``/effectReverb`` or ``/reverb``) write the result back to that file so
    that later commands and ``/save`` see it. ``/save <name>`` sends the
    current audio to the chat and ``/delete`` removes the stored file.
    Commands are matched by prefix. Malformed arguments are answered with
    the command's help text from ``commands`` instead of raising.
    """
    # sender_id is used rather than sender.id, which fails when the sender
    # entity is not available on the event.
    file_path = f"downloads/{event.sender_id}_audio.mp3"
    if not os.path.exists(file_path):
        return

    try:
        audio = Fusion.loadSound(file_path)
    except Fusion.InvalidMusicFileError as e:
        await event.reply(f'Error: {e}')
        os.remove(file_path)
        return

    text = event.raw_text or ""
    parameters = text.split()[1:]

    if text.startswith('/trim'):
        # Example usage: /trim 5000 10000
        values = _parse_numbers(parameters, int, 2, 2)
        if values is None:
            await event.reply(commands['/trim'])
            return
        start_time, end_time = values
        audio = audio[start_time:end_time]
    elif text.startswith('/volume'):
        # Example usage: /volume 3 (arguments after the first are ignored)
        values = _parse_numbers(parameters[:1], int, 1, 1)
        if values is None:
            await event.reply(commands['/volume'])
            return
        audio = audio + values[0]  # Pydub's volume adjustment
    elif text.startswith('/reverse'):
        audio = audio.reverse()
    elif text.startswith('/speedup'):
        # NOTE(review): the Gradio UI defaults the *slowed* effect to 0.90, so
        # multipliers below 1 are taken to slow playback and values above 1
        # to speed it up - confirm against Fusion.effectSlowed.
        audio = Fusion.effectSlowed(audio, speedMultiplier=1.2)
    elif text.startswith('/slowdown'):
        audio = Fusion.effectSlowed(audio, speedMultiplier=0.8)
    elif text.startswith('/effect8D'):
        # Example usage: /effect8D 100 5 10000 6
        values = _parse_numbers(parameters, int, 0, 4)
        if values is None:
            await event.reply(commands['/effect8D'])
            return
        audio = Fusion.effect8D(audio, *values)
    elif text.startswith(('/effectReverb', '/reverb')):
        # Example usage: /effectReverb 0.8 1 0.5 0.3 0.8
        # The help text advertises "/reverb", so both spellings are accepted.
        values = _parse_numbers(parameters, float, 0, 5)
        if values is None:
            await event.reply(commands['/reverb'])
            return
        audio = Fusion.effectReverb(audio, *values)
    elif text.startswith('/save'):
        # Example usage: /save outputFileName
        # The name comes straight from a chat message: keep only its final
        # path component so it cannot point outside the working directory.
        output_file_name = os.path.basename(parameters[0]) if parameters else ""
        if not output_file_name:
            await event.reply(commands['/save'])
            return
        output_file_path = Fusion.saveSound(audio, output_file_name)
        try:
            await client.send_file(
                event.chat_id,
                output_file_path,
                caption=f'Modified audio: {output_file_name}.mp3',
            )
        finally:
            # Remove the temporary export even when sending fails.
            os.remove(output_file_path)
        return
    elif text.startswith('/delete'):
        os.remove(file_path)
        return
    else:
        # Not an editing command.
        return

    # Persist the edit; without this the change would be lost when the handler
    # returns and /save would send the untouched upload.
    # `audio` is treated as a pydub AudioSegment (it is sliced, gain-adjusted
    # and reversed above) - TODO confirm Fusion.loadSound's return type.
    audio.export(file_path, format="mp3")
    await event.reply("`Following effect added to your audio file...`")
async def initiation():
    """Post the start-up notice, with an "Execal" link button, to the hard-coded chat."""
    link_row = (Button.url("Execal", "https://t.me/execal"),)
    await client.send_message(
        -1001662130485,
        "**Hugging is Running.**",
        buttons=[link_row],
    )
if __name__ == '__main__':
    # Log in as the bot and post the start-up notice before serving anything.
    client.start(bot_token=BOT_TOKEN)
    client.loop.run_until_complete(initiation())

    # The Gradio UI runs on its own thread so the Telegram client can keep the
    # main thread. launch() is called with its defaults (share is not set).
    web_ui = threading.Thread(target=iface.launch)
    web_ui.start()

    print("Bot started succefully")
    client.run_until_disconnected()