Spaces:
Running
Running
File size: 9,685 Bytes
4bb640e 998beeb 06583ef 9acefe3 3e0f2ab 9acefe3 488c75f 00e3b03 d22ccce 00e3b03 8651489 4bb640e 8651489 19c8b69 4bb640e d6c1a9f 4bb640e 19c8b69 1f7c7c0 d6c1a9f 19c8b69 d6c1a9f 19c8b69 716e055 d6c1a9f 19c8b69 4bb640e 19c8b69 716e055 19c8b69 9acefe3 df433d3 c0c3d77 df433d3 e77a682 df433d3 e77a682 df433d3 0d37356 df433d3 9203d26 71488aa 16d22f2 9203d26 16d22f2 9203d26 71488aa 6546294 16d22f2 6546294 16d22f2 bbc5ea2 16d22f2 bbc5ea2 16d22f2 9203d26 a2a9034 6df37b5 d56b541 71488aa d56b541 71488aa 6df37b5 71488aa 6df37b5 71488aa 6df37b5 71488aa 6df37b5 71488aa d56b541 a2a9034 6546294 d7a7779 6546294 9203d26 d7a7779 cbad5f5 6546294 df433d3 6546294 9acefe3 00e3b03 3e0f2ab caa956c 0a0eb88 00e3b03 3e0f2ab 0a0eb88 3e0f2ab caa956c 3e0f2ab 0a0eb88 3e0f2ab 4cb0ebf 3e0f2ab 00e3b03 3e0f2ab caa956c 3e0f2ab 4cb0ebf 3e0f2ab 4cb0ebf 00e3b03 3e0f2ab 0a0eb88 3e0f2ab caa956c 3e0f2ab 4cb0ebf 3e0f2ab 0a0eb88 3e0f2ab 00e3b03 488c75f 917fbb7 488c75f 00e3b03 488c75f a071ca5 06583ef b3ade12 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 |
import os
import secrets
import threading
import gradio as gr
from io import BytesIO
from AudioFusion import Fusion
from telethon.sync import TelegramClient, events, Button
API_ID = os.environ.get("API_ID")
API_HASH = os.environ.get("API_HASH")
BOT_TOKEN = os.environ.get("BOT_TOKEN")
def process_audio(input_file,
effect_8d, pan_boundary, jump_percentage, time_l_to_r, volume_multiplier,
effect_slowed, speed_multiplier,
effect_reverb, room_size, damping, width, wet_level, dry_level
):
# Load the sound file
sound = Fusion.loadSound(input_file)
os.remove(os.path.abspath(input_file))
effects_str = []
# Apply effects based on user choices
if effect_8d:
sound = Fusion.effect8D(sound, pan_boundary, jump_percentage, time_l_to_r*1000, volume_multiplier)
effects_str.append("8d")
if effect_slowed:
sound = Fusion.effectSlowed(sound, speed_multiplier)
effects_str.append("Slowed")
if effect_reverb:
sound = Fusion.effectReverb(sound, room_size, damping, width, wet_level, dry_level, str(secrets.token_hex(5)))
effects_str.append("Reverb")
output_file = f"{input_file} {' + '.join(effects_str)} - {'By AudioFusion'}"
# Save the processed sound and return the output file
output = Fusion.saveSound(sound, output_file)
return output
before_text = """<div align="center">
<h1>AudioFusion</h1>
<i>Add a touch of uniqueness with various customizable effects like slowed and reverb.</i>
</div>
<hr>"""
after_text = """<hr>
PR in [github](https://github.com/MineisZarox/AudioFusion) repository beta branch are always welcome.
<h3>Todo</h3>
\# Acapella Extractor<br>
\# Karoke Maker<br>
\# Bass Booster<br>
\# Volume Booster<br>
<h3>Inspiration & Credits</h3>
- Special thanks to [Jiaaro](https://github.com/jiaaro) for pydub. AudioFusion is mainly wrapped around pydub
- My Soundscapes of Serenity - [Because](https://t.me/bcuzwhynot)
"""
with gr.Blocks(title="Audio Fusion") as iface:
gr.Markdown(before_text)
input_audio = gr.Audio(label="Upload your music file", type="filepath")
# SLowed Effect and its arguments
with gr.Tab("Slowed Effect"):
speed_check = gr.Checkbox(label="Apply slowed effect")
with gr.Column(visible=False) as se_col:
speed = gr.Slider(label="Speed Multiplier", minimum=0.1, maximum=4, step=0.05, value=0.90)
# Reverb Effect and its argument
with gr.Tab("Reverb Effect"):
reverb_check = gr.Checkbox(label="Apply reverb effect")
with gr.Column(visible=False) as re_col:
with gr.Row():
room = gr.Slider(label="Room Size", minimum=0, maximum=1, step=0.01, value=0.8)
damp = gr.Slider(label="Damping", minimum=0, maximum=1, step=0.05, value=1)
width = gr.Slider(label="Width", minimum=0, maximum=1, step=0.05, value=0.5)
with gr.Row():
wet = gr.Slider(label="Wet Level", minimum=0, maximum=1, step=0.05, value=0.3)
dry = gr.Slider(label="Dry Level", minimum=0, maximum=1, step=0.05, value=0.8)
# 8d Effect and its arguments
with gr.Tab("8d Effect"):
dimension_check = gr.Checkbox(label="Apply 8D effect")
with gr.Column(visible=False) as di_col:
with gr.Row():
pan = gr.Slider(label="Pan Boundary", minimum=0, maximum=100, value=90)
jump = gr.Slider(label="Jump Percentage", minimum=1, maximum=100, value=5)
with gr.Row():
time = gr.Slider(label="Time L to R (s)", minimum=1, maximum=30, value=10)
volx = gr.Slider(label="Volume Multiplier", minimum=1, maximum=20, value=6)
# =====================================================
def di_v(check):
if check:
return {di_col: gr.Column(visible=True)}
else:
return {di_col: gr.Column(visible=False)}
def se_v(check):
if check:
return {se_col: gr.Column(visible=True)}
else:
return {se_col: gr.Column(visible=False)}
def re_v(check):
if check:
return {re_col: gr.Column(visible=True)}
else:
return {re_col: gr.Column(visible=False)}
dimension_check.change(di_v, inputs=[dimension_check], outputs=[di_col])
speed_check.change(se_v, inputs=[speed_check], outputs=[se_col])
reverb_check.change(re_v, inputs=[reverb_check], outputs=[re_col])
# =====================================================
with gr.Row():
btnClear = gr.ClearButton(components=[dimension_check, speed_check, reverb_check])
btnRun = gr.Button("Run", size="sm", variant="primary")
inputs = [input_audio, dimension_check, pan, jump, time, volx, speed_check, speed, reverb_check, room, damp, width, wet, dry]
output = [gr.Audio(label="Download processed music", type="filepath")]
gr.Markdown(after_text)
btnClear.add(components=output)
btnRun.click(fn=process_audio, inputs=inputs, outputs=output, api_name="AudioFusion")
client = TelegramClient('session_name', API_ID, API_HASH)
# Define the states for user interaction
states = {}
@client.on(events.NewMessage(pattern='/start'))
async def start_handler(event):
await event.reply("Welcome to AudioFusion Bot! Send me an audio file, and I'll apply effects for you.")
buttons = [
[Button.inline('Slowed', b'slowed'), Button.inline('8D', b'8d')],
[Button.inline('Reverb', b'reverb'), Button.inline('Reverse', b'reverse')],
[Button.inline('Volume', b'volume'), Button.inline('Speedup', b'speedup')],
[Button.inline('Preview', b'preview')],
[Button.inline('Send', b'send')],
]
@client.on(events.NewMessage(pattern='/buttons'))
async def buttons_handler(event):
user_id = event.sender_id
# Download the audio file and store it in the user's state
reply_message = await event.get_reply_message()
if not reply_message or not reply_message.file:
await event.reply("Please reply to an audio file.")
return
audio_file = BytesIO()
await event.client.download_media(reply_message, audio_file)
audio_file.seek(0)
# Store the audio file in the user's state
states[user_id] = audio_file
await client.send_file(event.chat_id, file="image.jpg", caption="Preview the current modification:", buttons=buttons)
@client.on(events.CallbackQuery(pattern=b'(slowed|8d|reverb|reverse|trim|volume|speedup)'))
async def audio_effect_handler(event):
user_id = event.sender_id
if user_id not in states or not states[user_id]:
await event.answer("No audio file found. Please use /buttons command to upload an audio file.")
return
# Retrieve the audio file from the user's state
audio_file = states[user_id]
query = event.pattern_match.group(1).decode("UTF-8")
try:
sound = Fusion.loadSound(audio_file)
if query == 'slowed':
modified_sound = Fusion.effectSlowed(sound, 0.82)
elif query == 'speedup':
modified_sound = Fusion.effectSlowed(sound, 1.2)
elif query == '8d':
modified_sound = Fusion.effect8D(sound)
elif query == 'reverb':
modified_sound = Fusion.effectReverb(sound)
elif query == 'reverse':
modified_sound = sound.reverse()
else:
return await event.answer("INvalid for now...")
# Update the user's state with the modified sound
states[user_id] = modified_sound
await event.answer("Effect applied. Click /send to receive the modified audio file.")
except Fusion.InvalidMusicFileError as e:
await event.reply(str(e))
except Exception as e:
await event.reply(f"An error occurred: {str(e)}")
@client.on(events.CallbackQuery(pattern=b'preview'))
async def preview_handler(event):
user_id = event.sender_id
if user_id in states and states[user_id]:
# Send the current modification for preview
output_file_name = f"{user_id}_preview"
output_file = Fusion.saveSound(states[user_id], output_file_name)
await event.edit("`Uploading...`", buttons=buttons)
# Edit the message and send the audio file in the edited message
await event.edit(file=output_file, text="`Preview the current modification:`", buttons=buttons)
# Clean up - remove the saved preview audio file
os.remove(output_file)
else:
await event.answer("No modified audio file found. Please apply an effect first.")
@client.on(events.CallbackQuery(pattern=b'send'))
async def send_handler(event):
user_id = event.sender_id
if user_id in states and states[user_id]:
# Send the modified sound file
output_file_name = f"{user_id}_modified_audio"
output_file = Fusion.saveSound(states[user_id], output_file_name)
await event.reply(file=output_file)
# Clean up - remove the user's state and the saved audio file
del states[user_id]
os.remove(output_file)
await event.delete()
else:
await event.answer("No modified audio file found. Please apply an effect first.")
async def initiation():
await client.send_message(-1001662130485, "**Hugging is Running.**", buttons=[(Button.url("Execal", "https://t.me/execal"),)],)
if __name__ == '__main__':
client.start(bot_token=BOT_TOKEN)
client.loop.run_until_complete(initiation())
threading.Thread(target=iface.launch).start() #(share=False)
print("Bot started succefully")
client.run_until_disconnected()
|