# Spaces: Running
import contextlib
import os
import subprocess
import tempfile

import gradio as gr
import numpy as np
import soundfile as sf
from scipy import signal
# ========== Processing Functions ==========
def convert_to_wav_float(input_file):
    """
    Convert any input audio to a 32-bit float WAV using ffmpeg.

    Args:
        input_file: Path of the audio file handed to ``ffmpeg -i``.

    Returns:
        Path of a newly created temporary ``.wav`` file. The caller owns the
        file and is responsible for deleting it.

    Raises:
        subprocess.CalledProcessError: ffmpeg exited with a non-zero status.
        FileNotFoundError: the ``ffmpeg`` executable could not be started.
    """
    temp_wav = tempfile.NamedTemporaryFile(suffix=".wav", delete=False)
    temp_wav.close()
    try:
        # 32-bit float PCM keeps the full dynamic range without clipping.
        subprocess.run(
            [
                "ffmpeg", "-y", "-i", input_file,
                "-c:a", "pcm_f32le", "-f", "wav", temp_wav.name,
            ],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            check=True,
        )
    except BaseException:
        # The file was created with delete=False, so nothing else will remove
        # it if the conversion fails.
        with contextlib.suppress(OSError):
            os.unlink(temp_wav.name)
        raise
    return temp_wav.name
def apply_reverb_wet_only(audio, samplerate, reverb_args):
    """
    Apply wet-only reverb to one channel by running SoX's ``reverb -w`` effect.

    Args:
        audio: Samples written to a temporary float WAV with ``sf.write``.
        samplerate: Sample rate used when writing the temporary WAV.
        reverb_args: Iterable of extra arguments appended after ``reverb -w``
            (each item is converted with ``str``).

    Returns:
        The processed samples read back as float32.

    Raises:
        subprocess.CalledProcessError: SoX exited with a non-zero status.
    """
    tmp_paths = []
    try:
        # Create and immediately close both files so they can be reopened by
        # name by soundfile and SoX.
        for _ in range(2):
            handle = tempfile.NamedTemporaryFile(suffix=".wav", delete=False)
            handle.close()
            tmp_paths.append(handle.name)
        src_path, dst_path = tmp_paths

        sf.write(src_path, audio, samplerate, subtype='FLOAT')
        subprocess.run(
            ["sox", src_path, dst_path, "reverb", "-w",
             *(str(arg) for arg in reverb_args)],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            check=True,
        )
        wet, _ = sf.read(dst_path, dtype='float32')
        return wet
    finally:
        # Always remove the temporaries, including when SoX or soundfile fail.
        for path in tmp_paths:
            with contextlib.suppress(OSError):
                os.unlink(path)
def sox_filter(audio, samplerate, filter_type, cutoff):
    """
    Apply a highpass or lowpass filter via SoX.

    Args:
        audio: Samples written to a temporary float WAV with ``sf.write``.
        samplerate: Sample rate used when writing the temporary WAV.
        filter_type: SoX effect name, 'highpass' or 'lowpass'.
        cutoff: Cutoff in Hz; passed to SoX as ``str(cutoff)``.

    Returns:
        The filtered samples read back as float32.

    Raises:
        subprocess.CalledProcessError: SoX exited with a non-zero status.
    """
    tmp_paths = []
    try:
        # Create and immediately close both files so they can be reopened by
        # name by soundfile and SoX.
        for _ in range(2):
            handle = tempfile.NamedTemporaryFile(suffix=".wav", delete=False)
            handle.close()
            tmp_paths.append(handle.name)
        src_path, dst_path = tmp_paths

        sf.write(src_path, audio, samplerate, subtype='FLOAT')
        subprocess.run(
            ["sox", src_path, dst_path, filter_type, str(cutoff)],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            check=True,
        )
        filtered, _ = sf.read(dst_path, dtype='float32')
        return filtered
    finally:
        # Always remove the temporaries, including when SoX or soundfile fail.
        for path in tmp_paths:
            with contextlib.suppress(OSError):
                os.unlink(path)
def extract_phantom_center(input_file, rdf=0.99999):
    """
    Split a stereo file into front-left, front-right and phantom-centre signals.

    The centre spectrum takes, per STFT bin, the smaller of the left/right
    magnitudes and the phase of the mid signal ``(L + R) / 2``. The left and
    right residuals are the original spectra minus ``rdf`` times that centre.

    Args:
        input_file: Path of the audio file; it is converted with
            ``convert_to_wav_float`` before being read.
        rdf: Fraction of the centre spectrum subtracted from each side.

    Returns:
        Tuple ``(fs, FL, FR, FC)``: the sample rate followed by the left
        residual, right residual and centre signals, each truncated to the
        input length.

    Raises:
        gr.Error: the decoded audio is not 2-channel stereo, or is empty.
    """
    wav = convert_to_wav_float(input_file)
    try:
        data, fs = sf.read(wav, dtype='float32')
    finally:
        # Remove the intermediate WAV even when reading it fails.
        with contextlib.suppress(OSError):
            os.unlink(wav)

    if data.ndim != 2 or data.shape[1] != 2:
        raise gr.Error("Input must be stereo 2-channel")
    if len(data) == 0:
        raise gr.Error("Input audio is empty")

    left, right = data[:, 0], data[:, 1]
    mid = (left + right) / 2

    # One-second analysis windows, clamped to the clip length: scipy shrinks an
    # oversized nperseg inside stft only, which leaves noverlap and the istft
    # call inconsistent and makes clips shorter than one second fail.
    nperseg = min(fs, len(left))
    noverlap = nperseg // 2
    stft_args = dict(fs=fs, nperseg=nperseg, noverlap=noverlap)

    _, _, spec_left = signal.stft(left, **stft_args)
    _, _, spec_right = signal.stft(right, **stft_args)
    _, _, spec_mid = signal.stft(mid, **stft_args)

    # Centre: shared magnitude of both sides, phase taken from the mid signal.
    spec_center = (
        np.minimum(np.abs(spec_left), np.abs(spec_right))
        * np.exp(1j * np.angle(spec_mid))
    )
    spec_left_res = spec_left - spec_center * rdf
    spec_right_res = spec_right - spec_center * rdf

    _, FL = signal.istft(spec_left_res, **stft_args)
    _, FR = signal.istft(spec_right_res, **stft_args)
    _, FC = signal.istft(spec_center, **stft_args)

    # Trim the reconstructed signals back to the original length.
    return fs, FL[:len(left)], FR[:len(right)], FC[:len(mid)]
def create_5_1_surround(input_file, preset="music"):
    """
    Upmix a stereo file to a 6-channel Ogg Vorbis file.

    Channels are stacked in the order front-left, front-right, centre, LFE,
    surround-left, surround-right. The fronts and centre come from
    ``extract_phantom_center``; the surrounds are wet-only reverb of the
    original left/right channels; the LFE is a low-passed mono sum. Every
    channel except the LFE is high-passed.

    Args:
        input_file: Path of the stereo audio file to convert.
        preset: One of "music", "speech" or "open"; selects the reverb
            settings and filter cutoffs.

    Returns:
        Path of the encoded temporary ``.ogg`` file (owned by the caller).

    Raises:
        gr.Error: unknown preset, or (from ``extract_phantom_center``) the
            input is not stereo.
        subprocess.CalledProcessError: ffmpeg or SoX exited with a non-zero
            status.
    """
    print("Starting Normal Processing")
    p = gr.Progress()

    # preset -> (high-pass cutoff Hz, LFE low-pass cutoff Hz, SoX reverb args).
    # Reverb argument order: reverberance (%), HF-damping (%), room-scale (%),
    # stereo-depth (%), pre-delay (ms), wet-gain (dB).
    presets = {
        "music": (120, 120, ['70', '40', '100', '95', '10', '-2']),
        "speech": (120, 120, ['50', '99', '50', '70', '0', '0']),
        "open": (120, 120, ['20', '50', '100', '100', '100', '0']),
    }
    try:
        hp_cutoff, lfe_cutoff, reverb_args = presets[preset]
    except (KeyError, TypeError):
        raise gr.Error(f"Unknown preset: {preset}") from None

    # 1. Extract FL/FR/phantom centre
    p((1, 7), "Extracting Centre")
    fs, FL, FR, FC = extract_phantom_center(input_file)

    # 2. Get the stereo original for the reverb and LFE feeds
    p((2, 7), "Getting File")
    wav = convert_to_wav_float(input_file)
    try:
        stereo, _ = sf.read(wav, dtype='float32')
    finally:
        with contextlib.suppress(OSError):
            os.unlink(wav)
    L_orig, R_orig = stereo[:, 0], stereo[:, 1]

    # 3. Wet-only reverb with the chosen settings
    p((3, 7), "Reverb For Rear")
    SL = apply_reverb_wet_only(L_orig, fs, reverb_args)
    SR = apply_reverb_wet_only(R_orig, fs, reverb_args)

    # 4. Highpass filter everything except LFE
    p((4, 7), "Highpassing")
    FL_hp, FR_hp, FC_hp, SL_hp, SR_hp = (
        sox_filter(ch, fs, 'highpass', hp_cutoff)
        for ch in (FL, FR, FC, SL, SR)
    )

    # 5. Lowpass for LFE
    p((5, 7), "Extracting LFE")
    bass_sum = .5 * (L_orig + R_orig)
    LFE = sox_filter(bass_sum, fs, 'lowpass', lfe_cutoff)

    # 6. Zero-pad every channel to the longest one, then stack
    p((6, 7), "Stacking")
    channels = [FL_hp, FR_hp, FC_hp, LFE, SL_hp, SR_hp]
    length = max(len(ch) for ch in channels)
    multich = np.column_stack(
        [np.pad(ch, (0, length - len(ch))) for ch in channels]
    )

    # 7. Write WAV and encode to OGG
    p((7, 7), "Encoding")
    out_wav = tempfile.NamedTemporaryFile(suffix='.wav', delete=False)
    out_wav.close()
    out_ogg = tempfile.NamedTemporaryFile(suffix='.ogg', delete=False)
    out_ogg.close()
    try:
        sf.write(out_wav.name, multich, fs, subtype='FLOAT')
        subprocess.run(
            [
                "ffmpeg", "-y", "-i", out_wav.name,
                "-c:a", "libvorbis", "-ac", "6", "-channel_layout", "5.1",
                out_ogg.name,
            ],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            check=True,
        )
    except BaseException:
        # Do not leave a useless output file behind when encoding fails.
        with contextlib.suppress(OSError):
            os.unlink(out_ogg.name)
        raise
    finally:
        # The intermediate WAV is never needed after this point.
        with contextlib.suppress(OSError):
            os.unlink(out_wav.name)
    return out_ogg.name
import mimetypes
import requests
import time


def send_mvsep_audio_job(
    api_token: str,
    audio_bytes: bytes,
    filename: str,
    sep_type: int = 34,
    output_format: int = 2,
    addopt1=None,
    addopt2=None,
    poll_interval_sec: int = 5,
    request_timeout_sec: float = 120,
):
    """
    Send audio to MVSep for source separation and wait for the result.

    Args:
        api_token: MVSep API token, sent as the ``api_token`` form field.
        audio_bytes: Encoded audio data uploaded as ``audiofile``.
        filename: Filename reported for the upload; also used to guess the
            MIME type.
        sep_type: Separation type id, sent as ``sep_type``.
        output_format: Output format id, sent as ``output_format``.
        addopt1: Optional value for the ``add_opt1`` field. Omitted only when
            ``None``, so ``0`` is sent.
        addopt2: Optional value for the ``add_opt2`` field. Omitted only when
            ``None``.
        poll_interval_sec: Seconds to sleep between status checks.
        request_timeout_sec: Timeout passed to every HTTP request so a stalled
            connection cannot hang the job forever.

    Returns:
        dict: The ``data`` member of the status response once the job reports
        ``done``.

    Raises:
        gr.Error: the API rejected the upload, or the job ended as ``failed``
            or ``not_found``.
        requests.RequestException: network failure, timeout or HTTP error.
    """
    # Step 1: Determine MIME type
    mime_type, _ = mimetypes.guess_type(filename)
    if not mime_type:
        mime_type = "application/octet-stream"  # fallback

    # Step 2: Prepare request
    url = "https://mvsep.com/api/separation/create"
    files = {
        'audiofile': (filename, audio_bytes, mime_type),
    }
    data = {
        'api_token': api_token,
        'sep_type': str(sep_type),
        'output_format': str(output_format),
    }
    # Compare against None explicitly: 0 is a legitimate option value and a
    # plain truthiness test would silently drop it.
    if addopt1 is not None:
        data['add_opt1'] = str(addopt1)
    if addopt2 is not None:
        data['add_opt2'] = str(addopt2)

    # Step 3: Send creation request
    response = requests.post(
        url, files=files, data=data, timeout=request_timeout_sec
    )
    response.raise_for_status()
    json_resp = response.json()
    if not json_resp.get('success'):
        details = json_resp.get('data')
        error_msg = (
            details.get('message', 'Unknown error')
            if isinstance(details, dict) else 'Unknown error'
        )
        print(json_resp)
        raise gr.Error(f"API error: {error_msg}")
    job_hash = json_resp['data']['hash']
    print(f"Job submitted successfully. Hash: {job_hash}")

    # Step 4: Poll until job is done
    status_url = "https://mvsep.com/api/separation/get"
    while True:
        poll_resp = requests.get(
            status_url, params={'hash': job_hash}, timeout=request_timeout_sec
        )
        poll_resp.raise_for_status()
        poll_data = poll_resp.json()
        status = poll_data.get('status')
        print(f"Job status: {status}")
        if status == 'done':
            return poll_data.get('data', {})
        if status in ('failed', 'not_found'):
            details = poll_data.get('data')
            message = (
                details.get('message', '')
                if isinstance(details, dict) else ''
            )
            raise gr.Error(f"Job failed or not found: {message}")
        time.sleep(poll_interval_sec)
def download_wav(url, target_fs=None, timeout=120):
    """
    Download an audio file and decode it to float32 samples.

    Args:
        url: Location of the audio file to fetch.
        target_fs: Optional sample rate. When given and different from the
            file's own rate, the audio is resampled to it.
        timeout: Timeout in seconds for the HTTP request.

    Returns:
        Tuple ``(audio, sr)``: the float32 samples and their sample rate
        (``target_fs`` after resampling, otherwise the file's own rate).

    Raises:
        requests.RequestException: network failure, timeout or HTTP error.
    """
    r = requests.get(url, timeout=timeout)
    r.raise_for_status()

    temp = tempfile.NamedTemporaryFile(suffix=".wav", delete=False)
    try:
        with temp:
            temp.write(r.content)
        audio, sr = sf.read(temp.name, dtype='float32')
    finally:
        # Remove the download even when decoding fails.
        with contextlib.suppress(OSError):
            os.unlink(temp.name)

    if target_fs and sr != target_fs:
        # Resample along the sample axis; cast back so the result has the same
        # dtype whether or not resampling was needed.
        num_samples = int(len(audio) * target_fs / sr)
        audio = np.asarray(
            signal.resample(audio, num_samples), dtype=np.float32
        )
        sr = target_fs
    return audio, sr
# Smart mode workflow
def _mvsep_separate(api_key, audio, fs, **job_options):
    """Upload ``audio`` to MVSep as a 16-bit FLAC and return the job result."""
    flac_tmp = tempfile.NamedTemporaryFile(suffix=".flac", delete=False)
    flac_tmp.close()
    try:
        sf.write(flac_tmp.name, audio, fs, format='FLAC', subtype='PCM_16')
        with open(flac_tmp.name, 'rb') as fh:
            payload = fh.read()
    finally:
        # The bytes are in memory now; the file is no longer needed.
        with contextlib.suppress(OSError):
            os.unlink(flac_tmp.name)
    return send_mvsep_audio_job(
        api_key, payload, os.path.basename(flac_tmp.name), **job_options
    )


def smart_mode_process(input_file, api_key, multi_singer=False):
    """
    Build a 5.1 Ogg Vorbis file from a stereo input using MVSep separations.

    The input is split with three MVSep jobs (sep_type 34, 24 and 49) and the
    resulting stems are mapped to channels:

    * FL/FR: vocal residual after phantom-centre extraction + SFX + instruments
    * FC: phantom centre of the vocals + dialog
    * LFE: low-passed mono sum of the original
    * SL/SR: wet-only reverb of the music stem + crowd (+ backing vocals when
      ``multi_singer`` is false)

    Args:
        input_file: Path of the stereo audio file to convert.
        api_key: MVSep API key; required.
        multi_singer: When true, the full vocal stem is centred and backing
            vocals are not added to the surrounds; otherwise the lead stem is
            centred.

    Returns:
        Path of the encoded temporary ``.ogg`` file (owned by the caller).

    Raises:
        gr.Error: missing API key, non-stereo input, or an MVSep failure.
        subprocess.CalledProcessError: ffmpeg or SoX exited with a non-zero
            status.
    """
    print("Starting Smartmode")
    p = gr.Progress()
    if not api_key:
        raise gr.Error("An MVSep API Key Is Required For This. Get your key <a href=\"https://mvsep.com/user-api\">Here</a>. it's Free!")

    def match_len(x, length):
        # Zero-pad at the end so stems of different lengths can be summed.
        return np.pad(x, (0, length - len(x)))

    # Load original
    wav = convert_to_wav_float(input_file)
    try:
        data, fs = sf.read(wav, dtype='float32')
    finally:
        with contextlib.suppress(OSError):
            os.unlink(wav)
    p((0, 8), "Loading File")
    if data.ndim != 2 or data.shape[1] != 2:
        raise gr.Error("Expected stereo input (2 channels), got something else.")
    L, R = data[:, 0], data[:, 1]

    # Step 1: LFE from lowpass
    p((1, 8), "Processing LFE")
    bass = sox_filter(0.5 * (L + R), fs, 'lowpass', 120)

    # Step 2: Highpass for crowd extraction
    p((2, 8), "Extracting Crowd")
    hp_left = sox_filter(L, fs, 'highpass', 120)
    hp_right = sox_filter(R, fs, 'highpass', 120)
    hp_stereo = np.column_stack([hp_left, hp_right])
    crowd_resp = _mvsep_separate(
        api_key, hp_stereo, fs, sep_type=34, output_format=2, addopt1=0
    )
    # NOTE(review): stem order in 'files' is assumed from how the results are
    # named here (crowd first, remainder second) - confirm against the API.
    crowd, _ = download_wav(crowd_resp['files'][0]['url'], target_fs=fs)
    other_after_crowd, _ = download_wav(crowd_resp['files'][1]['url'], target_fs=fs)

    # Step 3: Speech, music, SFX separation from 'other_after_crowd'
    p((3, 8), "Separating Speech, Music, and SFX")
    demucs_resp = _mvsep_separate(
        api_key, other_after_crowd, fs, sep_type=24, output_format=2
    )
    dialog, _ = download_wav(demucs_resp['files'][0]['url'], target_fs=fs)
    sfx, _ = download_wav(demucs_resp['files'][2]['url'], target_fs=fs)
    music, _ = download_wav(demucs_resp['files'][1]['url'], target_fs=fs)

    # Step 4: Apply Reverb to the 'music' stem
    p((4, 8), "Applying Reverb")
    reverb_args = ['20', '50', '100', '100', '100', '0']  # open preset
    reverb_L = apply_reverb_wet_only(music[:, 0], fs, reverb_args)
    reverb_R = apply_reverb_wet_only(music[:, 1], fs, reverb_args)
    reverb = np.column_stack([reverb_L, reverb_R])

    # Step 5: Vocal Extraction from music
    p((5, 8), "Extracting Vocals")
    karaoke_resp = _mvsep_separate(
        api_key, music, fs, sep_type=49, output_format=2, addopt1=3, addopt2=1
    )
    vocals_full, _ = download_wav(karaoke_resp['files'][0]['url'], target_fs=fs)
    vocals_lead, _ = download_wav(karaoke_resp['files'][1]['url'], target_fs=fs)
    vocals_back, _ = download_wav(karaoke_resp['files'][2]['url'], target_fs=fs)
    instr, _ = download_wav(karaoke_resp['files'][3]['url'], target_fs=fs)

    # Step 6: Phantom center on vocals (lead or full)
    p((6, 8), "Phantom Center for Lead Vocals")
    vl_buf = tempfile.NamedTemporaryFile(suffix=".wav", delete=False)
    vl_buf.close()
    try:
        sf.write(vl_buf.name, vocals_full if multi_singer else vocals_lead, fs, subtype='FLOAT')
        _, FL_vl, FR_vl, FC_vl = extract_phantom_center(vl_buf.name)
    finally:
        with contextlib.suppress(OSError):
            os.unlink(vl_buf.name)

    # Mix dialog into the centre channel. The two signals come from different
    # separation jobs, so pad both to a common length instead of assuming they
    # match. NOTE(review): only the left channel of a stereo dialog stem is
    # used - confirm that is intended.
    dialog_mono = dialog[:, 0] if dialog.ndim == 2 else dialog
    center_len = max(len(FC_vl), len(dialog_mono))
    FC_vl = match_len(FC_vl, center_len) + match_len(dialog_mono, center_len)

    # Step 7: Mapping and stacking
    p((7, 8), "Mapping Channels and Encoding")
    lens = [len(FL_vl), len(FR_vl), len(FC_vl), len(bass), len(sfx), crowd.shape[0], vocals_back.shape[0], instr.shape[0], len(reverb)]
    length = max(lens)

    # FL and FR: Lead vocals + SFX + instruments
    out_L = match_len(FL_vl, length) + match_len(sfx[:, 0], length) + match_len(instr[:, 0], length)
    out_R = match_len(FR_vl, length) + match_len(sfx[:, 1], length) + match_len(instr[:, 1], length)
    out_C = match_len(FC_vl, length)
    out_LFE = match_len(bass, length)

    # SL/SR: Use reverb output
    SL = match_len(reverb[:, 0], length)
    SR = match_len(reverb[:, 1], length)
    if not multi_singer:
        SL += match_len(vocals_back[:, 0], length)
        SR += match_len(vocals_back[:, 1], length)
    SL += match_len(crowd[:, 0], length)
    SR += match_len(crowd[:, 1], length)

    multich = np.column_stack([out_L, out_R, out_C, out_LFE, SL, SR])

    out_wav = tempfile.NamedTemporaryFile(suffix='.wav', delete=False)
    out_wav.close()
    out_ogg = tempfile.NamedTemporaryFile(suffix='.ogg', delete=False)
    out_ogg.close()
    try:
        sf.write(out_wav.name, multich, fs, subtype='FLOAT')
        subprocess.run(
            [
                "ffmpeg", "-y", "-i", out_wav.name,
                "-c:a", "libvorbis", "-ac", "6", "-channel_layout", "5.1",
                out_ogg.name,
            ],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            check=True,
        )
    except BaseException:
        # Do not leave a useless output file behind when encoding fails.
        with contextlib.suppress(OSError):
            os.unlink(out_ogg.name)
        raise
    finally:
        with contextlib.suppress(OSError):
            os.unlink(out_wav.name)
    return out_ogg.name
# ========== Gradio UI ==========
with gr.Blocks(title="Stereo to 5.1 Surround") as demo:
    gr.Markdown("# 🎧 Stereo to 5.1 Converter")
    gr.Markdown("Convert A Stereo File Into Surround")

    inp = gr.Audio(label="Upload stereo audio", type="filepath")
    smart_mode = gr.Checkbox(label="Enable Smart Mode", value=False)

    # Controls for the normal (preset-based) conversion.
    preset = gr.Dropdown(
        choices=["music", "speech", "open"],
        value="music",
        label="Select Preset",
    )
    btn = gr.Button("Convert to 5.1 OGG")
    out = gr.File(label="Download 5.1 OGG")

    # Controls for smart mode; hidden until the checkbox is ticked.
    with gr.Column(visible=False) as smart_section:
        api_key = gr.Textbox(label="MVSep API Key", type="password")
        multi_singer = gr.Checkbox(label="Multi Singer Mode", value=False)
        smart_btn = gr.Button("Convert")
        smart_out = gr.File(label="Output")

    def toggle_mode(enabled):
        """Show the smart-mode column and hide the normal controls, or vice versa."""
        show_normal = not enabled
        normal_updates = [
            gr.update(visible=show_normal) for _ in ("preset", "btn", "out")
        ]
        return (*normal_updates, gr.update(visible=enabled))

    smart_mode.change(
        fn=toggle_mode,
        inputs=[smart_mode],
        outputs=[preset, btn, out, smart_section],
    )

    # Wire each convert button to its processing function.
    btn.click(
        fn=create_5_1_surround,
        inputs=[inp, preset],
        outputs=[out],
        concurrency_limit=10,
    )
    smart_btn.click(
        fn=smart_mode_process,
        inputs=[inp, api_key, multi_singer],
        outputs=[smart_out],
        concurrency_limit=20,
    )

if __name__ == "__main__":
    demo.launch(show_error=True)