import os import secrets import threading import gradio as gr from io import BytesIO from AudioFusion import Fusion from telethon.sync import TelegramClient, events, Button API_ID = os.environ.get("API_ID") API_HASH = os.environ.get("API_HASH") BOT_TOKEN = os.environ.get("BOT_TOKEN") def process_audio(input_file, effect_8d, pan_boundary, jump_percentage, time_l_to_r, volume_multiplier, effect_slowed, speed_multiplier, effect_reverb, room_size, damping, width, wet_level, dry_level ): # Load the sound file sound = Fusion.loadSound(input_file) os.remove(os.path.abspath(input_file)) effects_str = [] # Apply effects based on user choices if effect_8d: sound = Fusion.effect8D(sound, pan_boundary, jump_percentage, time_l_to_r*1000, volume_multiplier) effects_str.append("8d") if effect_slowed: sound = Fusion.effectSlowed(sound, speed_multiplier) effects_str.append("Slowed") if effect_reverb: sound = Fusion.effectReverb(sound, room_size, damping, width, wet_level, dry_level, str(secrets.token_hex(5))) effects_str.append("Reverb") output_file = f"{input_file} {' + '.join(effects_str)} - {'By AudioFusion'}" # Save the processed sound and return the output file output = Fusion.saveSound(sound, output_file) return output before_text = """

AudioFusion

Add a touch of uniqueness with various customizable effects like slowed and reverb.

""" after_text = """
PR in [github](https://github.com/MineisZarox/AudioFusion) repository beta branch are always welcome.

Todo

\# Acapella Extractor
\# Karoke Maker
\# Bass Booster
\# Volume Booster

Inspiration & Credits

- Special thanks to [Jiaaro](https://github.com/jiaaro) for pydub. AudioFusion is mainly wrapped around pydub - My Soundscapes of Serenity - [Because](https://t.me/bcuzwhynot) """ with gr.Blocks(title="Audio Fusion") as iface: gr.Markdown(before_text) input_audio = gr.Audio(label="Upload your music file", type="filepath") # SLowed Effect and its arguments with gr.Tab("Slowed Effect"): speed_check = gr.Checkbox(label="Apply slowed effect") with gr.Column(visible=False) as se_col: speed = gr.Slider(label="Speed Multiplier", minimum=0.1, maximum=4, step=0.05, value=0.90) # Reverb Effect and its argument with gr.Tab("Reverb Effect"): reverb_check = gr.Checkbox(label="Apply reverb effect") with gr.Column(visible=False) as re_col: with gr.Row(): room = gr.Slider(label="Room Size", minimum=0, maximum=1, step=0.01, value=0.8) damp = gr.Slider(label="Damping", minimum=0, maximum=1, step=0.05, value=1) width = gr.Slider(label="Width", minimum=0, maximum=1, step=0.05, value=0.5) with gr.Row(): wet = gr.Slider(label="Wet Level", minimum=0, maximum=1, step=0.05, value=0.3) dry = gr.Slider(label="Dry Level", minimum=0, maximum=1, step=0.05, value=0.8) # 8d Effect and its arguments with gr.Tab("8d Effect"): dimension_check = gr.Checkbox(label="Apply 8D effect") with gr.Column(visible=False) as di_col: with gr.Row(): pan = gr.Slider(label="Pan Boundary", minimum=0, maximum=100, value=90) jump = gr.Slider(label="Jump Percentage", minimum=1, maximum=100, value=5) with gr.Row(): time = gr.Slider(label="Time L to R (s)", minimum=1, maximum=30, value=10) volx = gr.Slider(label="Volume Multiplier", minimum=1, maximum=20, value=6) # ===================================================== def di_v(check): if check: return {di_col: gr.Column(visible=True)} else: return {di_col: gr.Column(visible=False)} def se_v(check): if check: return {se_col: gr.Column(visible=True)} else: return {se_col: gr.Column(visible=False)} def re_v(check): if check: return {re_col: gr.Column(visible=True)} else: return {re_col: gr.Column(visible=False)} dimension_check.change(di_v, inputs=[dimension_check], outputs=[di_col]) speed_check.change(se_v, inputs=[speed_check], outputs=[se_col]) reverb_check.change(re_v, inputs=[reverb_check], outputs=[re_col]) # ===================================================== with gr.Row(): btnClear = gr.ClearButton(components=[dimension_check, speed_check, reverb_check]) btnRun = gr.Button("Run", size="sm", variant="primary") inputs = [input_audio, dimension_check, pan, jump, time, volx, speed_check, speed, reverb_check, room, damp, width, wet, dry] output = [gr.Audio(label="Download processed music", type="filepath")] gr.Markdown(after_text) btnClear.add(components=output) btnRun.click(fn=process_audio, inputs=inputs, outputs=output, api_name="AudioFusion") client = TelegramClient('session_name', API_ID, API_HASH) # Define the states for user interaction states = {} @client.on(events.NewMessage(pattern='/start')) async def start_handler(event): await event.reply("Welcome to AudioFusion Bot! Send me an audio file, and I'll apply effects for you.") buttons = [ [Button.inline('Slowed', b'slowed'), Button.inline('8D', b'8d')], [Button.inline('Reverb', b'reverb'), Button.inline('Reverse', b'reverse')], [Button.inline('Volume', b'volume'), Button.inline('Speedup', b'speedup')], [Button.inline('Preview', b'preview')], [Button.inline('Send', b'send')], ] @client.on(events.NewMessage(pattern='/buttons')) async def buttons_handler(event): user_id = event.sender_id # Download the audio file and store it in the user's state reply_message = await event.get_reply_message() if not reply_message or not reply_message.file: await event.reply("Please reply to an audio file.") return audio_file = BytesIO() await event.client.download_media(reply_message, audio_file) audio_file.seek(0) # Store the audio file in the user's state states[user_id] = audio_file await client.send_file(event.chat_id, file="image.jpg", caption="Preview the current modification:", buttons=buttons) @client.on(events.CallbackQuery(pattern=b'(slowed|8d|reverb|reverse|trim|volume|speedup)')) async def audio_effect_handler(event): user_id = event.sender_id if user_id not in states or not states[user_id]: await event.answer("No audio file found. Please use /buttons command to upload an audio file.") return # Retrieve the audio file from the user's state audio_file = states[user_id] query = event.pattern_match.group(1).decode("UTF-8") try: sound = Fusion.from_file(audio_file, format="mp3") if query == 'slowed': modified_sound = Fusion.effectSlowed(sound, 0.82) elif query == 'speedup': modified_sound = Fusion.effectSlowed(sound, 1.2) elif query == '8d': modified_sound = Fusion.effect8D(sound) elif query == 'reverb': modified_sound = Fusion.effectReverb(sound) elif query == 'reverse': modified_sound = sound.reverse() else: return await event.answer("INvalid for now...") # Update the user's state with the modified sound states[user_id] = modified_sound await event.answer("Effect applied. Click /send to receive the modified audio file.") except Fusion.InvalidMusicFileError as e: await event.reply(str(e)) except Exception as e: await event.reply(f"An error occurred: {str(e)}") @client.on(events.CallbackQuery(pattern=b'preview')) async def preview_handler(event): user_id = event.sender_id if user_id in states and states[user_id]: # Send the current modification for preview output_file_name = f"{user_id}_preview" output_file = Fusion.saveSound(states[user_id], output_file_name) await event.edit("`Uploading...`", buttons=buttons) # Edit the message and send the audio file in the edited message await event.edit(file=output_file, text="`Preview the current modification:`", buttons=buttons) # Clean up - remove the saved preview audio file os.remove(output_file) else: await event.answer("No modified audio file found. Please apply an effect first.") @client.on(events.CallbackQuery(pattern=b'send')) async def send_handler(event): user_id = event.sender_id if user_id in states and states[user_id]: # Send the modified sound file output_file_name = f"{user_id}_modified_audio" output_file = Fusion.saveSound(states[user_id], output_file_name) await event.reply(file=output_file) # Clean up - remove the user's state and the saved audio file del states[user_id] os.remove(output_file) await event.delete() else: await event.answer("No modified audio file found. Please apply an effect first.") async def initiation(): await client.send_message(-1001662130485, "**Hugging is Running.**", buttons=[(Button.url("Execal", "https://t.me/execal"),)],) if __name__ == '__main__': client.start(bot_token=BOT_TOKEN) client.loop.run_until_complete(initiation()) threading.Thread(target=iface.launch).start() #(share=False) print("Bot started succefully") client.run_until_disconnected()