import argparse
import gc
import hashlib
import json
import os
import shlex
import subprocess
from contextlib import suppress
from urllib.parse import urlparse, parse_qs

import gradio as gr
import librosa
import numpy as np
import soundfile as sf
import sox
import yt_dlp
from pedalboard import Pedalboard, Reverb, Compressor, HighpassFilter
from pedalboard.io import AudioFile
from pydub import AudioSegment
from audio_separator.separator import Separator

from rvc import Config, load_hubert, get_vc, rvc_infer

# Base directories
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
mdxnet_models_dir = os.path.join(BASE_DIR, 'mdxnet_models')
rvc_models_dir = os.path.join(BASE_DIR, 'rvc_models')
output_dir = os.path.join(BASE_DIR, 'song_output')


def get_youtube_video_id(url, ignore_playlist=True):
    """
    Extract the YouTube video ID from various URL formats.

    Examples:
        http://youtu.be/SA2iWivDJiE
        http://www.youtube.com/watch?v=_oPAwA_Udwc&feature=feedu
        http://www.youtube.com/embed/SA2iWivDJiE
        http://www.youtube.com/v/SA2iWivDJiE?version=3&hl=en_US
    """
    parsed_url = urlparse(url)
    hostname = (parsed_url.hostname or '').lower()

    if hostname == 'youtu.be':
        return parsed_url.path.lstrip('/')

    if hostname in {'www.youtube.com', 'youtube.com', 'music.youtube.com'}:
        if not ignore_playlist:
            # Return the playlist ID if one is present.
            with suppress(KeyError):
                return parse_qs(parsed_url.query)['list'][0]
        if parsed_url.path == '/watch':
            return parse_qs(parsed_url.query).get('v', [None])[0]
        if parsed_url.path.startswith('/watch/'):
            return parsed_url.path.split('/')[2]
        if parsed_url.path.startswith('/embed/'):
            return parsed_url.path.split('/')[2]
        if parsed_url.path.startswith('/v/'):
            return parsed_url.path.split('/')[2]
    return None


def yt_download(link):
    """
    Download the audio from a YouTube link as an mp3 file.
    """
    ydl_opts = {
        'format': 'bestaudio',
        'outtmpl': '%(title)s',
        'nocheckcertificate': True,
        'ignoreerrors': True,
        'no_warnings': True,
        'quiet': True,
        'extractaudio': True,
        'postprocessors': [{'key': 'FFmpegExtractAudio', 'preferredcodec': 'mp3'}],
    }
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        result = ydl.extract_info(link, download=True)
        download_path = ydl.prepare_filename(result, outtmpl='%(title)s.mp3')
    return download_path


def display_progress(message, percent, is_webui, progress=None):
    """
    Display progress either via the provided progress callback or by printing.
    """
    if is_webui and progress is not None:
        progress(percent, desc=message)
    else:
        print(message)


def raise_exception(error_msg, is_webui):
    """
    Raise an exception. If running in a web UI, use gr.Error.
    """
    if is_webui:
        raise gr.Error(error_msg)
    else:
        raise Exception(error_msg)


def get_rvc_model(voice_model, is_webui):
    """
    Search the specified RVC model directory for the model (.pth) and index (.index) files.
    """
    rvc_model_filename, rvc_index_filename = None, None
    model_dir = os.path.join(rvc_models_dir, voice_model)
    if not os.path.exists(model_dir):
        raise_exception(f'Model directory {model_dir} does not exist.', is_webui)

    for file in os.listdir(model_dir):
        ext = os.path.splitext(file)[1]
        if ext == '.pth':
            rvc_model_filename = file
        if ext == '.index':
            rvc_index_filename = file

    if rvc_model_filename is None:
        error_msg = f'No model file exists in {model_dir}.'
        raise_exception(error_msg, is_webui)

    model_path = os.path.join(model_dir, rvc_model_filename)
    index_path = os.path.join(model_dir, rvc_index_filename) if rvc_index_filename else ''
    return model_path, index_path


def separation_uvr(filename, output):
    """
    Run the separation steps using different pre-trained models.

    Returns a tuple of four file paths:
        - vocals_no_reverb: the vocals after initial de-echo/de-reverb (used as intermediate vocals)
        - instrumental_path: the separated instrumental audio
        - main_vocals_dereverb: the lead vocals after final de-reverb processing
        - backup_vocals: the backup vocals extracted in the final stage
    """
    separator = Separator(output_dir=output)
    base_name = os.path.splitext(os.path.basename(filename))[0]

    instrumental_path = os.path.join(output, f'{base_name}_Instrumental.wav')
    initial_vocals = os.path.join(output, f'{base_name}_Vocals.wav')
    vocals_no_reverb = os.path.join(output, f'{base_name}_Vocals (No Reverb).wav')
    vocals_reverb = os.path.join(output, f'{base_name}_Vocals (Reverb).wav')
    main_vocals_dereverb = os.path.join(output, f'{base_name}_Vocals_Main_DeReverb.wav')
    backup_vocals = os.path.join(output, f'{base_name}_Vocals_Backup.wav')

    # Stage 1: split the song into vocals and instrumental.
    separator.load_model(model_filename='model_bs_roformer_ep_317_sdr_12.9755.ckpt')
    voc_inst = separator.separate(filename)
    os.rename(os.path.join(output, voc_inst[0]), instrumental_path)
    os.rename(os.path.join(output, voc_inst[1]), initial_vocals)

    # Stage 2: remove echo/reverb from the vocals.
    separator.load_model(model_filename='UVR-DeEcho-DeReverb.pth')
    voc_no_reverb = separator.separate(initial_vocals)
    os.rename(os.path.join(output, voc_no_reverb[0]), vocals_no_reverb)
    os.rename(os.path.join(output, voc_no_reverb[1]), vocals_reverb)

    # Stage 3: split the cleaned vocals into backup and lead vocals.
    separator.load_model(model_filename='mel_band_roformer_karaoke_aufr33_viperx_sdr_10.1956.ckpt')
    voc_split = separator.separate(vocals_no_reverb)
    os.rename(os.path.join(output, voc_split[0]), backup_vocals)
    os.rename(os.path.join(output, voc_split[1]), main_vocals_dereverb)

    if os.path.exists(vocals_reverb):
        os.remove(vocals_reverb)

    return vocals_no_reverb, instrumental_path, main_vocals_dereverb, backup_vocals


def get_audio_paths(song_dir):
    """
    Search the given directory for expected audio files.

    Returns:
        orig_song_path, instrumentals_path, main_vocals_dereverb_path, backup_vocals_path
    """
    orig_song_path = None
    instrumentals_path = None
    main_vocals_dereverb_path = None
    backup_vocals_path = None

    for file in os.listdir(song_dir):
        if file.endswith('_Instrumental.wav'):
            instrumentals_path = os.path.join(song_dir, file)
            orig_song_path = instrumentals_path.replace('_Instrumental', '')
        elif file.endswith('_Vocals_Main_DeReverb.wav'):
            main_vocals_dereverb_path = os.path.join(song_dir, file)
        elif file.endswith('_Vocals_Backup.wav'):
            backup_vocals_path = os.path.join(song_dir, file)

    return orig_song_path, instrumentals_path, main_vocals_dereverb_path, backup_vocals_path


def convert_to_stereo(audio_path):
    """
    Convert the given audio file to stereo (2 channels) if it is mono.
    """
    wave, sr = librosa.load(audio_path, mono=False, sr=44100)
    if wave.ndim == 1:
        stereo_path = f'{os.path.splitext(audio_path)[0]}_stereo.wav'
        command = shlex.split(f'ffmpeg -y -loglevel error -i "{audio_path}" -ac 2 -f wav "{stereo_path}"')
        subprocess.run(command, check=True)
        return stereo_path
    return audio_path


def pitch_shift(audio_path, pitch_change):
    """
    Shift the pitch of the audio by the specified amount.
""" output_path = f'{os.path.splitext(audio_path)[0]}_p{pitch_change}.wav' if not os.path.exists(output_path): y, sr = sf.read(audio_path) tfm = sox.Transformer() tfm.pitch(pitch_change) y_shifted = tfm.build_array(input_array=y, sample_rate_in=sr) sf.write(output_path, y_shifted, sr) return output_path def get_hash(filepath): """ Calculate a short BLAKE2b hash for the given file. """ with open(filepath, 'rb') as f: file_hash = hashlib.blake2b() while chunk := f.read(8192): file_hash.update(chunk) return file_hash.hexdigest()[:11] def preprocess_song(song_input, song_id, is_webui, input_type, progress): """ Preprocess the input song: - Download if YouTube URL. - Convert to stereo. - Separate vocals and instrumentals. Returns a tuple with six values matching the expected unpacking in the pipeline. """ if input_type == 'yt': display_progress('[~] Downloading song...', 0, is_webui, progress) song_link = song_input.split('&')[0] orig_song_path = yt_download(song_link) elif input_type == 'local': orig_song_path = song_input else: orig_song_path = None song_output_dir = os.path.join(output_dir, song_id) if not os.path.exists(song_output_dir): os.makedirs(song_output_dir) orig_song_path = convert_to_stereo(orig_song_path) display_progress('[~] Separating Vocals from Instrumental...', 0.1, is_webui, progress) vocals_no_reverb, instrumental_path, main_vocals_dereverb, backup_vocals = separation_uvr(orig_song_path, song_output_dir) return orig_song_path, vocals_no_reverb, instrumental_path, main_vocals_dereverb, backup_vocals, main_vocals_dereverb def voice_change(voice_model, vocals_path, output_path, pitch_change, f0_method, index_rate, filter_radius, rms_mix_rate, protect, crepe_hop_length, is_webui): """ Convert the input vocals using the specified RVC model. """ rvc_model_path, rvc_index_path = get_rvc_model(voice_model, is_webui) device = 'cuda:0' config = Config(device, True) hubert_model = load_hubert(embedder_model="contentvec", embedder_model_custom=None) cpt, version, net_g, tgt_sr, vc = get_vc(device, config.is_half, config, rvc_model_path) rvc_infer( rvc_index_path, index_rate, vocals_path, output_path, pitch_change, f0_method, cpt, version, net_g, filter_radius, tgt_sr, rms_mix_rate, protect, crepe_hop_length, vc, hubert_model ) del hubert_model, cpt gc.collect() def add_audio_effects(audio_path, reverb_rm_size, reverb_wet, reverb_dry, reverb_damping): """ Apply a chain of audio effects (highpass, compression, reverb) to the input audio. """ output_path = f'{os.path.splitext(audio_path)[0]}_mixed.wav' board = Pedalboard([ HighpassFilter(), Compressor(ratio=4, threshold_db=-15), Reverb(room_size=reverb_rm_size, dry_level=reverb_dry, wet_level=reverb_wet, damping=reverb_damping) ]) with AudioFile(audio_path) as f: with AudioFile(output_path, 'w', f.samplerate, f.num_channels) as o: while f.tell() < f.frames: chunk = f.read(int(f.samplerate)) effected = board(chunk, f.samplerate, reset=False) o.write(effected) return output_path def combine_audio(audio_paths, output_path, main_gain, backup_gain, inst_gain, output_format): """ Combine main vocals, backup vocals, and instrumental audio into a final mix. 
""" main_vocal_audio = AudioSegment.from_wav(audio_paths[0]) - 4 + main_gain backup_vocal_audio = AudioSegment.from_wav(audio_paths[1]) - 6 + backup_gain instrumental_audio = AudioSegment.from_wav(audio_paths[2]) - 7 + inst_gain final_audio = main_vocal_audio.overlay(backup_vocal_audio).overlay(instrumental_audio) final_audio.export(output_path, format=output_format) def song_cover_pipeline(song_input, voice_model, pitch_change, keep_files, is_webui=0, main_gain=0, backup_gain=0, inst_gain=0, index_rate=0.5, filter_radius=3, rms_mix_rate=0.25, f0_method='rmvpe', crepe_hop_length=128, protect=0.33, pitch_change_all=0, reverb_rm_size=0.15, reverb_wet=0.2, reverb_dry=0.8, reverb_damping=0.7, output_format='mp3', progress=gr.Progress()): """ Main pipeline that orchestrates the AI cover song generation. """ try: if not song_input or not voice_model: raise_exception('Ensure that the song input field and voice model field is filled.', is_webui) display_progress('[~] Starting AI Cover Generation Pipeline...', 0, is_webui, progress) if urlparse(song_input).scheme == 'https': input_type = 'yt' song_id = get_youtube_video_id(song_input) if song_id is None: raise_exception('Invalid YouTube url.', is_webui) else: input_type = 'local' song_input = song_input.strip('\"') if os.path.exists(song_input): song_id = get_hash(song_input) else: raise_exception(f'{song_input} does not exist.', is_webui) song_dir = os.path.join(output_dir, song_id) if not os.path.exists(song_dir): os.makedirs(song_dir) (orig_song_path, vocals_path, instrumentals_path, main_vocals_path, backup_vocals_path, main_vocals_dereverb_path) = preprocess_song( song_input, song_id, is_webui, input_type, progress ) else: vocals_path, main_vocals_path = None, None paths = get_audio_paths(song_dir) if any(path is None for path in paths) or keep_files: (orig_song_path, vocals_path, instrumentals_path, main_vocals_path, backup_vocals_path, main_vocals_dereverb_path) = preprocess_song( song_input, song_id, is_webui, input_type, progress ) else: orig_song_path, instrumentals_path, main_vocals_dereverb_path, backup_vocals_path = paths main_vocals_path = main_vocals_dereverb_path pitch_change += pitch_change_all base_song_name = os.path.splitext(os.path.basename(orig_song_path))[0] algo_suffix = f"_{crepe_hop_length}" if f0_method == "mangio-crepe" else "" ai_vocals_path = os.path.join( song_dir, f'{base_song_name}_lead_{voice_model}_p{pitch_change}_i{index_rate}_fr{filter_radius}_' f'rms{rms_mix_rate}_pro{protect}_{f0_method}{algo_suffix}.wav' ) ai_backing_path = os.path.join( song_dir, f'{base_song_name}_backing_{voice_model}_p{pitch_change}_i{index_rate}_fr{filter_radius}_' f'rms{rms_mix_rate}_pro{protect}_{f0_method}{algo_suffix}.wav' ) ai_cover_path = os.path.join(song_dir, f'{base_song_name} ({voice_model} Ver).{output_format}') ai_cover_backing_path = os.path.join(song_dir, f'{base_song_name} ({voice_model} Ver With Backing).{output_format}') if not os.path.exists(ai_vocals_path): display_progress('[~] Converting lead voice using RVC...', 0.5, is_webui, progress) voice_change(voice_model, main_vocals_dereverb_path, ai_vocals_path, pitch_change, f0_method, index_rate, filter_radius, rms_mix_rate, protect, crepe_hop_length, is_webui) display_progress('[~] Converting backing voice using RVC...', 0.65, is_webui, progress) voice_change(voice_model, backup_vocals_path, ai_backing_path, pitch_change, f0_method, index_rate, filter_radius, rms_mix_rate, protect, crepe_hop_length, is_webui) display_progress('[~] Applying audio effects to Vocals...', 
                         0.8, is_webui, progress)
        ai_vocals_mixed_path = add_audio_effects(ai_vocals_path, reverb_rm_size, reverb_wet, reverb_dry, reverb_damping)
        ai_backing_mixed_path = add_audio_effects(ai_backing_path, reverb_rm_size, reverb_wet, reverb_dry, reverb_damping)

        if pitch_change_all != 0:
            display_progress('[~] Applying overall pitch change', 0.85, is_webui, progress)
            instrumentals_path = pitch_shift(instrumentals_path, pitch_change_all)
            backup_vocals_path = pitch_shift(backup_vocals_path, pitch_change_all)

        display_progress('[~] Combining AI Vocals and Instrumentals...', 0.9, is_webui, progress)
        combine_audio([ai_vocals_mixed_path, backup_vocals_path, instrumentals_path], ai_cover_path,
                      main_gain, backup_gain, inst_gain, output_format)
        combine_audio([ai_vocals_mixed_path, ai_backing_mixed_path, instrumentals_path], ai_cover_backing_path,
                      main_gain, backup_gain, inst_gain, output_format)

        if not keep_files:
            display_progress('[~] Removing intermediate audio files...', 0.95, is_webui, progress)
            intermediate_files = [vocals_path, main_vocals_path, ai_vocals_mixed_path, ai_backing_mixed_path]
            if pitch_change_all != 0:
                intermediate_files += [instrumentals_path, backup_vocals_path]
            for file in intermediate_files:
                if file and os.path.exists(file):
                    os.remove(file)

        return ai_cover_path, ai_cover_backing_path

    except Exception as e:
        raise_exception(str(e), is_webui)


if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        description='AICoverGen: Mod.',
        add_help=True
    )
    parser.add_argument('-i', '--song-input', type=str, required=True,
                        help='Link to a YouTube video or the filepath to a local mp3/wav file to create an AI cover of')
    parser.add_argument('-dir', '--rvc-dirname', type=str, required=True,
                        help='Name of the folder in the rvc_models directory containing the RVC model file and optional index file to use')
    parser.add_argument('-p', '--pitch-change', type=int, required=True,
                        help='Change the pitch of AI vocals only. Generally, use 1 for male-to-female conversions and -1 for vice-versa. (Octaves)')
    parser.add_argument('-k', '--keep-files', action=argparse.BooleanOptionalAction,
                        help='Whether to keep all intermediate audio files generated in the song_output/id directory, e.g. isolated vocals/instrumentals')
    parser.add_argument('-ir', '--index-rate', type=float, default=0.5,
                        help='A decimal number e.g. 0.5, used to reduce/resolve the timbre leakage problem. If set to 1, more biased towards the timbre quality of the training dataset')
    parser.add_argument('-fr', '--filter-radius', type=int, default=3,
                        help='A number between 0 and 7. If >=3: apply median filtering to the harvested pitch results. The value represents the filter radius and can reduce breathiness.')
    parser.add_argument('-rms', '--rms-mix-rate', type=float, default=0.25,
                        help="A decimal number e.g. 0.25. Controls how much to use the original vocal's loudness (0) versus a fixed loudness (1).")
    parser.add_argument('-palgo', '--pitch-detection-algo', type=str, default='rmvpe',
                        help='Best option is rmvpe (clarity in vocals), then mangio-crepe (smoother vocals).')
    parser.add_argument('-hop', '--crepe-hop-length', type=int, default=128,
                        help='If the pitch detection algo is mangio-crepe, controls how often it checks for pitch changes in milliseconds. Recommended: 128.')
    parser.add_argument('-pro', '--protect', type=float, default=0.33,
                        help='A decimal number e.g. 0.33. '
                             'Protect voiceless consonants and breath sounds to prevent artifacts such as tearing in electronic music.')
    parser.add_argument('-mv', '--main-vol', type=int, default=0,
                        help='Volume change for AI main vocals in decibels. Use -3 to decrease by 3 dB and 3 to increase by 3 dB')
    parser.add_argument('-bv', '--backup-vol', type=int, default=0,
                        help='Volume change for backup vocals in decibels')
    parser.add_argument('-iv', '--inst-vol', type=int, default=0,
                        help='Volume change for instrumentals in decibels')
    parser.add_argument('-pall', '--pitch-change-all', type=int, default=0,
                        help='Change the pitch/key of vocals and instrumentals. Changing this slightly reduces sound quality')
    parser.add_argument('-rsize', '--reverb-size', type=float, default=0.15,
                        help='Reverb room size between 0 and 1')
    parser.add_argument('-rwet', '--reverb-wetness', type=float, default=0.2,
                        help='Reverb wet level between 0 and 1')
    parser.add_argument('-rdry', '--reverb-dryness', type=float, default=0.8,
                        help='Reverb dry level between 0 and 1')
    parser.add_argument('-rdamp', '--reverb-damping', type=float, default=0.7,
                        help='Reverb damping between 0 and 1')
    parser.add_argument('-oformat', '--output-format', type=str, default='mp3',
                        help='Output format of audio file. mp3 for smaller file size, wav for best quality')
    args = parser.parse_args()

    rvc_dir = os.path.join(rvc_models_dir, args.rvc_dirname)
    if not os.path.exists(rvc_dir):
        raise Exception(f'The folder {rvc_dir} does not exist.')

    cover_path, cover_with_backing = song_cover_pipeline(
        args.song_input, args.rvc_dirname, args.pitch_change, args.keep_files,
        main_gain=args.main_vol, backup_gain=args.backup_vol, inst_gain=args.inst_vol,
        index_rate=args.index_rate, filter_radius=args.filter_radius, rms_mix_rate=args.rms_mix_rate,
        f0_method=args.pitch_detection_algo, crepe_hop_length=args.crepe_hop_length, protect=args.protect,
        pitch_change_all=args.pitch_change_all, reverb_rm_size=args.reverb_size, reverb_wet=args.reverb_wetness,
        reverb_dry=args.reverb_dryness, reverb_damping=args.reverb_damping, output_format=args.output_format
    )
    print(f'[+] Cover generated at {cover_path}')
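
# Example invocation (illustrative sketch only): the script path and the voice model
# folder name below are placeholders, not files shipped with this project. The YouTube
# URL is taken from the example list in get_youtube_video_id().
#
#   python path/to/this_script.py -i "https://www.youtube.com/watch?v=SA2iWivDJiE" \
#       -dir MyVoiceModel -p 0 -k -ir 0.5 -fr 3 -rms 0.25 -palgo rmvpe -hop 128 \
#       -pro 0.33 -oformat mp3
#
# This assumes rvc_models/MyVoiceModel exists and contains a .pth model file (and
# optionally a .index file), as required by get_rvc_model().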