import os
import secrets
import threading

import gradio as gr
from io import BytesIO

from fusion import Fusion
from telethon import TelegramClient, events, Button, types

API_ID = os.environ.get("API_ID")
API_HASH = os.environ.get("API_HASH")
BOT_TOKEN = os.environ.get("BOT_TOKEN")

client = TelegramClient('session_name', API_ID, API_HASH)

# Per-user state: maps a user id to its current in-memory audio buffer
states = {}


@client.on(events.NewMessage(pattern='/start'))
async def start_handler(event):
    await event.reply("Welcome to AudioFusion Bot! Send me an audio file, and I'll apply effects for you.")


buttons = [
    [Button.inline('Slowed', b'slowed'), Button.inline('8D', b'8d')],
    [Button.inline('Reverb', b'reverb'), Button.inline('Reverse', b'reverse')],
    [Button.inline('Volume', b'volume'), Button.inline('Speedup', b'speedup')],
    [Button.inline('Preview', b'preview')],
    [Button.inline('Send', b'send')],
]


@client.on(events.NewMessage(pattern='/buttons'))
async def buttons_handler(event):
    user_id = event.sender_id

    # Download the replied-to audio file and store it in the user's state
    reply_message = await event.get_reply_message()
    if not reply_message or not reply_message.file:
        await event.reply("Please reply to an audio file.")
        return

    audio_file = BytesIO()
    await event.client.download_media(reply_message, audio_file)
    audio_file.seek(0)

    # Store the audio file in the user's state
    states[user_id] = {'audio': audio_file}

    await client.send_file(
        event.chat_id,
        file="image.jpg",
        caption="Preview the current modification:",
        buttons=buttons,
    )


@client.on(events.CallbackQuery(pattern=b'(slowed|8d|reverb|reverse|trim|volume|speedup)'))
async def audio_effect_handler(event):
    user_id = event.sender_id
    if user_id not in states or not states[user_id]:
        await event.answer("No audio file found. Please use the /buttons command to upload an audio file.")
        return

    # Retrieve the audio file from the user's state
    audio_file = states[user_id]['audio']
    query = event.pattern_match.group(1).decode("UTF-8")

    sound = Fusion.from_file(audio_file, format="mp3")

    if query == 'slowed':
        modified_sound = await Fusion.effectSlowed(sound)
    elif query == 'speedup':
        modified_sound = await Fusion.effectSlowed(sound, 1.1)
    elif query == '8d':
        modified_sound = await Fusion.effect8D(sound)
    elif query == 'reverb':
        modified_sound = await Fusion.effectReverb(sound)
    elif query == 'reverse':
        modified_sound = sound.reverse()
    else:
        return await event.answer("Invalid for now...")

    # Export the modified sound back into an in-memory buffer
    audio_file = BytesIO()
    audio = modified_sound.export(audio_file, format="mp3")
    audio.seek(0)

    # Update the user's state with the modified sound
    states[user_id]['audio'] = audio

    await event.answer("Effect applied. Press Send to receive the modified audio file.", alert=True)


@client.on(events.CallbackQuery(pattern=b'preview'))
async def preview_handler(event):
    user_id = event.sender_id
    if user_id in states and states[user_id]:
        # Send the current modification for preview
        output_file_name = f"{user_id}_preview"
        output_file = await Fusion.saveSound(states[user_id]['audio'], output_file_name)
        await event.edit("`Uploading...`", buttons=buttons)

        # Edit the message and send the audio file in the edited message
        await event.edit(file=output_file, text="`Preview the current modification:`", buttons=buttons)

        # Clean up - remove the saved preview audio file
        os.remove(output_file)
    else:
        await event.answer("No modified audio file found. Please apply an effect first.", alert=True)


@client.on(events.CallbackQuery(pattern=b'send'))
async def send_handler(event):
    user_id = event.sender_id
    if user_id in states and states[user_id]:
        # Send the modified sound file
        # output_file_name = f"{user_id}_modified_audio"
        # output_file = await Fusion.saveSound(states[user_id]["audio"], output_file_name)
        # await event.reply(file=output_file)
        await client.send_file(
            event.chat_id,
            file=states[user_id]["audio"],
            attributes=[types.DocumentAttributeFilename(file_name="Audio.mp3")],
        )

        # Clean up - remove the user's state and the saved audio file
        del states[user_id]
        # os.remove(output_file)
        await event.delete()
    else:
        await event.answer("No modified audio file found. Please apply an effect first.")


def process_audio(input_file, effect_8d, pan_boundary, jump_percentage, time_l_to_r,
                  volume_multiplier, effect_slowed, speed_multiplier, effect_reverb,
                  room_size, damping, width, wet_level, dry_level):
    # Load the sound file
    sound = Fusion.loadSound(input_file)
    os.remove(os.path.abspath(input_file))
    effects_str = []

    # Apply effects based on user choices
    if effect_8d:
        sound = Fusion.effect8D(sound, pan_boundary, jump_percentage, time_l_to_r * 1000, volume_multiplier)
        effects_str.append("8d")
    if effect_slowed:
        sound = Fusion.effectSlowed(sound, speed_multiplier)
        effects_str.append("Slowed")
    if effect_reverb:
        sound = Fusion.effectReverb(sound, room_size, damping, width, wet_level, dry_level, str(secrets.token_hex(5)))
        effects_str.append("Reverb")

    output_file = f"{input_file} {' + '.join(effects_str)} - By AudioFusion"

    # Save the processed sound and return the output file
    return Fusion.saveSound(sound, output_file)


before_text = """