import os
import secrets
import threading
import gradio as gr
from io import BytesIO
from AudioFusion import Fusion
from telethon.sync import TelegramClient, events, Button
API_ID = os.environ.get("API_ID")
API_HASH = os.environ.get("API_HASH")
BOT_TOKEN = os.environ.get("BOT_TOKEN")
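# Not part of the original flow: a minimal fail-fast check (an assumption about
# deployment) so missing Telegram credentials surface at startup rather than at
# connection time. Drop it if the variables are guaranteed by the environment.
for _name, _value in {"API_ID": API_ID, "API_HASH": API_HASH, "BOT_TOKEN": BOT_TOKEN}.items():
    if not _value:
        raise RuntimeError(f"Missing required environment variable: {_name}")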
def process_audio(input_file,
effect_8d, pan_boundary, jump_percentage, time_l_to_r, volume_multiplier,
effect_slowed, speed_multiplier,
effect_reverb, room_size, damping, width, wet_level, dry_level
):
    # Load the uploaded sound, then delete the temporary upload from disk
    sound = Fusion.loadSound(input_file)
    os.remove(os.path.abspath(input_file))
effects_str = []
# Apply effects based on user choices
if effect_8d:
sound = Fusion.effect8D(sound, pan_boundary, jump_percentage, time_l_to_r*1000, volume_multiplier)
effects_str.append("8d")
if effect_slowed:
sound = Fusion.effectSlowed(sound, speed_multiplier)
effects_str.append("Slowed")
if effect_reverb:
sound = Fusion.effectReverb(sound, room_size, damping, width, wet_level, dry_level, str(secrets.token_hex(5)))
effects_str.append("Reverb")
output_file = f"{input_file} {' + '.join(effects_str)} - {'By AudioFusion'}"
# Save the processed sound and return the output file
output = Fusion.saveSound(sound, output_file)
return output
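# Illustrative only (assumes a local "song.mp3"; never executed by this app):
# the positional arguments mirror the Gradio `inputs` list wired up below.
# out_path = process_audio(
#     "song.mp3",
#     True, 90, 5, 10, 6,              # 8D: pan boundary, jump %, time L-to-R (s), volume multiplier
#     True, 0.90,                      # slowed: speed multiplier
#     False, 0.8, 1, 0.5, 0.3, 0.8,    # reverb: room size, damping, width, wet, dry
# )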
before_text = """
AudioFusion
Add a touch of uniqueness with various customizable effects like slowed and reverb.
"""
after_text = """
PR in [github](https://github.com/MineisZarox/AudioFusion) repository beta branch are always welcome.
Todo
\# Acapella Extractor
\# Karoke Maker
\# Bass Booster
\# Volume Booster
Inspiration & Credits
- Special thanks to [Jiaaro](https://github.com/jiaaro) for pydub. AudioFusion is mainly wrapped around pydub
- My Soundscapes of Serenity - [Because](https://t.me/bcuzwhynot)
"""
with gr.Blocks(title="Audio Fusion") as iface:
gr.Markdown(before_text)
input_audio = gr.Audio(label="Upload your music file", type="filepath")
    # Slowed effect and its arguments
with gr.Tab("Slowed Effect"):
speed_check = gr.Checkbox(label="Apply slowed effect")
with gr.Column(visible=False) as se_col:
speed = gr.Slider(label="Speed Multiplier", minimum=0.1, maximum=4, step=0.05, value=0.90)
    # Reverb effect and its arguments
with gr.Tab("Reverb Effect"):
reverb_check = gr.Checkbox(label="Apply reverb effect")
with gr.Column(visible=False) as re_col:
with gr.Row():
room = gr.Slider(label="Room Size", minimum=0, maximum=1, step=0.01, value=0.8)
damp = gr.Slider(label="Damping", minimum=0, maximum=1, step=0.05, value=1)
width = gr.Slider(label="Width", minimum=0, maximum=1, step=0.05, value=0.5)
with gr.Row():
wet = gr.Slider(label="Wet Level", minimum=0, maximum=1, step=0.05, value=0.3)
dry = gr.Slider(label="Dry Level", minimum=0, maximum=1, step=0.05, value=0.8)
# 8d Effect and its arguments
with gr.Tab("8d Effect"):
dimension_check = gr.Checkbox(label="Apply 8D effect")
with gr.Column(visible=False) as di_col:
with gr.Row():
pan = gr.Slider(label="Pan Boundary", minimum=0, maximum=100, value=90)
jump = gr.Slider(label="Jump Percentage", minimum=1, maximum=100, value=5)
with gr.Row():
time = gr.Slider(label="Time L to R (s)", minimum=1, maximum=30, value=10)
volx = gr.Slider(label="Volume Multiplier", minimum=1, maximum=20, value=6)
# =====================================================
    # Toggle visibility of each effect's settings column based on its checkbox.
    def di_v(check):
        return {di_col: gr.Column(visible=check)}

    def se_v(check):
        return {se_col: gr.Column(visible=check)}

    def re_v(check):
        return {re_col: gr.Column(visible=check)}
dimension_check.change(di_v, inputs=[dimension_check], outputs=[di_col])
speed_check.change(se_v, inputs=[speed_check], outputs=[se_col])
reverb_check.change(re_v, inputs=[reverb_check], outputs=[re_col])
# =====================================================
with gr.Row():
btnClear = gr.ClearButton(components=[dimension_check, speed_check, reverb_check])
btnRun = gr.Button("Run", size="sm", variant="primary")
inputs = [input_audio, dimension_check, pan, jump, time, volx, speed_check, speed, reverb_check, room, damp, width, wet, dry]
output = [gr.Audio(label="Download processed music", type="filepath")]
gr.Markdown(after_text)
btnClear.add(components=output)
btnRun.click(fn=process_audio, inputs=inputs, outputs=output, api_name="AudioFusion")
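    # Because api_name="AudioFusion" is set above, the same endpoint can be called
    # programmatically (illustrative sketch, assumes a deployed Space):
    #   from gradio_client import Client
    #   Client("<owner>/<space>").predict(..., api_name="/AudioFusion")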
# Telethon's api_id is numeric, so convert the environment value explicitly.
client = TelegramClient('session_name', int(API_ID), API_HASH)
# Define the states for user interaction
states = {}
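# State lifecycle, as implemented below: /buttons stores the replied-to audio as a
# BytesIO object; an effect callback replaces it with the processed sound; "send"
# delivers the result and clears the entry.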
@client.on(events.NewMessage(pattern='/start'))
async def start_handler(event):
await event.reply("Welcome to AudioFusion Bot! Send me an audio file, and I'll apply effects for you.")
buttons = [
[Button.inline('Slowed', b'slowed'), Button.inline('8D', b'8d')],
[Button.inline('Reverb', b'reverb'), Button.inline('Reverse', b'reverse')],
[Button.inline('Volume', b'volume'), Button.inline('Speedup', b'speedup')],
[Button.inline('Preview', b'preview')],
[Button.inline('Send', b'send')],
]
@client.on(events.NewMessage(pattern='/buttons'))
async def buttons_handler(event):
user_id = event.sender_id
# Download the audio file and store it in the user's state
reply_message = await event.get_reply_message()
if not reply_message or not reply_message.file:
await event.reply("Please reply to an audio file.")
return
audio_file = BytesIO()
await event.client.download_media(reply_message, audio_file)
audio_file.seek(0)
# Store the audio file in the user's state
states[user_id] = audio_file
    # "image.jpg" is a static preview image shipped alongside the bot.
    await client.send_file(event.chat_id, file="image.jpg", caption="Preview the current modification:", buttons=buttons)
@client.on(events.CallbackQuery(pattern=b'(slowed|8d|reverb|reverse|trim|volume|speedup)'))
async def audio_effect_handler(event):
user_id = event.sender_id
if user_id not in states or not states[user_id]:
await event.answer("No audio file found. Please use /buttons command to upload an audio file.")
return
# Retrieve the audio file from the user's state
audio_file = states[user_id]
query = event.pattern_match.group(1).decode("UTF-8")
try:
sound = Fusion.from_file(audio_file, format="mp3")
if query == 'slowed':
modified_sound = Fusion.effectSlowed(sound, 0.82)
elif query == 'speedup':
modified_sound = Fusion.effectSlowed(sound, 1.2)
elif query == '8d':
modified_sound = Fusion.effect8D(sound)
elif query == 'reverb':
modified_sound = Fusion.effectReverb(sound)
elif query == 'reverse':
modified_sound = sound.reverse()
else:
return await event.answer("INvalid for now...")
# Update the user's state with the modified sound
states[user_id] = modified_sound
await event.answer("Effect applied. Click /send to receive the modified audio file.")
except Fusion.InvalidMusicFileError as e:
await event.reply(str(e))
except Exception as e:
await event.reply(f"An error occurred: {str(e)}")
@client.on(events.CallbackQuery(pattern=b'preview'))
async def preview_handler(event):
user_id = event.sender_id
if user_id in states and states[user_id]:
# Send the current modification for preview
output_file_name = f"{user_id}_preview"
output_file = Fusion.saveSound(states[user_id], output_file_name)
await event.edit("`Uploading...`", buttons=buttons)
# Edit the message and send the audio file in the edited message
await event.edit(file=output_file, text="`Preview the current modification:`", buttons=buttons)
# Clean up - remove the saved preview audio file
os.remove(output_file)
else:
await event.answer("No modified audio file found. Please apply an effect first.")
@client.on(events.CallbackQuery(pattern=b'send'))
async def send_handler(event):
user_id = event.sender_id
if user_id in states and states[user_id]:
# Send the modified sound file
output_file_name = f"{user_id}_modified_audio"
output_file = Fusion.saveSound(states[user_id], output_file_name)
await event.reply(file=output_file)
# Clean up - remove the user's state and the saved audio file
del states[user_id]
os.remove(output_file)
await event.delete()
else:
await event.answer("No modified audio file found. Please apply an effect first.")
async def initiation():
    # Announce in the (hardcoded) log channel that the Space is up.
    await client.send_message(-1001662130485, "**Hugging Face Space is running.**", buttons=[(Button.url("Execal", "https://t.me/execal"),)],)
if __name__ == '__main__':
client.start(bot_token=BOT_TOKEN)
client.loop.run_until_complete(initiation())
    # Run the Gradio UI in a background thread so it doesn't block the Telegram client.
    threading.Thread(target=iface.launch).start()
    print("Bot started successfully")
client.run_until_disconnected()