"""Gradio web UI and Telegram bot front-ends for AudioFusion effects.

The Gradio interface (``iface``) applies the 8D / slowed / reverb effects to an
uploaded file in one shot.  The Telegram bot (``client``) stores one audio file
per user under ``downloads/`` and edits it step by step through chat commands.
"""

import os
import secrets
import threading

import gradio as gr
from AudioFusion import Fusion
from telethon.sync import TelegramClient, events, Button


API_ID = os.environ.get("API_ID")
API_HASH = os.environ.get("API_HASH")
BOT_TOKEN = os.environ.get("BOT_TOKEN")


def process_audio(
    input_file,
    effect_8d, pan_boundary, jump_percentage, time_l_to_r, volume_multiplier,
    effect_slowed, speed_multiplier,
    effect_reverb, room_size, damping, width, wet_level, dry_level,
):
    """Apply the selected effects to an uploaded file and return the output path.

    Args:
        input_file: Path of the uploaded audio file. The file is deleted once
            it has been loaded.
        effect_8d: Apply the 8D effect when truthy, using ``pan_boundary``,
            ``jump_percentage``, ``time_l_to_r`` (seconds) and
            ``volume_multiplier``.
        effect_slowed: Apply the slowed effect when truthy, using
            ``speed_multiplier``.
        effect_reverb: Apply the reverb effect when truthy, using
            ``room_size``, ``damping``, ``width``, ``wet_level`` and
            ``dry_level``.

    Returns:
        Whatever ``Fusion.saveSound`` returns for the processed sound
        (used as the file path of the Gradio output component).

    Raises:
        gr.Error: If no file was uploaded or the file cannot be decoded.
    """
    if not input_file:
        # Gradio passes None when "Run" is pressed without an upload.
        raise gr.Error("Please upload an audio file first.")

    try:
        sound = Fusion.loadSound(input_file)
    except Fusion.InvalidMusicFileError as err:
        raise gr.Error(str(err)) from err
    # The upload is only needed in memory from here on.
    os.remove(os.path.abspath(input_file))

    applied = []
    if effect_8d:
        # The slider is in seconds; the effect is given the value times 1000.
        sound = Fusion.effect8D(
            sound, pan_boundary, jump_percentage, time_l_to_r * 1000, volume_multiplier
        )
        applied.append("8d")
    if effect_slowed:
        sound = Fusion.effectSlowed(sound, speed_multiplier)
        applied.append("Slowed")
    if effect_reverb:
        # The random token is passed as the last argument so concurrent
        # requests do not share it (presumably a temp-file name - confirm
        # against AudioFusion).
        sound = Fusion.effectReverb(
            sound, room_size, damping, width, wet_level, dry_level, secrets.token_hex(5)
        )
        applied.append("Reverb")

    output_file = f"{input_file} {' + '.join(applied)} - By AudioFusion"
    return Fusion.saveSound(sound, output_file)


before_text = """

AudioFusion

Add a touch of uniqueness with various customizable effects like slowed and reverb.

"""

# NOTE: "\\#" keeps the literal backslash-hash of the rendered text while
# avoiding Python's invalid-escape-sequence warning for "\#".
after_text = """
PR in [github](https://github.com/MineisZarox/AudioFusion) repository beta branch are always welcome.

Todo

\\# Acapella Extractor
\\# Karoke Maker
\\# Bass Booster
\\# Volume Booster

Inspiration & Credits

- Special thanks to [Jiaaro](https://github.com/jiaaro) for pydub. AudioFusion is mainly wrapped around pydub - My Soundscapes of Serenity - [Because](https://t.me/bcuzwhynot) """


def _visibility_toggle(column):
    """Build a checkbox callback that shows *column* only while checked."""

    def toggle(check):
        return {column: gr.Column(visible=bool(check))}

    return toggle


with gr.Blocks(title="Audio Fusion") as iface:
    gr.Markdown(before_text)
    input_audio = gr.Audio(label="Upload your music file", type="filepath")

    # Slowed effect and its arguments
    with gr.Tab("Slowed Effect"):
        speed_check = gr.Checkbox(label="Apply slowed effect")
        with gr.Column(visible=False) as se_col:
            speed = gr.Slider(label="Speed Multiplier", minimum=0.1, maximum=4, step=0.05, value=0.90)

    # Reverb effect and its arguments
    with gr.Tab("Reverb Effect"):
        reverb_check = gr.Checkbox(label="Apply reverb effect")
        with gr.Column(visible=False) as re_col:
            with gr.Row():
                room = gr.Slider(label="Room Size", minimum=0, maximum=1, step=0.01, value=0.8)
                damp = gr.Slider(label="Damping", minimum=0, maximum=1, step=0.05, value=1)
                width = gr.Slider(label="Width", minimum=0, maximum=1, step=0.05, value=0.5)
            with gr.Row():
                wet = gr.Slider(label="Wet Level", minimum=0, maximum=1, step=0.05, value=0.3)
                dry = gr.Slider(label="Dry Level", minimum=0, maximum=1, step=0.05, value=0.8)

    # 8D effect and its arguments
    with gr.Tab("8d Effect"):
        dimension_check = gr.Checkbox(label="Apply 8D effect")
        with gr.Column(visible=False) as di_col:
            with gr.Row():
                pan = gr.Slider(label="Pan Boundary", minimum=0, maximum=100, value=90)
                jump = gr.Slider(label="Jump Percentage", minimum=1, maximum=100, value=5)
            with gr.Row():
                time = gr.Slider(label="Time L to R (s)", minimum=1, maximum=30, value=10)
                volx = gr.Slider(label="Volume Multiplier", minimum=1, maximum=20, value=6)

    # Each checkbox reveals its own settings column only while it is ticked.
    di_v = _visibility_toggle(di_col)
    se_v = _visibility_toggle(se_col)
    re_v = _visibility_toggle(re_col)

    dimension_check.change(di_v, inputs=[dimension_check], outputs=[di_col])
    speed_check.change(se_v, inputs=[speed_check], outputs=[se_col])
    reverb_check.change(re_v, inputs=[reverb_check], outputs=[re_col])

    with gr.Row():
        btnClear = gr.ClearButton(components=[dimension_check, speed_check, reverb_check])
        btnRun = gr.Button("Run", size="sm", variant="primary")

    # Order must match the positional parameters of process_audio.
    inputs = [
        input_audio,
        dimension_check, pan, jump, time, volx,
        speed_check, speed,
        reverb_check, room, damp, width, wet, dry,
    ]
    output = [gr.Audio(label="Download processed music", type="filepath")]

    gr.Markdown(after_text)

    btnClear.add(components=output)
    btnRun.click(fn=process_audio, inputs=inputs, outputs=output, api_name="AudioFusion")


client = TelegramClient('session_name', API_ID, API_HASH)

# Help text for every bot command; also reused as the usage reply when a
# command is sent with missing or invalid arguments.
commands = {
    '/start': 'Welcome to Audio Bot! Send me an audio file to get started.',
    '/help': 'List all available commands.',
    '/edit': 'In a group, reply to an audio file with /edit to load it for editing.',
    '/trim': 'Trim the audio. Usage: /trim start_time end_time',
    '/volume': 'Adjust volume. Usage: /volume level',
    '/reverse': 'Reverse the audio.',
    '/reverb': 'Apply reverb effect to the audio. Usage: /reverb roomSize damping width wetLevel dryLevel',
    '/speedup': 'Increase the speed of the audio.',
    '/slowdown': 'Decrease the speed of the audio.',
    '/effect8D': 'Apply 8D effect to the audio. Usage: /effect8D panBoundary jumpPercentage timeLtoR volumeMultiplier',
    '/save': 'Save the modified audio. Usage: /save outputFileName',
    '/delete': 'Delete your saved audio.',
}

# Lower-cased command name (as captured by the editor pattern) -> key in
# ``commands`` holding its usage text.
_USAGE_KEYS = {
    "trim": "/trim",
    "volume": "/volume",
    "reverb": "/reverb",
    "effect8d": "/effect8D",
    "save": "/save",
}


def _user_audio_path(sender_id):
    """Return the path where the working audio file of *sender_id* is kept."""
    return f"downloads/{sender_id}_audio.mp3"


def _is_audio(message):
    """Return True when *message* carries a document with an ``audio/*`` MIME type.

    Safe for ``None``, plain text and non-document media such as photos.
    """
    document = getattr(message, "document", None)
    mime_type = getattr(document, "mime_type", None) or ""
    return mime_type.startswith("audio")


def _parse_numbers(args, cast, *, min_count=0, max_count=None):
    """Convert every item of *args* with *cast*.

    Returns the converted list, or ``None`` when the number of arguments is
    outside ``[min_count, max_count]`` or any value fails to convert.
    """
    if len(args) < min_count:
        return None
    if max_count is not None and len(args) > max_count:
        return None
    try:
        return [cast(arg) for arg in args]
    except ValueError:
        return None


def _apply_effect(audio, command, args):
    """Apply the editing *command* to *audio*.

    Returns the edited audio, or ``None`` when *args* are missing or invalid
    for the command.
    """
    if command == "trim":
        # Example usage: /trim 5000 10000
        bounds = _parse_numbers(args, int, min_count=2, max_count=2)
        if bounds is None:
            return None
        start_time, end_time = bounds
        return audio[start_time:end_time]
    if command == "volume":
        # Example usage: /volume 3 (extra arguments are ignored)
        level = _parse_numbers(args[:1], int, min_count=1)
        if level is None:
            return None
        return audio + level[0]
    if command == "reverse":
        return audio.reverse()
    if command == "speedup":
        # Multipliers above 1 speed the audio up; below 1 slow it down
        # (the web UI's "slowed" slider defaults to 0.90).
        return Fusion.effectSlowed(audio, speedMultiplier=1.2)
    if command == "slowdown":
        return Fusion.effectSlowed(audio, speedMultiplier=0.8)
    if command == "effect8d":
        # Example usage: /effect8D 100 5 10000 6
        values = _parse_numbers(args, int, max_count=4)
        if values is None:
            return None
        return Fusion.effect8D(audio, *values)
    if command == "reverb":
        # Example usage: /reverb 0.8 1 0.5 0.3 0.8
        values = _parse_numbers(args, float, max_count=5)
        if values is None:
            return None
        if len(values) == 5:
            # Same extra trailing argument process_audio passes, so parallel
            # requests do not share it.
            values.append(secrets.token_hex(5))
        return Fusion.effectReverb(audio, *values)
    return None


@client.on(events.NewMessage(pattern='/start'))
async def start_handler(event):
    """Reply to /start with the welcome text."""
    await event.respond(commands['/start'])


@client.on(events.NewMessage(pattern='/help'))
async def help_handler(event):
    """Reply to /help with one ``command: description`` line per command."""
    help_text = '\n'.join(f'{cmd}: {desc}' for cmd, desc in commands.items())
    await event.respond(help_text)


@client.on(events.NewMessage(incoming=True))
async def audio_handler(event):
    """Store the audio a user wants to edit.

    In private chats any audio document is downloaded; other non-command
    messages are answered with a prompt to send audio. In groups the audio
    must be replied to with ``/edit``.
    """
    text = event.raw_text or ""
    target_path = _user_audio_path(event.sender_id)

    if event.is_private:
        if _is_audio(event.message):
            msg = await event.reply("`Downloading...`")
            await client.download_media(event.message, target_path)
            return await msg.edit("`Downloaded successfully...`")
        if text.startswith("/"):
            # Commands are answered by their own handlers; do not nag here.
            return None
        return await event.respond("`Please send an audio file...`")

    if event.is_group and text.startswith("/edit"):
        # Only look the replied-to message up when it is actually needed.
        reply = await event.get_reply_message()
        if reply is None:
            return None
        if not _is_audio(reply):
            return await event.respond("`Please send an audio file...`")
        msg = await event.reply("`Downloading...`")
        # The file is kept so the editing commands can find it afterwards.
        await reply.download_media(target_path)
        return await msg.edit("`Downloaded successfully...`")

    return None


@client.on(events.NewMessage(
    incoming=True,
    pattern=r"(?i)/(edit|trim|volume|reverse|reverb|speedup|slowdown|effect8d|save|delete)",
))
async def audio_editor(event):
    """Run one editing command against the sender's stored audio file.

    Effect commands write their result back to the stored file so later
    commands and ``/save`` operate on the edited audio. Commands sent without
    a stored file are ignored.
    """
    # The pattern is case-insensitive so "/effect8D" (as shown in /help) works.
    command = event.pattern_match.group(1).lower()
    if command == "edit":
        # Downloading for /edit is done by audio_handler.
        return

    file_path = _user_audio_path(event.sender_id)
    if not os.path.exists(file_path):
        return

    if command == "delete":
        os.remove(file_path)
        await event.reply("`Removed saved audio...`")
        return

    try:
        audio = Fusion.loadSound(os.path.abspath(file_path))
    except Fusion.InvalidMusicFileError as e:
        await event.reply(f'Error: {e}')
        # An undecodable file is useless for later commands; drop it.
        os.remove(file_path)
        return

    parameters = event.raw_text.split()[1:]

    if command == "save":
        # Example usage: /save outputFileName
        # The name comes from the chat, so strip any directory components.
        output_file_name = os.path.basename(parameters[0]) if parameters else ""
        if output_file_name in ("", ".", ".."):
            await event.reply(commands[_USAGE_KEYS[command]])
            return
        msg = await event.reply("`Uploading...`")
        output_file_path = Fusion.saveSound(audio, output_file_name)
        try:
            await client.send_file(
                event.chat_id,
                output_file_path,
                caption=f'Modified audio: {output_file_name}.mp3',
            )
        finally:
            # Clean up the temporary file even if the upload fails.
            os.remove(output_file_path)
        await msg.delete()
        return

    edited = _apply_effect(audio, command, parameters)
    if edited is None:
        await event.reply(commands.get(_USAGE_KEYS.get(command), commands['/help']))
        return

    # Persist the edit; otherwise it would be lost when this handler returns.
    # NOTE(review): assumes the sound object is a pydub AudioSegment (it is
    # sliced and reversed with pydub operations above) - confirm.
    edited.export(file_path, format="mp3")
    await event.reply("`Following effect added to your audio file...`")


async def initiation():
    """Announce in the configured chat that the bot is running."""
    await client.send_message(
        -1001662130485,
        "**Hugging is Running.**",
        buttons=[(Button.url("Execal", "https://t.me/execal"),)],
    )


if __name__ == '__main__':
    client.start(bot_token=BOT_TOKEN)
    client.loop.run_until_complete(initiation())
    # Serve the Gradio UI on its own thread; the main thread runs the bot.
    threading.Thread(target=iface.launch).start()  # (share=False)
    print("Bot started succefully")
    client.run_until_disconnected()