import argparse
import gc
import hashlib
import json
import os
import shlex
import subprocess
from contextlib import suppress
from urllib.parse import urlparse, parse_qs

import gradio as gr
import librosa
import numpy as np
import soundfile as sf
import sox
import yt_dlp
from pedalboard import Pedalboard, Reverb, Compressor, HighpassFilter
from pedalboard.io import AudioFile
from pydub import AudioSegment
from audio_separator.separator import Separator
from rvc import Config, load_hubert, get_vc, rvc_infer


BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
mdxnet_models_dir = os.path.join(BASE_DIR, 'mdxnet_models')
rvc_models_dir = os.path.join(BASE_DIR, 'rvc_models')
output_dir = os.path.join(BASE_DIR, 'song_output')


def get_youtube_video_id(url, ignore_playlist=True):
    """
    Extract the YouTube video ID from various URL formats.

    Examples:
        http://youtu.be/SA2iWivDJiE
        http://www.youtube.com/watch?v=_oPAwA_Udwc&feature=feedu
        http://www.youtube.com/embed/SA2iWivDJiE
        http://www.youtube.com/v/SA2iWivDJiE?version=3&hl=en_US
    """
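    # e.g. get_youtube_video_id('https://youtu.be/SA2iWivDJiE') returns 'SA2iWivDJiE'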
    parsed_url = urlparse(url)
    hostname = parsed_url.hostname or ''
    path = parsed_url.path

    if hostname.lower() == 'youtu.be':
        return path.lstrip('/')

    if hostname.lower() in {'www.youtube.com', 'youtube.com', 'music.youtube.com'}:
        if not ignore_playlist:
            with suppress(KeyError):
                return parse_qs(parsed_url.query)['list'][0]
        if parsed_url.path == '/watch':
            return parse_qs(parsed_url.query).get('v', [None])[0]
        if parsed_url.path.startswith('/watch/'):
            return parsed_url.path.split('/')[2]
        if parsed_url.path.startswith('/embed/'):
            return parsed_url.path.split('/')[2]
        if parsed_url.path.startswith('/v/'):
            return parsed_url.path.split('/')[2]

    return None


def yt_download(link):
    """
    Download the audio from a YouTube link as an mp3 file.
    """
    ydl_opts = {
        'format': 'bestaudio',
        'outtmpl': '%(title)s',
        'nocheckcertificate': True,
        'ignoreerrors': True,
        'no_warnings': True,
        'quiet': True,
        'extractaudio': True,
        'postprocessors': [{
            'key': 'FFmpegExtractAudio',
            'preferredcodec': 'mp3'
        }],
    }
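    # The FFmpegExtractAudio post-processor transcodes the download to mp3, so the final file
    # on disk is '<title>.mp3' rather than the extension-less outtmpl above; prepare_filename()
    # is therefore called with the matching '.mp3' template to recover that path.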
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        result = ydl.extract_info(link, download=True)
        download_path = ydl.prepare_filename(result, outtmpl='%(title)s.mp3')
    return download_path


def display_progress(message, percent, is_webui, progress=None):
    """
    Display progress either via the provided progress callback or by printing.
    """
    if is_webui and progress is not None:
        progress(percent, desc=message)
    else:
        print(message)


def raise_exception(error_msg, is_webui):
    """
    Raise an exception. If running in a web UI, use gr.Error.
    """
    if is_webui:
        raise gr.Error(error_msg)
    else:
        raise Exception(error_msg)


def get_rvc_model(voice_model, is_webui):
    """
    Search the specified RVC model directory for the model (.pth) and index (.index) files.
    """
    rvc_model_filename, rvc_index_filename = None, None
    model_dir = os.path.join(rvc_models_dir, voice_model)
    if not os.path.exists(model_dir):
        raise_exception(f'Model directory {model_dir} does not exist.', is_webui)
    for file in os.listdir(model_dir):
        ext = os.path.splitext(file)[1]
        if ext == '.pth':
            rvc_model_filename = file
        if ext == '.index':
            rvc_index_filename = file

    if rvc_model_filename is None:
        error_msg = f'No model file exists in {model_dir}.'
        raise_exception(error_msg, is_webui)

    model_path = os.path.join(model_dir, rvc_model_filename)
    index_path = os.path.join(model_dir, rvc_index_filename) if rvc_index_filename else ''
    return model_path, index_path


def separation_uvr(filename, output):
    """
    Run the separation steps using different pre-trained models.
    Returns a tuple of four file paths:
        - vocals_no_reverb: The vocals after initial de-echo/de-reverb (used as intermediate vocals)
        - instrumental_path: The separated instrumental audio
        - main_vocals_dereverb: The lead vocals after final de-reverb processing
        - backup_vocals: The backup vocals extracted in the final stage
    """
    separator = Separator(output_dir=output)
    base_name = os.path.splitext(os.path.basename(filename))[0]

    instrumental_path = os.path.join(output, f'{base_name}_Instrumental.wav')
    initial_vocals = os.path.join(output, f'{base_name}_Vocals.wav')
    vocals_no_reverb = os.path.join(output, f'{base_name}_Vocals (No Reverb).wav')
    vocals_reverb = os.path.join(output, f'{base_name}_Vocals (Reverb).wav')
    main_vocals_dereverb = os.path.join(output, f'{base_name}_Vocals_Main_DeReverb.wav')
    backup_vocals = os.path.join(output, f'{base_name}_Vocals_Backup.wav')

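    # Three-pass separation:
    #   1. BS-Roformer splits the full mix into an instrumental and a vocal stem.
    #   2. UVR-DeEcho-DeReverb removes echo/reverb from the vocal stem.
    #   3. A karaoke Mel-Band Roformer model splits the dry vocals into backup and lead vocals.
    # NOTE: the os.rename() calls below assume separator.separate() returns the stem filenames
    # in the order shown; verify this against the audio-separator version in use.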
    separator.load_model(model_filename='model_bs_roformer_ep_317_sdr_12.9755.ckpt')
    voc_inst = separator.separate(filename)
    os.rename(os.path.join(output, voc_inst[0]), instrumental_path)
    os.rename(os.path.join(output, voc_inst[1]), initial_vocals)

    separator.load_model(model_filename='UVR-DeEcho-DeReverb.pth')
    voc_no_reverb = separator.separate(initial_vocals)
    os.rename(os.path.join(output, voc_no_reverb[0]), vocals_no_reverb)
    os.rename(os.path.join(output, voc_no_reverb[1]), vocals_reverb)

    separator.load_model(model_filename='mel_band_roformer_karaoke_aufr33_viperx_sdr_10.1956.ckpt')
    voc_split = separator.separate(vocals_no_reverb)
    os.rename(os.path.join(output, voc_split[0]), backup_vocals)
    os.rename(os.path.join(output, voc_split[1]), main_vocals_dereverb)

    if os.path.exists(vocals_reverb):
        os.remove(vocals_reverb)

    return vocals_no_reverb, instrumental_path, main_vocals_dereverb, backup_vocals


def get_audio_paths(song_dir):
    """
    Search the given directory for expected audio files.
    Returns:
        orig_song_path, instrumentals_path, main_vocals_dereverb_path, backup_vocals_path
    """
    orig_song_path = None
    instrumentals_path = None
    main_vocals_dereverb_path = None
    backup_vocals_path = None

    for file in os.listdir(song_dir):
        if file.endswith('_Instrumental.wav'):
            instrumentals_path = os.path.join(song_dir, file)
            orig_song_path = instrumentals_path.replace('_Instrumental', '')
        elif file.endswith('_Vocals_Main_DeReverb.wav'):
            main_vocals_dereverb_path = os.path.join(song_dir, file)
        elif file.endswith('_Vocals_Backup.wav'):
            backup_vocals_path = os.path.join(song_dir, file)

    return orig_song_path, instrumentals_path, main_vocals_dereverb_path, backup_vocals_path


def convert_to_stereo(audio_path):
    """
    Convert the given audio file to stereo (2 channels) if it is mono.
    """
    wave, sr = librosa.load(audio_path, mono=False, sr=44100)
    if wave.ndim == 1:
        stereo_path = f'{os.path.splitext(audio_path)[0]}_stereo.wav'
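        # Requires the ffmpeg binary on PATH; '-ac 2' copies the mono channel to both output channels.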
        command = shlex.split(f'ffmpeg -y -loglevel error -i "{audio_path}" -ac 2 -f wav "{stereo_path}"')
        subprocess.run(command, check=True)
        return stereo_path
    return audio_path


def pitch_shift(audio_path, pitch_change):
    """
    Shift the pitch of the audio by the specified amount.
    """
    output_path = f'{os.path.splitext(audio_path)[0]}_p{pitch_change}.wav'
    if not os.path.exists(output_path):
        y, sr = sf.read(audio_path)
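        # sox's pitch effect is specified in semitones; fractional values are accepted.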
        tfm = sox.Transformer()
        tfm.pitch(pitch_change)
        y_shifted = tfm.build_array(input_array=y, sample_rate_in=sr)
        sf.write(output_path, y_shifted, sr)
    return output_path


def get_hash(filepath):
    """
    Calculate a short BLAKE2b hash for the given file.
    """
    with open(filepath, 'rb') as f:
        file_hash = hashlib.blake2b()
        while chunk := f.read(8192):
            file_hash.update(chunk)
    return file_hash.hexdigest()[:11]


def preprocess_song(song_input, song_id, is_webui, input_type, progress):
    """
    Preprocess the input song:
        - Download if YouTube URL.
        - Convert to stereo.
        - Separate vocals and instrumentals.
    Returns a tuple with six values matching the expected unpacking in the pipeline.
    """
    if input_type == 'yt':
        display_progress('[~] Downloading song...', 0, is_webui, progress)
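        # Keep only the part of the URL before the first '&' so extra query parameters
        # (e.g. playlist or timestamp info) are not passed to yt-dlp.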
        song_link = song_input.split('&')[0]
        orig_song_path = yt_download(song_link)
    elif input_type == 'local':
        orig_song_path = song_input
    else:
        orig_song_path = None

    song_output_dir = os.path.join(output_dir, song_id)
    if not os.path.exists(song_output_dir):
        os.makedirs(song_output_dir)

    orig_song_path = convert_to_stereo(orig_song_path)

    display_progress('[~] Separating Vocals from Instrumental...', 0.1, is_webui, progress)
    vocals_no_reverb, instrumental_path, main_vocals_dereverb, backup_vocals = separation_uvr(orig_song_path, song_output_dir)
    return orig_song_path, vocals_no_reverb, instrumental_path, main_vocals_dereverb, backup_vocals, main_vocals_dereverb


def voice_change(voice_model, vocals_path, output_path, pitch_change, f0_method,
                 index_rate, filter_radius, rms_mix_rate, protect, crepe_hop_length, is_webui):
    """
    Convert the input vocals using the specified RVC model.
    """
    rvc_model_path, rvc_index_path = get_rvc_model(voice_model, is_webui)
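    # Assumes a CUDA-capable GPU is available; swap the device string (e.g. for 'cpu')
    # only if the bundled rvc backend supports it.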
    device = 'cuda:0'
    config = Config(device, True)
    hubert_model = load_hubert(embedder_model="contentvec", embedder_model_custom=None)
    cpt, version, net_g, tgt_sr, vc = get_vc(device, config.is_half, config, rvc_model_path)

    rvc_infer(
        rvc_index_path, index_rate, vocals_path, output_path, pitch_change, f0_method,
        cpt, version, net_g, filter_radius, tgt_sr, rms_mix_rate, protect,
        crepe_hop_length, vc, hubert_model
    )
    del hubert_model, cpt
    gc.collect()


def add_audio_effects(audio_path, reverb_rm_size, reverb_wet, reverb_dry, reverb_damping):
    """
    Apply a chain of audio effects (highpass, compression, reverb) to the input audio.
    """
    output_path = f'{os.path.splitext(audio_path)[0]}_mixed.wav'
    board = Pedalboard([
        HighpassFilter(),
        Compressor(ratio=4, threshold_db=-15),
        Reverb(room_size=reverb_rm_size, dry_level=reverb_dry, wet_level=reverb_wet, damping=reverb_damping)
    ])

    with AudioFile(audio_path) as f:
        with AudioFile(output_path, 'w', f.samplerate, f.num_channels) as o:
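            # Stream the file in one-second chunks; reset=False keeps the effect state
            # (e.g. the reverb tail) continuous across chunk boundaries.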
            while f.tell() < f.frames:
                chunk = f.read(int(f.samplerate))
                effected = board(chunk, f.samplerate, reset=False)
                o.write(effected)
    return output_path


def combine_audio(audio_paths, output_path, main_gain, backup_gain, inst_gain, output_format):
    """
    Combine main vocals, backup vocals, and instrumental audio into a final mix.
    """
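    # pydub gains are in dB; each stem is first trimmed by a fixed amount (-4/-6/-7 dB),
    # presumably to leave headroom, before the user-supplied gain adjustments are applied.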
    main_vocal_audio = AudioSegment.from_wav(audio_paths[0]) - 4 + main_gain
    backup_vocal_audio = AudioSegment.from_wav(audio_paths[1]) - 6 + backup_gain
    instrumental_audio = AudioSegment.from_wav(audio_paths[2]) - 7 + inst_gain
    final_audio = main_vocal_audio.overlay(backup_vocal_audio).overlay(instrumental_audio)
    final_audio.export(output_path, format=output_format)


def song_cover_pipeline(song_input, voice_model, pitch_change, keep_files,
                        is_webui=0, main_gain=0, backup_gain=0, inst_gain=0, index_rate=0.5, filter_radius=3,
                        rms_mix_rate=0.25, f0_method='rmvpe', crepe_hop_length=128, protect=0.33, pitch_change_all=0,
                        reverb_rm_size=0.15, reverb_wet=0.2, reverb_dry=0.8, reverb_damping=0.7, output_format='mp3',
                        progress=gr.Progress()):
    """
    Main pipeline that orchestrates the AI cover song generation.
    """
    try:
        if not song_input or not voice_model:
            raise_exception('Ensure that both the song input and voice model fields are filled.', is_webui)

        display_progress('[~] Starting AI Cover Generation Pipeline...', 0, is_webui, progress)

        if urlparse(song_input).scheme == 'https':
            input_type = 'yt'
            song_id = get_youtube_video_id(song_input)
            if song_id is None:
                raise_exception('Invalid YouTube url.', is_webui)
        else:
            input_type = 'local'
            song_input = song_input.strip('\"')
            if os.path.exists(song_input):
                song_id = get_hash(song_input)
            else:
                raise_exception(f'{song_input} does not exist.', is_webui)

        song_dir = os.path.join(output_dir, song_id)

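        # Reuse previously separated stems when the song directory already contains all of
        # them (and --keep-files was not requested); otherwise run the separation again.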
        if not os.path.exists(song_dir):
            os.makedirs(song_dir)
            (orig_song_path, vocals_path, instrumentals_path,
             main_vocals_path, backup_vocals_path, main_vocals_dereverb_path) = preprocess_song(
                song_input, song_id, is_webui, input_type, progress
            )
        else:
            vocals_path, main_vocals_path = None, None
            paths = get_audio_paths(song_dir)
            if any(path is None for path in paths) or keep_files:
                (orig_song_path, vocals_path, instrumentals_path,
                 main_vocals_path, backup_vocals_path, main_vocals_dereverb_path) = preprocess_song(
                    song_input, song_id, is_webui, input_type, progress
                )
            else:
                orig_song_path, instrumentals_path, main_vocals_dereverb_path, backup_vocals_path = paths
                main_vocals_path = main_vocals_dereverb_path

        pitch_change += pitch_change_all

        base_song_name = os.path.splitext(os.path.basename(orig_song_path))[0]
        algo_suffix = f"_{crepe_hop_length}" if f0_method == "mangio-crepe" else ""
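        # Conversion parameters are encoded in the output filenames so that a repeated run
        # with identical settings can reuse the cached conversion instead of re-running RVC.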
        ai_vocals_path = os.path.join(
            song_dir,
            f'{base_song_name}_lead_{voice_model}_p{pitch_change}_i{index_rate}_fr{filter_radius}_'
            f'rms{rms_mix_rate}_pro{protect}_{f0_method}{algo_suffix}.wav'
        )
        ai_backing_path = os.path.join(
            song_dir,
            f'{base_song_name}_backing_{voice_model}_p{pitch_change}_i{index_rate}_fr{filter_radius}_'
            f'rms{rms_mix_rate}_pro{protect}_{f0_method}{algo_suffix}.wav'
        )
        ai_cover_path = os.path.join(song_dir, f'{base_song_name} ({voice_model} Ver).{output_format}')
        ai_cover_backing_path = os.path.join(song_dir, f'{base_song_name} ({voice_model} Ver With Backing).{output_format}')

        if not os.path.exists(ai_vocals_path):
            display_progress('[~] Converting lead voice using RVC...', 0.5, is_webui, progress)
            voice_change(voice_model, main_vocals_dereverb_path, ai_vocals_path, pitch_change,
                         f0_method, index_rate, filter_radius, rms_mix_rate, protect, crepe_hop_length, is_webui)

            display_progress('[~] Converting backing voice using RVC...', 0.65, is_webui, progress)
            voice_change(voice_model, backup_vocals_path, ai_backing_path, pitch_change,
                         f0_method, index_rate, filter_radius, rms_mix_rate, protect, crepe_hop_length, is_webui)

        display_progress('[~] Applying audio effects to Vocals...', 0.8, is_webui, progress)
        ai_vocals_mixed_path = add_audio_effects(ai_vocals_path, reverb_rm_size, reverb_wet, reverb_dry, reverb_damping)
        ai_backing_mixed_path = add_audio_effects(ai_backing_path, reverb_rm_size, reverb_wet, reverb_dry, reverb_damping)

        if pitch_change_all != 0:
            display_progress('[~] Applying overall pitch change', 0.85, is_webui, progress)
            instrumentals_path = pitch_shift(instrumentals_path, pitch_change_all)
            backup_vocals_path = pitch_shift(backup_vocals_path, pitch_change_all)

        display_progress('[~] Combining AI Vocals and Instrumentals...', 0.9, is_webui, progress)
        combine_audio([ai_vocals_mixed_path, backup_vocals_path, instrumentals_path],
                      ai_cover_path, main_gain, backup_gain, inst_gain, output_format)
        combine_audio([ai_vocals_mixed_path, ai_backing_mixed_path, instrumentals_path],
                      ai_cover_backing_path, main_gain, backup_gain, inst_gain, output_format)

        if not keep_files:
            display_progress('[~] Removing intermediate audio files...', 0.95, is_webui, progress)
            intermediate_files = [vocals_path, main_vocals_path, ai_vocals_mixed_path, ai_backing_mixed_path]
            if pitch_change_all != 0:
                intermediate_files += [instrumentals_path, backup_vocals_path]
            for file in intermediate_files:
                if file and os.path.exists(file):
                    os.remove(file)

        return ai_cover_path, ai_cover_backing_path

    except Exception as e:
        raise_exception(str(e), is_webui)


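# Example invocation (sketch; 'MyVoice' stands in for any folder under rvc_models containing a .pth file):
#   python <this script> -i "https://www.youtube.com/watch?v=SA2iWivDJiE" -dir MyVoice -p 0 --keep-files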
if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        description='AICoverGen: Mod.',
        add_help=True
    )
    parser.add_argument('-i', '--song-input', type=str, required=True,
                        help='Link to a YouTube video or the filepath to a local mp3/wav file to create an AI cover of')
    parser.add_argument('-dir', '--rvc-dirname', type=str, required=True,
                        help='Name of the folder in the rvc_models directory containing the RVC model file and optional index file to use')
    parser.add_argument('-p', '--pitch-change', type=int, required=True,
                        help='Change the pitch of AI Vocals only. Generally, use 1 for male to female and -1 for vice-versa. (Octaves)')
    parser.add_argument('-k', '--keep-files', action=argparse.BooleanOptionalAction,
                        help='Whether to keep all intermediate audio files generated in the song_output/id directory, e.g. Isolated Vocals/Instrumentals')
    parser.add_argument('-ir', '--index-rate', type=float, default=0.5,
                        help='A decimal number e.g. 0.5, used to reduce/resolve the timbre leakage problem. If set to 1, more biased towards the timbre quality of the training dataset')
    parser.add_argument('-fr', '--filter-radius', type=int, default=3,
                        help='A number between 0 and 7. If >=3: apply median filtering to the harvested pitch results. The value represents the filter radius and can reduce breathiness.')
    parser.add_argument('-rms', '--rms-mix-rate', type=float, default=0.25,
                        help="A decimal number e.g. 0.25. Control how much to use the original vocal's loudness (0) or a fixed loudness (1).")
    parser.add_argument('-palgo', '--pitch-detection-algo', type=str, default='rmvpe',
                        help='Best option is rmvpe (clarity in vocals), then mangio-crepe (smoother vocals).')
    parser.add_argument('-hop', '--crepe-hop-length', type=int, default=128,
                        help='If pitch detection algo is mangio-crepe, controls how often it checks for pitch changes in milliseconds. Recommended: 128.')
    parser.add_argument('-pro', '--protect', type=float, default=0.33,
                        help='A decimal number e.g. 0.33. Protect voiceless consonants and breath sounds to prevent artifacts such as tearing in electronic music.')
    parser.add_argument('-mv', '--main-vol', type=int, default=0,
                        help='Volume change for AI main vocals in decibels. Use -3 to decrease by 3 dB and 3 to increase by 3 dB')
    parser.add_argument('-bv', '--backup-vol', type=int, default=0,
                        help='Volume change for backup vocals in decibels')
    parser.add_argument('-iv', '--inst-vol', type=int, default=0,
                        help='Volume change for instrumentals in decibels')
    parser.add_argument('-pall', '--pitch-change-all', type=int, default=0,
                        help='Change the pitch/key of vocals and instrumentals. Changing this slightly reduces sound quality')
    parser.add_argument('-rsize', '--reverb-size', type=float, default=0.15,
                        help='Reverb room size between 0 and 1')
    parser.add_argument('-rwet', '--reverb-wetness', type=float, default=0.2,
                        help='Reverb wet level between 0 and 1')
    parser.add_argument('-rdry', '--reverb-dryness', type=float, default=0.8,
                        help='Reverb dry level between 0 and 1')
    parser.add_argument('-rdamp', '--reverb-damping', type=float, default=0.7,
                        help='Reverb damping between 0 and 1')
    parser.add_argument('-oformat', '--output-format', type=str, default='mp3',
                        help='Output format of audio file. mp3 for smaller file size, wav for best quality')
    args = parser.parse_args()

    rvc_dir = os.path.join(rvc_models_dir, args.rvc_dirname)
    if not os.path.exists(rvc_dir):
        raise Exception(f'The folder {rvc_dir} does not exist.')

    cover_path, cover_with_backing = song_cover_pipeline(
        args.song_input, args.rvc_dirname, args.pitch_change, args.keep_files,
        main_gain=args.main_vol, backup_gain=args.backup_vol, inst_gain=args.inst_vol,
        index_rate=args.index_rate, filter_radius=args.filter_radius,
        rms_mix_rate=args.rms_mix_rate, f0_method=args.pitch_detection_algo,
        crepe_hop_length=args.crepe_hop_length, protect=args.protect,
        pitch_change_all=args.pitch_change_all,
        reverb_rm_size=args.reverb_size, reverb_wet=args.reverb_wetness,
        reverb_dry=args.reverb_dryness, reverb_damping=args.reverb_damping,
        output_format=args.output_format
    )
    print(f'[+] Cover generated at {cover_path}')