# surroundify / app.py
# (Hugging Face Spaces page residue converted to comments:
#  uploader "ziqiangao", commit 40a628b verified,
#  message: "Fix safegaurd against mono or already surround files")
import numpy as np
import soundfile as sf
import subprocess
import tempfile
import os
import gradio as gr
from scipy import signal
# ========== Processing Functions ==========
def convert_to_wav_float(input_file):
    """
    Convert any input audio to 32-bit float WAV to preserve full dynamic range.

    Args:
        input_file (str): Path to any audio file ffmpeg can decode.

    Returns:
        str: Path to a newly created temporary WAV file. The caller is
        responsible for deleting it.

    Raises:
        subprocess.CalledProcessError: If ffmpeg fails to convert the input.
    """
    temp_wav = tempfile.NamedTemporaryFile(suffix=".wav", delete=False)
    temp_wav.close()
    try:
        # PCM 32-bit float little endian preserves float dynamic without clipping
        subprocess.run([
            "ffmpeg", "-y", "-i", input_file,
            "-c:a", "pcm_f32le", "-f", "wav", temp_wav.name
        ], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True)
    except Exception:
        # Fix: don't leak the temp file when conversion fails
        os.unlink(temp_wav.name)
        raise
    return temp_wav.name
def apply_reverb_wet_only(audio, samplerate, reverb_args):
    """
    Apply wet-only reverb using SoX to a single channel with custom reverb args.

    Args:
        audio: Float sample array (one channel).
        samplerate: Sample rate in Hz.
        reverb_args: List of SoX reverb argument strings
            (reverberance, HF-damping, room-scale, stereo-depth,
            pre-delay, wet-gain).

    Returns:
        numpy.ndarray: The wet (reverb-only) signal as float32.

    Raises:
        subprocess.CalledProcessError: If SoX fails.
    """
    tin = tempfile.NamedTemporaryFile(suffix=".wav", delete=False)
    tout = tempfile.NamedTemporaryFile(suffix=".wav", delete=False)
    tin.close()
    tout.close()
    try:
        sf.write(tin.name, audio, samplerate, subtype='FLOAT')
        # "-w" = wet-only output (no dry signal mixed in)
        subprocess.run(
            ["sox", tin.name, tout.name, "reverb", "-w"] + reverb_args,
            stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True
        )
        wet, _ = sf.read(tout.name, dtype='float32')
    finally:
        # Fix: clean up both temp files even when SoX or the read raises
        for path in (tin.name, tout.name):
            if os.path.exists(path):
                os.unlink(path)
    return wet
def sox_filter(audio, samplerate, filter_type, cutoff):
    """
    Apply a highpass or lowpass filter via SoX.

    Args:
        audio: Float sample array (mono or multi-channel).
        samplerate: Sample rate in Hz.
        filter_type: 'highpass' or 'lowpass'.
        cutoff: Filter cutoff frequency in Hz.

    Returns:
        numpy.ndarray: Filtered audio as float32.

    Raises:
        subprocess.CalledProcessError: If SoX fails.
    """
    tin = tempfile.NamedTemporaryFile(suffix=".wav", delete=False)
    tout = tempfile.NamedTemporaryFile(suffix=".wav", delete=False)
    tin.close()
    tout.close()
    try:
        sf.write(tin.name, audio, samplerate, subtype='FLOAT')
        subprocess.run(
            ["sox", tin.name, tout.name, filter_type, str(cutoff)],
            stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True
        )
        out, _ = sf.read(tout.name, dtype='float32')
    finally:
        # Fix: clean up both temp files even when SoX or the read raises
        for path in (tin.name, tout.name):
            if os.path.exists(path):
                os.unlink(path)
    return out
def extract_phantom_center(input_file, rdf=0.99999):
    """
    Split a stereo file into side residuals and a phantom-centre channel.

    STFT-domain centre extraction: the centre magnitude is the per-bin
    minimum of |L| and |R|, given the phase of the mid signal (L+R)/2,
    then subtracted (scaled by ``rdf``) from each side.

    Args:
        input_file: Path to an audio file; must decode to 2-channel stereo.
        rdf: Residual damping factor — fraction of the centre estimate
            removed from L and R (kept just under 1.0).

    Returns:
        Tuple ``(fs, FL, FR, FC)``: sample rate, front-left residual
        (left without centre), front-right residual, and phantom centre,
        each trimmed back to the input sample count.

    Raises:
        gr.Error: If the decoded audio is not 2-channel stereo.
    """
    wav = convert_to_wav_float(input_file)
    data, fs = sf.read(wav, dtype='float32')
    os.unlink(wav)
    # Safeguard: mono or already-multichannel input cannot be upmixed here
    if data.ndim != 2 or data.shape[1] != 2:
        raise gr.Error("Input must be stereo 2-channel")
    L, R = data[:,0], data[:,1]
    M = (L + R) / 2
    # One-second analysis windows (nperseg = sample rate) with 50% overlap
    nperseg = fs
    noverlap = nperseg // 2
    _, _, ZL = signal.stft(L, fs=fs, nperseg=nperseg, noverlap=noverlap)
    _, _, ZR = signal.stft(R, fs=fs, nperseg=nperseg, noverlap=noverlap)
    _, _, ZM = signal.stft(M, fs=fs, nperseg=nperseg, noverlap=noverlap)
    # Centre estimate: minimum magnitude of the two sides, mid-signal phase
    Zc = np.minimum(np.abs(ZL), np.abs(ZR)) * np.exp(1j * np.angle(ZM))
    # Remove (almost all of) the centre from each side
    Zl_res = ZL - Zc * rdf
    Zr_res = ZR - Zc * rdf
    _, FL = signal.istft(Zl_res, fs=fs, nperseg=nperseg, noverlap=noverlap)
    _, FR = signal.istft(Zr_res, fs=fs, nperseg=nperseg, noverlap=noverlap)
    _, FC = signal.istft(Zc, fs=fs, nperseg=nperseg, noverlap=noverlap)
    # istft may pad the output; trim back to the original length
    return fs, FL[:len(L)], FR[:len(R)], FC[:len(M)]
def create_5_1_surround(input_file, preset="music"):
    """
    Convert a stereo file into a 5.1 OGG using local DSP only (no API).

    Channel layout: FL/FR = side residuals, FC = phantom centre,
    LFE = lowpassed mono sum, SL/SR = wet-only reverb of the original stereo.

    Args:
        input_file: Path to the stereo source file.
        preset: 'music', 'speech', or 'open' — selects SoX reverb settings.

    Returns:
        str: Path to the encoded 5.1 OGG temp file.

    Raises:
        gr.Error: On an unknown preset, or non-stereo input (via
            extract_phantom_center).
    """
    print("Starting Normal Processing")
    p = gr.Progress()
    # Preset-based parameters.
    # SoX reverb argument order: reverberance(%), HF-damping(%),
    # room-scale(%), stereo-depth(%), pre-delay(ms), wet-gain(dB)
    if preset == "music":
        hp_cutoff = 120
        lfe_cutoff = 120
        reverb_args = ['70', '40', '100', '95', '10', '-2']
    elif preset == "speech":
        hp_cutoff = 120
        lfe_cutoff = 120
        reverb_args = ['50', '99', '50', '70', '0', '0']
    elif preset == "open":
        hp_cutoff = 120
        lfe_cutoff = 120
        reverb_args = ['20', '50', '100', '100', '100', '0']
    else:
        raise gr.Error(f"Unknown preset: {preset}")
    p((1,7),"Extracting Centre")  # 1. Extract FL/FR residuals + phantom centre
    fs, FL, FR, FC = extract_phantom_center(input_file)
    p((2,7),"Getting File")  # 2. Re-read the stereo original to feed the reverb
    wav = convert_to_wav_float(input_file)
    stereo, _ = sf.read(wav, dtype='float32')
    os.unlink(wav)
    L_orig, R_orig = stereo[:, 0], stereo[:, 1]
    p((3,7),"Reverb For Rear")  # 3. Wet-only reverb becomes the surround pair
    SL = apply_reverb_wet_only(L_orig, fs, reverb_args)
    SR = apply_reverb_wet_only(R_orig, fs, reverb_args)
    p((4,7),"Highpassing")  # 4. Highpass every channel except LFE
    FL_hp = sox_filter(FL, fs, 'highpass', hp_cutoff)
    FR_hp = sox_filter(FR, fs, 'highpass', hp_cutoff)
    FC_hp = sox_filter(FC, fs, 'highpass', hp_cutoff)
    SL_hp = sox_filter(SL, fs, 'highpass', hp_cutoff)
    SR_hp = sox_filter(SR, fs, 'highpass', hp_cutoff)
    p((5,7),"Extracting LFE")  # 5. LFE = lowpassed mono sum
    bass_sum = .5 * (L_orig + R_orig)
    LFE = sox_filter(bass_sum, fs, 'lowpass', lfe_cutoff)
    p((6,7),"Stacking")  # 6. Zero-pad all channels to a common length and stack
    channels = [FL_hp, FR_hp, FC_hp, LFE, SL_hp, SR_hp]
    length = max(len(ch) for ch in channels)
    def pad(x): return np.pad(x, (0, length - len(x)))
    multich = np.column_stack([pad(ch) for ch in channels])
    p((7,7),"Encoding")  # 7. Write a float WAV, then encode to Vorbis 5.1
    out_wav = tempfile.NamedTemporaryFile(suffix='.wav', delete=False)
    sf.write(out_wav.name, multich, fs, subtype='FLOAT')
    out_wav.close()
    out_ogg = tempfile.NamedTemporaryFile(suffix='.ogg', delete=False)
    out_ogg.close()
    subprocess.run([
        "ffmpeg", "-y", "-i", out_wav.name,
        "-c:a", "libvorbis", "-ac", "6", "-channel_layout", "5.1", out_ogg.name
    ], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True)
    os.unlink(out_wav.name)
    return out_ogg.name
import mimetypes
import requests
import time
def send_mvsep_audio_job(
    api_token: str,
    audio_bytes: bytes,
    filename: str,
    sep_type: int = 34,
    output_format: int = 2,
    addopt1=None,
    addopt2=None,
    poll_interval_sec: int = 5
):
    """
    Send audio to MVSep for source separation and wait for the result.

    Args:
        api_token: Your MVSep API token.
        audio_bytes: Audio data (any format MVSep accepts).
        filename: Original filename, used to guess the MIME type.
        sep_type: Separation type id (e.g. 34 for crowd extraction).
        output_format: Output format id (e.g. 2 for FLAC).
        addopt1: Optional extra model parameter; may legitimately be 0.
        addopt2: Optional extra model parameter.
        poll_interval_sec: Seconds to wait between status polls.

    Returns:
        dict: Completed result data from mvsep.com (including file URLs).

    Raises:
        gr.Error: If job creation fails or the job ends failed/not found.
        requests.HTTPError: On non-2xx HTTP responses.
    """
    # Step 1: Determine MIME type from the filename, with a generic fallback
    mime_type, _ = mimetypes.guess_type(filename)
    if not mime_type:
        mime_type = "application/octet-stream"  # fallback
    # Step 2: Prepare request
    url = "https://mvsep.com/api/separation/create"
    files = {
        'audiofile': (filename, audio_bytes, mime_type)
    }
    data = {
        'api_token': api_token,
        'sep_type': str(sep_type),
        'output_format': str(output_format)
    }
    # BUGFIX: compare against None, not truthiness — the old `if addopt1:`
    # silently dropped valid falsy values such as addopt1=0, which the
    # smart-mode caller actually passes for the crowd model.
    if addopt1 is not None:
        data['add_opt1'] = str(addopt1)
    if addopt2 is not None:
        data['add_opt2'] = str(addopt2)
    # Step 3: Send creation request (generous timeout: the upload can be large)
    response = requests.post(url, files=files, data=data, timeout=600)
    response.raise_for_status()
    json_resp = response.json()
    if not json_resp.get('success'):
        error_msg = json_resp.get('data', {}).get('message', 'Unknown error')
        print(json_resp)
        raise gr.Error(f"API error: {error_msg}")
    job_hash = json_resp['data']['hash']
    print(f"Job submitted successfully. Hash: {job_hash}")
    # Step 4: Poll until the job is done
    status_url = "https://mvsep.com/api/separation/get"
    while True:
        poll_resp = requests.get(status_url, params={'hash': job_hash}, timeout=30)
        poll_resp.raise_for_status()
        poll_data = poll_resp.json()
        status = poll_data.get('status')
        print(f"Job status: {status}")
        if status == 'done':
            return poll_data.get('data', {})
        elif status in ('failed', 'not_found'):
            raise gr.Error(f"Job failed or not found: {poll_data.get('data', {}).get('message', '')}")
        time.sleep(poll_interval_sec)
# Download WAV and preserve sample rate, with optional resampling to target_fs
def download_wav(url, target_fs=None):
    """
    Download a WAV file and decode it to float32 samples.

    Args:
        url: HTTP(S) URL of the WAV file.
        target_fs: If given and different from the file's native rate,
            resample the audio to this rate.

    Returns:
        Tuple ``(audio, sr)``: float32 sample array and its sample rate.

    Raises:
        requests.HTTPError: On a non-2xx download response.
    """
    r = requests.get(url, timeout=600)
    r.raise_for_status()
    temp = tempfile.NamedTemporaryFile(suffix=".wav", delete=False)
    try:
        temp.write(r.content)
        temp.close()
        audio, sr = sf.read(temp.name, dtype='float32')
    finally:
        # Fix: don't leak the temp file if the write or decode fails
        temp.close()
        if os.path.exists(temp.name):
            os.unlink(temp.name)
    if target_fs and sr != target_fs:
        # Resample along the time axis (scipy resamples axis 0 by default)
        num_samples = int(len(audio) * target_fs / sr)
        audio = signal.resample(audio, num_samples)
        sr = target_fs
    return audio, sr
# Smart mode workflow: MVSep stem separation -> 5.1 upmix
def smart_mode_process(input_file, api_key, multi_singer=False):
    """
    Convert stereo to 5.1 using MVSep source separation (requires API key).

    Pipeline: lowpass LFE -> crowd extraction -> dialog/music/SFX split ->
    reverb on music -> vocal/karaoke split -> phantom centre on vocals ->
    channel mapping -> 5.1 OGG encode.

    Args:
        input_file: Path to the stereo source file.
        api_key: MVSep API token.
        multi_singer: If True, use the full vocal mix for the phantom
            centre and keep backing vocals out of the surrounds.

    Returns:
        str: Path to the encoded 5.1 OGG temp file.

    Raises:
        gr.Error: If the API key is missing, the input is not stereo,
            or an MVSep job fails.
    """
    print("Starting Smartmode")
    p = gr.Progress()
    import shutil  # NOTE(review): imported but unused in this function
    if not api_key:
        raise gr.Error("An MVSep API Key Is Required For This. Get your key <a href=\"https://mvsep.com/user-api\">Here</a>. it's Free!")
    # Load original as 32-bit float stereo
    wav = convert_to_wav_float(input_file)
    data, fs = sf.read(wav, dtype='float32')
    os.unlink(wav)
    p((0, 8), "Loading File")
    if data.ndim != 2 or data.shape[1] != 2:
        raise gr.Error("Expected stereo input (2 channels), got something else.")
    L, R = data[:, 0], data[:, 1]
    # Step 1: LFE = lowpassed mono sum
    p((1, 8), "Processing LFE")
    bass = sox_filter(0.5 * (L + R), fs, 'lowpass', 120)
    # Step 2: Highpass, then send to the MVSep crowd model (sep_type=34)
    p((2, 8), "Extracting Crowd")
    hp_left = sox_filter(L, fs, 'highpass', 120)
    hp_right = sox_filter(R, fs, 'highpass', 120)
    hp_stereo = np.column_stack([hp_left, hp_right])
    music_buf = tempfile.NamedTemporaryFile(suffix=".flac", delete=False)
    sf.write(music_buf.name, hp_stereo, fs, format='FLAC', subtype='PCM_16')
    music_buf.close()
    crowd_resp = send_mvsep_audio_job(
        api_key, open(music_buf.name, 'rb').read(), os.path.basename(music_buf.name),
        sep_type=34, output_format=2, addopt1=0
    )
    os.unlink(music_buf.name)
    # presumably files[0] = crowd stem, files[1] = remainder — verify against the MVSep API
    crowd, _ = download_wav(crowd_resp['files'][0]['url'], target_fs=fs)
    other_after_crowd, _ = download_wav(crowd_resp['files'][1]['url'], target_fs=fs)
    # Step 3: dialog / music / SFX separation (sep_type=24) on the remainder
    p((3, 8), "Separating Speech, Music, and SFX")
    demucs_input_buf = tempfile.NamedTemporaryFile(suffix=".flac", delete=False)
    sf.write(demucs_input_buf.name, other_after_crowd, fs, format='FLAC', subtype='PCM_16')
    demucs_input_buf.close()
    demucs_resp = send_mvsep_audio_job(
        api_key, open(demucs_input_buf.name, 'rb').read(), os.path.basename(demucs_input_buf.name),
        sep_type=24, output_format=2
    )
    os.unlink(demucs_input_buf.name)
    # presumably files[0]=dialog, files[1]=music, files[2]=sfx — verify against the MVSep API
    dialog, _ = download_wav(demucs_resp['files'][0]['url'], target_fs=fs)
    sfx, _ = download_wav(demucs_resp['files'][2]['url'], target_fs=fs)
    music, _ = download_wav(demucs_resp['files'][1]['url'], target_fs=fs)
    # Step 4: wet-only reverb on the music stem (same settings as 'open' preset)
    p((4, 8), "Applying Reverb")
    reverb_args = ['20', '50', '100', '100', '100', '0']  # open preset
    reverb_L = apply_reverb_wet_only(music[:, 0], fs, reverb_args)
    reverb_R = apply_reverb_wet_only(music[:, 1], fs, reverb_args)
    reverb = np.column_stack([reverb_L, reverb_R])
    # Step 5: vocal / karaoke separation on the music stem (sep_type=49)
    p((5, 8), "Extracting Vocals")
    music_buf = tempfile.NamedTemporaryFile(suffix=".flac", delete=False)
    sf.write(music_buf.name, music, fs, format='FLAC', subtype='PCM_16')
    music_buf.close()
    karaoke_resp = send_mvsep_audio_job(
        api_key, open(music_buf.name, 'rb').read(), os.path.basename(music_buf.name),
        sep_type=49, output_format=2, addopt1=3, addopt2=1
    )
    os.unlink(music_buf.name)
    # presumably files are [full vocals, lead, backing, instrumental] — verify against the MVSep API
    vocals_full, _ = download_wav(karaoke_resp['files'][0]['url'], target_fs=fs)
    vocals_lead, _ = download_wav(karaoke_resp['files'][1]['url'], target_fs=fs)
    vocals_back, _ = download_wav(karaoke_resp['files'][2]['url'], target_fs=fs)
    instr, _ = download_wav(karaoke_resp['files'][3]['url'], target_fs=fs)
    # Step 6: phantom-centre extraction on vocals (full mix if multi_singer)
    p((6, 8), "Phantom Center for Lead Vocals")
    vl_buf = tempfile.NamedTemporaryFile(suffix=".wav", delete=False)
    sf.write(vl_buf.name, vocals_full if multi_singer else vocals_lead, fs, subtype='FLOAT')
    vl_buf.close()
    _, FL_vl, FR_vl, FC_vl = extract_phantom_center(vl_buf.name)
    os.unlink(vl_buf.name)
    # Mix dialog into the centre channel.
    # NOTE(review): assumes dialog has the same length as FC_vl; numpy will
    # raise on a mismatch — confirm MVSep stems preserve input length.
    FC_vl += dialog[:, 0] if dialog.ndim == 2 else dialog
    # Step 7: pad everything to a common length, map to 5.1, encode
    p((7, 8), "Mapping Channels and Encoding")
    def match_len(x, length): return np.pad(x, (0, length - len(x)))
    lens = [len(FL_vl), len(FR_vl), len(FC_vl), len(bass), len(sfx), crowd.shape[0], vocals_back.shape[0], instr.shape[0], len(reverb)]
    length = max(lens)
    # FL and FR: vocal residuals + SFX + instruments
    out_L = match_len(FL_vl, length) + match_len(sfx[:, 0], length) + match_len(instr[:, 0], length)
    out_R = match_len(FR_vl, length) + match_len(sfx[:, 1], length) + match_len(instr[:, 1], length)
    out_C = match_len(FC_vl, length)
    out_LFE = match_len(bass, length)
    # SL/SR: reverb tail (+ backing vocals unless multi-singer) + crowd
    SL = match_len(reverb[:, 0], length)
    SR = match_len(reverb[:, 1], length)
    if not multi_singer:
        SL += match_len(vocals_back[:, 0], length)
        SR += match_len(vocals_back[:, 1], length)
    SL += match_len(crowd[:, 0], length)
    SR += match_len(crowd[:, 1], length)
    multich = np.column_stack([out_L, out_R, out_C, out_LFE, SL, SR])
    out_wav = tempfile.NamedTemporaryFile(suffix='.wav', delete=False)
    sf.write(out_wav.name, multich, fs, subtype='FLOAT')
    out_wav.close()
    # NOTE(review): out_ogg is never closed before ffmpeg writes to it
    # (normal mode does close it) — works on Linux, may fail on Windows.
    out_ogg = tempfile.NamedTemporaryFile(suffix='.ogg', delete=False)
    subprocess.run([
        "ffmpeg", "-y", "-i", out_wav.name,
        "-c:a", "libvorbis", "-ac", "6", "-channel_layout", "5.1", out_ogg.name
    ], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True)
    os.unlink(out_wav.name)
    return out_ogg.name
# ========== Gradio UI ==========
# Build the Gradio UI: one shared audio input, two mutually exclusive modes
with gr.Blocks(title="Stereo to 5.1 Surround") as demo:
    gr.Markdown("# 🎧 Stereo to 5.1 Converter")
    gr.Markdown("Convert A Stereo File Into Surround")
    # Shared input — the file path is passed straight to the processing functions
    inp = gr.Audio(label="Upload stereo audio", type="filepath")
    smart_mode = gr.Checkbox(label="Enable Smart Mode", value=False)
    # Normal (DSP-only) mode controls
    preset = gr.Dropdown(
        label="Select Preset",
        choices=["music", "speech", "open"],
        value="music"
    )
    btn = gr.Button("Convert to 5.1 OGG")
    out = gr.File(label="Download 5.1 OGG")
    # Smart mode (MVSep API) controls, hidden until the checkbox is ticked
    with gr.Column(visible=False) as smart_section:
        api_key = gr.Textbox(label="MVSep API Key", type="password")
        multi_singer = gr.Checkbox(label="Multi Singer Mode", value=False)
        smart_btn = gr.Button("Convert")
        smart_out = gr.File(label="Output")
    # Toggle which mode's widgets are visible when the checkbox changes
    def toggle_mode(enabled):
        """Show smart-mode widgets and hide normal-mode ones, or vice versa."""
        return (
            gr.update(visible=not enabled),  # preset
            gr.update(visible=not enabled),  # btn
            gr.update(visible=not enabled),  # out
            gr.update(visible=enabled)  # smart_section
        )
    smart_mode.change(
        fn=toggle_mode,
        inputs=[smart_mode],
        outputs=[preset, btn, out, smart_section]
    )
    # Wire each button to its processing pipeline
    btn.click(fn=create_5_1_surround, inputs=[inp, preset], outputs=[out], concurrency_limit=10)
    smart_btn.click(fn=smart_mode_process, inputs=[inp, api_key, multi_singer], outputs=[smart_out], concurrency_limit=20)

if __name__ == "__main__":
    demo.launch(show_error=True)