"""Gradio app that upmixes a stereo audio file to a 5.1 surround OGG.

Two pipelines are offered:

* normal mode (`create_5_1_surround`): phantom-centre extraction, SoX reverb
  for the rear channels and a low-passed mono sum for the LFE;
* smart mode (`smart_mode_process`): stems are separated through the MVSep
  HTTP API and then mapped onto the six channels.

The ``ffmpeg`` and ``sox`` executables are invoked by name, so both must be
available on ``PATH``.
"""

import mimetypes
import os
import subprocess
import tempfile
import time
from contextlib import suppress
from typing import Optional, Union

import gradio as gr
import numpy as np
import requests
import soundfile as sf
from scipy import signal

# ========== Configuration ==========

# SoX ``reverb`` arguments, in order: reverberance (%), HF-damping (%),
# room-scale (%), stereo-depth (%), pre-delay (ms), wet-gain (dB).
_PRESETS = {
    "music": {
        "hp_cutoff": 120,
        "lfe_cutoff": 120,
        "reverb_args": ['70', '40', '100', '95', '10', '-2'],
    },
    "speech": {
        "hp_cutoff": 120,
        "lfe_cutoff": 120,
        "reverb_args": ['50', '99', '50', '70', '0', '0'],
    },
    "open": {
        "hp_cutoff": 120,
        "lfe_cutoff": 120,
        "reverb_args": ['20', '50', '100', '100', '100', '0'],
    },
}

# (connect, read) timeouts in seconds so a stalled connection cannot hang a
# worker forever. Uploads/downloads get a generous read timeout.
_TRANSFER_TIMEOUT_SEC = (10, 600)
_POLL_TIMEOUT_SEC = (10, 60)


# ========== Small shared helpers ==========

def _temp_path(suffix):
    """Create an empty, already-closed temporary file and return its path."""
    fd, path = tempfile.mkstemp(suffix=suffix)
    os.close(fd)
    return path


def _remove_quietly(*paths):
    """Delete the given files, ignoring any that are already gone."""
    for path in paths:
        with suppress(FileNotFoundError):
            os.unlink(path)


def _run_quiet(cmd):
    """Run an external command silently; raise CalledProcessError on failure."""
    subprocess.run(
        cmd,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
        check=True,
    )


def _pad_to(x, length):
    """Zero-pad the 1-D array *x* at the end up to *length* samples."""
    return np.pad(x, (0, length - len(x)))


def _require_stereo(data, message):
    """Raise ``gr.Error(message)`` unless *data* is a (samples, 2) array."""
    if data.ndim != 2 or data.shape[1] != 2:
        raise gr.Error(message)


# ========== Processing Functions ==========

def convert_to_wav_float(input_file):
    """
    Convert any input audio to 32-bit float WAV to preserve full dynamic range.

    Returns the path of a new temporary WAV file; the caller is responsible
    for deleting it. The temporary file is removed if ffmpeg fails.
    """
    out_path = _temp_path(".wav")
    try:
        # 32-bit float little endian keeps the dynamic range without clipping.
        _run_quiet([
            "ffmpeg", "-y", "-i", input_file,
            "-c:a", "pcm_f32le", "-f", "wav", out_path
        ])
    except BaseException:
        _remove_quietly(out_path)
        raise
    return out_path


def _read_as_float(input_file):
    """Decode *input_file* through ffmpeg and return ``(float32 data, fs)``."""
    wav = convert_to_wav_float(input_file)
    try:
        return sf.read(wav, dtype='float32')
    finally:
        _remove_quietly(wav)


def _sox_process(audio, samplerate, effect_args):
    """
    Run a SoX effect chain over *audio* and return the result as float32.

    Both temporary files are closed before SoX touches them and are removed
    even when writing, SoX or reading fails.
    """
    src = _temp_path(".wav")
    dst = _temp_path(".wav")
    try:
        sf.write(src, audio, samplerate, subtype='FLOAT')
        _run_quiet(["sox", src, dst] + list(effect_args))
        processed, _ = sf.read(dst, dtype='float32')
    finally:
        _remove_quietly(src, dst)
    return processed


def apply_reverb_wet_only(audio, samplerate, reverb_args):
    """
    Apply wet-only reverb using SoX to a single channel with custom reverb args.

    *reverb_args* is the list of positional SoX ``reverb`` arguments (strings).
    """
    return _sox_process(audio, samplerate, ["reverb", "-w"] + list(reverb_args))


def sox_filter(audio, samplerate, filter_type, cutoff):
    """
    Apply highpass or lowpass filter via SoX.

    filter_type: 'highpass' or 'lowpass'; cutoff in Hz.
    """
    return _sox_process(audio, samplerate, [filter_type, str(cutoff)])


def _phantom_center_from_stereo(data, fs, rdf=0.99999):
    """
    Split an in-memory stereo array into FL, FR and a phantom centre.

    The centre spectrum takes, per STFT bin, the smaller of the two channel
    magnitudes and the phase of the mid signal; ``rdf`` scales how much of
    that centre is subtracted from each side.

    Returns ``(fs, FL, FR, FC)``, each trimmed to the input length.
    Raises ``gr.Error`` if *data* is not stereo or is empty.
    """
    _require_stereo(data, "Input must be stereo 2-channel")
    left, right = data[:, 0], data[:, 1]
    n_samples = len(left)
    if n_samples == 0:
        raise gr.Error("Input audio is empty")
    mid = (left + right) / 2

    # One-second windows, but never longer than the signal: stft() silently
    # shrinks an oversized window, after which istft() with the original
    # nperseg would raise.
    nperseg = min(fs, n_samples)
    stft_kwargs = {"fs": fs, "nperseg": nperseg, "noverlap": nperseg // 2}

    _, _, z_left = signal.stft(left, **stft_kwargs)
    _, _, z_right = signal.stft(right, **stft_kwargs)
    _, _, z_mid = signal.stft(mid, **stft_kwargs)

    z_center = (
        np.minimum(np.abs(z_left), np.abs(z_right))
        * np.exp(1j * np.angle(z_mid))
    )

    _, front_left = signal.istft(z_left - z_center * rdf, **stft_kwargs)
    _, front_right = signal.istft(z_right - z_center * rdf, **stft_kwargs)
    _, front_center = signal.istft(z_center, **stft_kwargs)
    return (
        fs,
        front_left[:n_samples],
        front_right[:n_samples],
        front_center[:n_samples],
    )


def extract_phantom_center(input_file, rdf=0.99999):
    """
    Returns FL (front left without centre), FR, and FC (phantom centre).

    The file is decoded through ffmpeg first, so any format ffmpeg can read
    is accepted. Returns ``(fs, FL, FR, FC)``.
    Raises ``gr.Error`` if the input is not 2-channel stereo or is empty.
    """
    data, fs = _read_as_float(input_file)
    return _phantom_center_from_stereo(data, fs, rdf)


def _encode_5_1_ogg(multich, fs):
    """
    Write a (samples, 6) array to a temporary 5.1 Vorbis OGG; return its path.

    The intermediate WAV is always removed; the OGG is removed if encoding
    fails so no partial output is left behind.
    """
    wav_path = _temp_path(".wav")
    ogg_path = _temp_path(".ogg")
    try:
        sf.write(wav_path, multich, fs, subtype='FLOAT')
        _run_quiet([
            "ffmpeg", "-y", "-i", wav_path,
            "-c:a", "libvorbis", "-ac", "6", "-channel_layout", "5.1", ogg_path
        ])
    except BaseException:
        _remove_quietly(ogg_path)
        raise
    finally:
        _remove_quietly(wav_path)
    return ogg_path


def create_5_1_surround(input_file, preset="music", progress=gr.Progress()):
    """
    Upmix a stereo file to 5.1 and return the path of the encoded OGG.

    Args:
        input_file: Path of the stereo source audio.
        preset: One of the keys of ``_PRESETS`` ("music", "speech", "open").
        progress: Gradio progress tracker. It is declared as a default
            argument because that is how Gradio injects a tracker bound to
            the running event.

    Raises:
        gr.Error: For an unknown preset or non-stereo / empty input.
    """
    print("Starting Normal Processing")
    settings = _PRESETS.get(preset)
    if settings is None:
        raise gr.Error(f"Unknown preset: {preset}")
    hp_cutoff = settings["hp_cutoff"]
    lfe_cutoff = settings["lfe_cutoff"]
    reverb_args = settings["reverb_args"]

    # 1. Decode the input once; it feeds both centre extraction and reverb.
    progress((1, 7), "Getting File")
    stereo, fs = _read_as_float(input_file)

    # 2. Extract FL/FR/phantom centre (also validates that input is stereo).
    progress((2, 7), "Extracting Centre")
    _, front_left, front_right, front_center = _phantom_center_from_stereo(
        stereo, fs
    )
    left_orig, right_orig = stereo[:, 0], stereo[:, 1]

    # 3. Wet-only reverb of the original channels feeds the rears.
    progress((3, 7), "Reverb For Rear")
    rear_left = apply_reverb_wet_only(left_orig, fs, reverb_args)
    rear_right = apply_reverb_wet_only(right_orig, fs, reverb_args)

    # 4. Highpass filter everything except LFE.
    progress((4, 7), "Highpassing")
    fl_hp, fr_hp, fc_hp, sl_hp, sr_hp = (
        sox_filter(channel, fs, 'highpass', hp_cutoff)
        for channel in (
            front_left, front_right, front_center, rear_left, rear_right
        )
    )

    # 5. Lowpass of the mono sum for LFE.
    progress((5, 7), "Extracting LFE")
    lfe = sox_filter(.5 * (left_orig + right_orig), fs, 'lowpass', lfe_cutoff)

    # 6. Stack in WAV 5.1 order (FL, FR, FC, LFE, SL, SR), padding to the
    #    longest channel.
    progress((6, 7), "Stacking")
    channels = [fl_hp, fr_hp, fc_hp, lfe, sl_hp, sr_hp]
    length = max(len(ch) for ch in channels)
    multich = np.column_stack([_pad_to(ch, length) for ch in channels])

    # 7. Write WAV and encode to OGG.
    progress((7, 7), "Encoding")
    return _encode_5_1_ogg(multich, fs)


# ========== MVSep API ==========

def send_mvsep_audio_job(
    api_token: str,
    audio_bytes: bytes,
    filename: str,
    sep_type: int = 34,
    output_format: int = 2,
    addopt1: Optional[Union[str, int]] = None,
    addopt2: Optional[Union[str, int]] = None,
    poll_interval_sec: int = 5
):
    """
    Send audio to MVSep for source separation and wait for the result.

    Args:
        api_token (str): Your API token.
        audio_bytes (bytes): Audio data (any format).
        filename (str): Original filename, used for extension/MIME type.
        sep_type (int): Separation type (e.g., 34 for karaoke).
        output_format (int): Output format (e.g., 2 for FLAC).
        addopt1: Optional extra parameter 1. Sent whenever it is not None,
            so a value of ``0`` is transmitted rather than dropped.
        addopt2: Optional extra parameter 2 (same rule as addopt1).
        poll_interval_sec (int): How often to check job status.

    Returns:
        dict: Completed result data from mvsep.com (including file URLs).

    Raises:
        gr.Error: If the API rejects the job or the job fails.
        requests.RequestException: On HTTP/network errors or timeouts.
    """
    # Step 1: Determine MIME type
    mime_type, _ = mimetypes.guess_type(filename)
    if not mime_type:
        mime_type = "application/octet-stream"  # fallback

    # Step 2: Prepare request
    url = "https://mvsep.com/api/separation/create"
    files = {
        'audiofile': (filename, audio_bytes, mime_type)
    }
    data = {
        'api_token': api_token,
        'sep_type': str(sep_type),
        'output_format': str(output_format)
    }
    # Compare against None: 0 is a legitimate option value.
    if addopt1 is not None:
        data['add_opt1'] = str(addopt1)
    if addopt2 is not None:
        data['add_opt2'] = str(addopt2)

    # Step 3: Send creation request
    response = requests.post(
        url, files=files, data=data, timeout=_TRANSFER_TIMEOUT_SEC
    )
    response.raise_for_status()
    json_resp = response.json()

    if not json_resp.get('success'):
        error_msg = json_resp.get('data', {}).get('message', 'Unknown error')
        print(json_resp)
        raise gr.Error(f"API error: {error_msg}")

    job_hash = json_resp['data']['hash']
    print(f"Job submitted successfully. Hash: {job_hash}")

    # Step 4: Poll until job is done
    status_url = "https://mvsep.com/api/separation/get"
    while True:
        poll_resp = requests.get(
            status_url, params={'hash': job_hash}, timeout=_POLL_TIMEOUT_SEC
        )
        poll_resp.raise_for_status()
        poll_data = poll_resp.json()

        status = poll_data.get('status')
        print(f"Job status: {status}")

        if status == 'done':
            return poll_data.get('data', {})
        if status in ('failed', 'not_found'):
            raise gr.Error(f"Job failed or not found: {poll_data.get('data', {}).get('message', '')}")

        time.sleep(poll_interval_sec)


def download_wav(url, target_fs=None):
    """
    Download an audio file and decode it as float32.

    If *target_fs* is given and differs from the file's sample rate, the
    audio is resampled along the sample axis to *target_fs*.

    Returns ``(audio, sample_rate)``.
    """
    reply = requests.get(url, timeout=_TRANSFER_TIMEOUT_SEC)
    reply.raise_for_status()

    path = _temp_path(".wav")
    try:
        with open(path, "wb") as handle:
            handle.write(reply.content)
        audio, sr = sf.read(path, dtype='float32')
    finally:
        _remove_quietly(path)

    if target_fs and sr != target_fs:
        # resample if needed
        num_samples = int(len(audio) * target_fs / sr)
        audio = signal.resample(audio, num_samples)
        sr = target_fs
    return audio, sr


def _separate_with_mvsep(api_key, audio, fs, **job_options):
    """
    Upload *audio* as 16-bit FLAC to MVSep and return the finished job data.

    *job_options* are forwarded to `send_mvsep_audio_job` (sep_type,
    output_format, addopt1, addopt2, ...).
    """
    flac_path = _temp_path(".flac")
    try:
        sf.write(flac_path, audio, fs, format='FLAC', subtype='PCM_16')
        with open(flac_path, 'rb') as handle:
            payload = handle.read()
    finally:
        _remove_quietly(flac_path)
    return send_mvsep_audio_job(
        api_key, payload, os.path.basename(flac_path), **job_options
    )


def _fetch_stem(job_data, index, fs):
    """Download result file number *index* of a finished job, at rate *fs*."""
    audio, _ = download_wav(job_data['files'][index]['url'], target_fs=fs)
    return audio


# Smart mode workflow
def smart_mode_process(input_file, api_key, multi_singer=False,
                       progress=gr.Progress()):
    """
    Upmix a stereo file to 5.1 using MVSep stem separation.

    Channel mapping: FL/FR carry the centre-removed vocals plus SFX and
    instruments, FC carries the phantom-centre vocals plus dialog, LFE is
    the low-passed mono sum, SL/SR carry reverb of the music stem, crowd
    and (unless *multi_singer*) backing vocals.

    Args:
        input_file: Path of the stereo source audio.
        api_key: MVSep API token (required).
        multi_singer: Use the full vocal stem instead of lead-only for the
            front/centre, and keep backing vocals out of the rears.
        progress: Gradio progress tracker (declared as a default argument so
            Gradio can bind it to the running event).

    Returns:
        Path of the encoded 5.1 OGG file.

    Raises:
        gr.Error: If the API key is missing, input is not stereo, or an
            MVSep job fails.
    """
    print("Starting Smartmode")
    if not api_key:
        raise gr.Error("An MVSep API Key Is Required For This. Get your key Here. it's Free!")

    # Load original
    progress((0, 8), "Loading File")
    data, fs = _read_as_float(input_file)
    _require_stereo(data, "Expected stereo input (2 channels), got something else.")
    left, right = data[:, 0], data[:, 1]

    # Step 1: LFE from lowpass
    progress((1, 8), "Processing LFE")
    bass = sox_filter(0.5 * (left + right), fs, 'lowpass', 120)

    # Step 2: Highpass for crowd extraction
    progress((2, 8), "Extracting Crowd")
    hp_stereo = np.column_stack([
        sox_filter(left, fs, 'highpass', 120),
        sox_filter(right, fs, 'highpass', 120),
    ])
    crowd_resp = _separate_with_mvsep(
        api_key, hp_stereo, fs, sep_type=34, output_format=2, addopt1=0
    )
    crowd = _fetch_stem(crowd_resp, 0, fs)
    other_after_crowd = _fetch_stem(crowd_resp, 1, fs)

    # Step 3: Speech, music, SFX separation from 'other_after_crowd'
    progress((3, 8), "Separating Speech, Music, and SFX")
    demucs_resp = _separate_with_mvsep(
        api_key, other_after_crowd, fs, sep_type=24, output_format=2
    )
    dialog = _fetch_stem(demucs_resp, 0, fs)
    music = _fetch_stem(demucs_resp, 1, fs)
    sfx = _fetch_stem(demucs_resp, 2, fs)

    # Step 4: Apply Reverb to the 'music' stem
    # NOTE(review): stems are indexed as (samples, 2) from here on; this
    # assumes MVSep returns stereo stems for stereo uploads - confirm.
    progress((4, 8), "Applying Reverb")
    reverb_args = _PRESETS["open"]["reverb_args"]  # open preset
    reverb = np.column_stack([
        apply_reverb_wet_only(music[:, 0], fs, reverb_args),
        apply_reverb_wet_only(music[:, 1], fs, reverb_args),
    ])

    # Step 5: Vocal Extraction from music
    progress((5, 8), "Extracting Vocals")
    karaoke_resp = _separate_with_mvsep(
        api_key, music, fs,
        sep_type=49, output_format=2, addopt1=3, addopt2=1
    )
    # Result order: 0 = full vocals, 1 = lead, 2 = backing, 3 = instrumental.
    # Only the stems this mode actually mixes are downloaded.
    front_vocals = _fetch_stem(karaoke_resp, 0 if multi_singer else 1, fs)
    vocals_back = None if multi_singer else _fetch_stem(karaoke_resp, 2, fs)
    instr = _fetch_stem(karaoke_resp, 3, fs)

    # Step 6: Phantom center on vocals (lead or full), done in memory.
    progress((6, 8), "Phantom Center for Lead Vocals")
    _, fl_vocals, fr_vocals, fc_vocals = _phantom_center_from_stereo(
        front_vocals, fs
    )

    # Mix dialog into the centre channel. Only the first channel of a
    # multi-channel dialog stem is used. The two signals are padded to a
    # common length first because the stems come from different jobs and
    # need not match sample-for-sample.
    dialog_mono = dialog[:, 0] if dialog.ndim == 2 else dialog
    center_len = max(len(fc_vocals), len(dialog_mono))
    fc_vocals = _pad_to(fc_vocals, center_len) + _pad_to(dialog_mono, center_len)

    # Step 7: Mapping and stacking
    progress((7, 8), "Mapping Channels and Encoding")
    lens = [
        len(fl_vocals), len(fr_vocals), len(fc_vocals), len(bass),
        len(sfx), crowd.shape[0], instr.shape[0], len(reverb),
    ]
    if vocals_back is not None:
        lens.append(vocals_back.shape[0])
    length = max(lens)

    # FL and FR: Lead vocals + SFX + instruments
    out_left = (
        _pad_to(fl_vocals, length)
        + _pad_to(sfx[:, 0], length)
        + _pad_to(instr[:, 0], length)
    )
    out_right = (
        _pad_to(fr_vocals, length)
        + _pad_to(sfx[:, 1], length)
        + _pad_to(instr[:, 1], length)
    )
    out_center = _pad_to(fc_vocals, length)
    out_lfe = _pad_to(bass, length)

    # SL/SR: reverb output, plus backing vocals (single-singer mode) and crowd
    rear_left = _pad_to(reverb[:, 0], length)
    rear_right = _pad_to(reverb[:, 1], length)
    if vocals_back is not None:
        rear_left = rear_left + _pad_to(vocals_back[:, 0], length)
        rear_right = rear_right + _pad_to(vocals_back[:, 1], length)
    rear_left = rear_left + _pad_to(crowd[:, 0], length)
    rear_right = rear_right + _pad_to(crowd[:, 1], length)

    multich = np.column_stack(
        [out_left, out_right, out_center, out_lfe, rear_left, rear_right]
    )
    return _encode_5_1_ogg(multich, fs)


# ========== Gradio UI ==========

with gr.Blocks(title="Stereo to 5.1 Surround") as demo:
    gr.Markdown("# 🎧 Stereo to 5.1 Converter")
    gr.Markdown("Convert A Stereo File Into Surround")

    inp = gr.Audio(label="Upload stereo audio", type="filepath")
    smart_mode = gr.Checkbox(label="Enable Smart Mode", value=False)

    # Normal mode elements
    preset = gr.Dropdown(
        label="Select Preset",
        choices=["music", "speech", "open"],
        value="music"
    )
    btn = gr.Button("Convert to 5.1 OGG")
    out = gr.File(label="Download 5.1 OGG")

    # Smart mode section (hidden until the checkbox is ticked)
    with gr.Column(visible=False) as smart_section:
        api_key = gr.Textbox(label="MVSep API Key", type="password")
        multi_singer = gr.Checkbox(label="Multi Singer Mode", value=False)
        smart_btn = gr.Button("Convert")
        smart_out = gr.File(label="Output")

    # Logic for toggling sections
    def toggle_mode(enabled):
        """Show the smart-mode column and hide the normal controls, or vice versa."""
        return (
            gr.update(visible=not enabled),  # preset
            gr.update(visible=not enabled),  # btn
            gr.update(visible=not enabled),  # out
            gr.update(visible=enabled)       # smart_section
        )

    smart_mode.change(
        fn=toggle_mode,
        inputs=[smart_mode],
        outputs=[preset, btn, out, smart_section]
    )

    # Button functions
    btn.click(
        fn=create_5_1_surround,
        inputs=[inp, preset],
        outputs=[out],
        concurrency_limit=10
    )

    smart_btn.click(
        fn=smart_mode_process,
        inputs=[inp, api_key, multi_singer],
        outputs=[smart_out],
        concurrency_limit=20
    )

if __name__ == "__main__":
    demo.launch(show_error=True)