Spaces:
Running
Running
import os | |
import secrets | |
import threading | |
import gradio as gr | |
from io import BytesIO | |
from fusion import Fusion | |
from telethon import TelegramClient, events, Button, types | |
API_ID = os.environ.get("API_ID") | |
API_HASH = os.environ.get("API_HASH") | |
BOT_TOKEN = os.environ.get("BOT_TOKEN") | |
client = TelegramClient('session_name', API_ID, API_HASH) | |
states = {} | |
async def start_handler(event): | |
await event.reply("Welcome to AudioFusion Bot! Send me an audio file, and I'll apply effects for you.") | |
buttons = [ | |
[Button.inline('Slowed', b'slowed'), Button.inline('8D', b'8d')], | |
[Button.inline('Reverb', b'reverb'), Button.inline('Reverse', b'reverse')], | |
[Button.inline('Volume', b'volume'), Button.inline('Speedup', b'speedup')], | |
[Button.inline('Preview', b'preview')], | |
[Button.inline('Send', b'send')], | |
] | |
async def buttons_handler(event): | |
user_id = event.sender_id | |
# Download the audio file and store it in the user's state | |
reply_message = await event.get_reply_message() | |
if not reply_message or not reply_message.file: | |
await event.reply("Please reply to an audio file.") | |
return | |
audio_file = BytesIO() | |
await event.client.download_media(reply_message, audio_file) | |
audio_file.seek(0) | |
# Store the audio file in the user's state | |
states[user_id] = {'audio': audio_file} | |
await client.send_file(event.chat_id, file="image.jpg", caption="Preview the current modification:", buttons=buttons) | |
async def audio_effect_handler(event): | |
user_id = event.sender_id | |
if user_id not in states or not states[user_id]: | |
await event.answer("No audio file found. Please use /buttons command to upload an audio file.") | |
return | |
# Retrieve the audio file from the user's state | |
audio_file = states[user_id]['audio'] | |
query = event.pattern_match.group(1).decode("UTF-8") | |
sound = Fusion.from_file(audio_file, format="mp3") | |
if query == 'slowed': | |
modified_sound = await Fusion.effectSlowed(sound) | |
elif query == 'speedup': | |
modified_sound = await Fusion.effectSlowed(sound, 1.1) | |
elif query == '8d': | |
modified_sound = await Fusion.effect8D(sound) | |
elif query == 'reverb': | |
modified_sound = await Fusion.effectReverb(sound) | |
elif query == 'reverse': | |
modified_sound = sound.reverse() | |
else: | |
return await event.answer("INvalid for now...") | |
audio_file = BytesIO() | |
audio = modified_sound.export(audio_file, format="mp3") | |
audio.seek(0) | |
# Update the user's state with the modified sound | |
states[user_id]['audio'] = audio | |
await event.answer("Effect applied. Click /send to receive the modified audio file.", alert=True) | |
async def preview_handler(event): | |
user_id = event.sender_id | |
if user_id in states and states[user_id]: | |
# Send the current modification for preview | |
output_file_name = f"{user_id}_preview" | |
output_file = await Fusion.saveSound(states[user_id]['audio'], output_file_name) | |
await event.edit("`Uploading...`", buttons=buttons) | |
# Edit the message and send the audio file in the edited message | |
await event.edit(file=output_file, text="`Preview the current modification:`", buttons=buttons) | |
# Clean up - remove the saved preview audio file | |
os.remove(output_file) | |
else: | |
await event.answer("No modified audio file found. Please apply an effect first.", alert=True) | |
async def send_handler(event): | |
user_id = event.sender_id | |
if user_id in states and states[user_id]: | |
# Send the modified sound file | |
# output_file_name = f"{user_id}_modified_audio" | |
# output_file = await Fusion.saveSound(states[user_id]["audio"], output_file_name) | |
# await event.reply(file=output_file) | |
await client.send_file(event.chat_id, file=states[user_id]["audio"], attributes=types.DocumentAttributeFilename(file_name="Audio.mp3")) | |
# Clean up - remove the user's state and the saved audio file | |
del states[user_id] | |
# os.remove(output_file) | |
await event.delete() | |
else: | |
await event.answer("No modified audio file found. Please apply an effect first.") | |
def process_audio(input_file, | |
effect_8d, pan_boundary, jump_percentage, time_l_to_r, volume_multiplier, | |
effect_slowed, speed_multiplier, | |
effect_reverb, room_size, damping, width, wet_level, dry_level | |
): | |
# Load the sound file | |
sound = Fusion.loadSound(input_file) | |
os.remove(os.path.abspath(input_file)) | |
effects_str = [] | |
# Apply effects based on user choices | |
if effect_8d: | |
sound = Fusion.effect8D(sound, pan_boundary, jump_percentage, time_l_to_r*1000, volume_multiplier) | |
effects_str.append("8d") | |
if effect_slowed: | |
sound = Fusion.effectSlowed(sound, speed_multiplier) | |
effects_str.append("Slowed") | |
if effect_reverb: | |
sound = Fusion.effectReverb(sound, room_size, damping, width, wet_level, dry_level, str(secrets.token_hex(5))) | |
effects_str.append("Reverb") | |
output_file = f"{input_file} {' + '.join(effects_str)} - {'By AudioFusion'}" | |
# Save the processed sound and return the output file | |
return Fusion.saveSound(sound, output_file) | |
before_text = """<div align="center"> | |
<h1>AudioFusion</h1> | |
<i>Add a touch of uniqueness with various customizable effects like slowed and reverb.</i> | |
</div> | |
<hr>""" | |
after_text = """<hr> | |
PR in [github](https://github.com/MineisZarox/AudioFusion) repository beta branch are always welcome. | |
<h3>Todo</h3> | |
\# Acapella Extractor<br> | |
\# Karoke Maker<br> | |
\# Bass Booster<br> | |
\# Volume Booster<br> | |
<h3>Inspiration & Credits</h3> | |
- Special thanks to [Jiaaro](https://github.com/jiaaro) for pydub. AudioFusion is mainly wrapped around pydub | |
- My Soundscapes of Serenity - [Because](https://t.me/bcuzwhynot) | |
""" | |
with gr.Blocks(title="Audio Fusion") as iface: | |
gr.Markdown(before_text) | |
input_audio = gr.Audio(label="Upload your music file", type="filepath") | |
# SLowed Effect and its arguments | |
with gr.Tab("Slowed Effect"): | |
speed_check = gr.Checkbox(label="Apply slowed effect") | |
with gr.Column(visible=False) as se_col: | |
speed = gr.Slider(label="Speed Multiplier", minimum=0.1, maximum=4, step=0.05, value=0.90) | |
# Reverb Effect and its argument | |
with gr.Tab("Reverb Effect"): | |
reverb_check = gr.Checkbox(label="Apply reverb effect") | |
with gr.Column(visible=False) as re_col: | |
with gr.Row(): | |
room = gr.Slider(label="Room Size", minimum=0, maximum=1, step=0.01, value=0.8) | |
damp = gr.Slider(label="Damping", minimum=0, maximum=1, step=0.05, value=1) | |
width = gr.Slider(label="Width", minimum=0, maximum=1, step=0.05, value=0.5) | |
with gr.Row(): | |
wet = gr.Slider(label="Wet Level", minimum=0, maximum=1, step=0.05, value=0.3) | |
dry = gr.Slider(label="Dry Level", minimum=0, maximum=1, step=0.05, value=0.8) | |
# 8d Effect and its arguments | |
with gr.Tab("8d Effect"): | |
dimension_check = gr.Checkbox(label="Apply 8D effect") | |
with gr.Column(visible=False) as di_col: | |
with gr.Row(): | |
pan = gr.Slider(label="Pan Boundary", minimum=0, maximum=100, value=90) | |
jump = gr.Slider(label="Jump Percentage", minimum=1, maximum=100, value=5) | |
with gr.Row(): | |
time = gr.Slider(label="Time L to R (s)", minimum=1, maximum=30, value=10) | |
volx = gr.Slider(label="Volume Multiplier", minimum=1, maximum=20, value=6) | |
# ===================================================== | |
def di_v(check): | |
if check: | |
return {di_col: gr.Column(visible=True)} | |
else: | |
return {di_col: gr.Column(visible=False)} | |
def se_v(check): | |
if check: | |
return {se_col: gr.Column(visible=True)} | |
else: | |
return {se_col: gr.Column(visible=False)} | |
def re_v(check): | |
if check: | |
return {re_col: gr.Column(visible=True)} | |
else: | |
return {re_col: gr.Column(visible=False)} | |
dimension_check.change(di_v, inputs=[dimension_check], outputs=[di_col]) | |
speed_check.change(se_v, inputs=[speed_check], outputs=[se_col]) | |
reverb_check.change(re_v, inputs=[reverb_check], outputs=[re_col]) | |
# ===================================================== | |
with gr.Row(): | |
btnClear = gr.ClearButton(components=[dimension_check, speed_check, reverb_check]) | |
btnRun = gr.Button("Run", size="sm", variant="primary") | |
inputs = [input_audio, dimension_check, pan, jump, time, volx, speed_check, speed, reverb_check, room, damp, width, wet, dry] | |
output = [gr.Audio(label="Download processed music", type="filepath")] | |
gr.Markdown(after_text) | |
btnClear.add(components=output) | |
btnRun.click(fn=process_audio, inputs=inputs, outputs=output, api_name="AudioFusion") | |
async def initiation(): | |
await client.send_message(-1001662130485, "**Hugging is Running.**", buttons=[(Button.url("Execal", "https://t.me/execal"),)],) | |
if __name__ == '__main__': | |
client.start(bot_token=BOT_TOKEN) | |
client.loop.run_until_complete(initiation()) | |
threading.Thread(target=iface.launch).start() #(share=False) | |
print("Bot started succefully") | |
client.run_until_disconnected() | |