Wget / Test.py
NeoPy's picture
Update Test.py
fa2fd5b verified
import argparse
import gc
import hashlib
import json
import os
import shlex
import subprocess
from contextlib import suppress
from urllib.parse import urlparse, parse_qs
import gradio as gr
import librosa
import numpy as np
import soundfile as sf
import sox
import yt_dlp
from pedalboard import Pedalboard, Reverb, Compressor, HighpassFilter
from pedalboard.io import AudioFile
from pydub import AudioSegment
from audio_separator.separator import Separator
from rvc import Config, load_hubert, get_vc, rvc_infer
# Base directories
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
mdxnet_models_dir = os.path.join(BASE_DIR, 'mdxnet_models')
rvc_models_dir = os.path.join(BASE_DIR, 'rvc_models')
output_dir = os.path.join(BASE_DIR, 'song_output')
def get_youtube_video_id(url, ignore_playlist=True):
"""
Extract the YouTube video ID from various URL formats.
Examples:
http://youtu.be/SA2iWivDJiE
http://www.youtube.com/watch?v=_oPAwA_Udwc&feature=feedu
http://www.youtube.com/embed/SA2iWivDJiE
http://www.youtube.com/v/SA2iWivDJiE?version=3&hl=en_US
"""
parsed_url = urlparse(url)
hostname = parsed_url.hostname or ''
path = parsed_url.path
if hostname.lower() == 'youtu.be':
return path.lstrip('/')
if hostname.lower() in {'www.youtube.com', 'youtube.com', 'music.youtube.com'}:
if not ignore_playlist:
with suppress(KeyError):
return parse_qs(parsed_url.query)['list'][0]
if parsed_url.path == '/watch':
return parse_qs(parsed_url.query).get('v', [None])[0]
if parsed_url.path.startswith('/watch/'):
return parsed_url.path.split('/')[1]
if parsed_url.path.startswith('/embed/'):
return parsed_url.path.split('/')[2]
if parsed_url.path.startswith('/v/'):
return parsed_url.path.split('/')[2]
return None
def yt_download(link):
"""
Download the audio from a YouTube link as an mp3 file.
"""
ydl_opts = {
'format': 'bestaudio',
'outtmpl': '%(title)s',
'nocheckcertificate': True,
'ignoreerrors': True,
'no_warnings': True,
'quiet': True,
'extractaudio': True,
'postprocessors': [{
'key': 'FFmpegExtractAudio',
'preferredcodec': 'mp3'
}],
}
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
result = ydl.extract_info(link, download=True)
download_path = ydl.prepare_filename(result, outtmpl='%(title)s.mp3')
return download_path
def display_progress(message, percent, is_webui, progress=None):
"""
Display progress either via the provided progress callback or by printing.
"""
if is_webui and progress is not None:
progress(percent, desc=message)
else:
print(message)
def raise_exception(error_msg, is_webui):
"""
Raise an exception. If running in a web UI, use gr.Error.
"""
if is_webui:
raise gr.Error(error_msg)
else:
raise Exception(error_msg)
def get_rvc_model(voice_model, is_webui):
"""
Search the specified RVC model directory for the model (.pth) and index (.index) files.
"""
rvc_model_filename, rvc_index_filename = None, None
model_dir = os.path.join(rvc_models_dir, voice_model)
if not os.path.exists(model_dir):
raise_exception(f'Model directory {model_dir} does not exist.', is_webui)
for file in os.listdir(model_dir):
ext = os.path.splitext(file)[1]
if ext == '.pth':
rvc_model_filename = file
if ext == '.index':
rvc_index_filename = file
if rvc_model_filename is None:
error_msg = f'No model file exists in {model_dir}.'
raise_exception(error_msg, is_webui)
model_path = os.path.join(model_dir, rvc_model_filename)
index_path = os.path.join(model_dir, rvc_index_filename) if rvc_index_filename else ''
return model_path, index_path
def separation_uvr(filename, output):
"""
Run the separation steps using different pre-trained models.
Returns a tuple of four file paths:
- vocals_no_reverb: The vocals after initial de-echo/de-reverb (used as intermediate vocals)
- instrumental_path: The separated instrumental audio
- main_vocals_dereverb: The lead vocals after final de-reverb processing
- backup_vocals: The backup vocals extracted in the final stage
"""
separator = Separator(output_dir=output)
base_name = os.path.splitext(os.path.basename(filename))[0]
instrumental_path = os.path.join(output, f'{base_name}_Instrumental.wav')
initial_vocals = os.path.join(output, f'{base_name}_Vocals.wav')
vocals_no_reverb = os.path.join(output, f'{base_name}_Vocals (No Reverb).wav')
vocals_reverb = os.path.join(output, f'{base_name}_Vocals (Reverb).wav')
main_vocals_dereverb = os.path.join(output, f'{base_name}_Vocals_Main_DeReverb.wav')
backup_vocals = os.path.join(output, f'{base_name}_Vocals_Backup.wav')
separator.load_model(model_filename='model_bs_roformer_ep_317_sdr_12.9755.ckpt')
voc_inst = separator.separate(filename)
os.rename(os.path.join(output, voc_inst[0]), instrumental_path)
os.rename(os.path.join(output, voc_inst[1]), initial_vocals)
separator.load_model(model_filename='UVR-DeEcho-DeReverb.pth')
voc_no_reverb = separator.separate(initial_vocals)
os.rename(os.path.join(output, voc_no_reverb[0]), vocals_no_reverb)
os.rename(os.path.join(output, voc_no_reverb[1]), vocals_reverb)
separator.load_model(model_filename='mel_band_roformer_karaoke_aufr33_viperx_sdr_10.1956.ckpt')
voc_split = separator.separate(vocals_no_reverb)
os.rename(os.path.join(output, voc_split[0]), backup_vocals)
os.rename(os.path.join(output, voc_split[1]), main_vocals_dereverb)
if os.path.exists(vocals_reverb):
os.remove(vocals_reverb)
return vocals_no_reverb, instrumental_path, main_vocals_dereverb, backup_vocals
def get_audio_paths(song_dir):
"""
Search the given directory for expected audio files.
Returns:
orig_song_path, instrumentals_path, main_vocals_dereverb_path, backup_vocals_path
"""
orig_song_path = None
instrumentals_path = None
main_vocals_dereverb_path = None
backup_vocals_path = None
for file in os.listdir(song_dir):
if file.endswith('_Instrumental.wav'):
instrumentals_path = os.path.join(song_dir, file)
orig_song_path = instrumentals_path.replace('_Instrumental', '')
elif file.endswith('_Vocals_Main_DeReverb.wav'):
main_vocals_dereverb_path = os.path.join(song_dir, file)
elif file.endswith('_Vocals_Backup.wav'):
backup_vocals_path = os.path.join(song_dir, file)
return orig_song_path, instrumentals_path, main_vocals_dereverb_path, backup_vocals_path
def convert_to_stereo(audio_path):
"""
Convert the given audio file to stereo (2 channels) if it is mono.
"""
wave, sr = librosa.load(audio_path, mono=False, sr=44100)
if wave.ndim == 1:
stereo_path = f'{os.path.splitext(audio_path)[0]}_stereo.wav'
command = shlex.split(f'ffmpeg -y -loglevel error -i "{audio_path}" -ac 2 -f wav "{stereo_path}"')
subprocess.run(command, check=True)
return stereo_path
return audio_path
def pitch_shift(audio_path, pitch_change):
"""
Shift the pitch of the audio by the specified amount.
"""
output_path = f'{os.path.splitext(audio_path)[0]}_p{pitch_change}.wav'
if not os.path.exists(output_path):
y, sr = sf.read(audio_path)
tfm = sox.Transformer()
tfm.pitch(pitch_change)
y_shifted = tfm.build_array(input_array=y, sample_rate_in=sr)
sf.write(output_path, y_shifted, sr)
return output_path
def get_hash(filepath):
"""
Calculate a short BLAKE2b hash for the given file.
"""
with open(filepath, 'rb') as f:
file_hash = hashlib.blake2b()
while chunk := f.read(8192):
file_hash.update(chunk)
return file_hash.hexdigest()[:11]
def preprocess_song(song_input, song_id, is_webui, input_type, progress):
"""
Preprocess the input song:
- Download if YouTube URL.
- Convert to stereo.
- Separate vocals and instrumentals.
Returns a tuple with six values matching the expected unpacking in the pipeline.
"""
if input_type == 'yt':
display_progress('[~] Downloading song...', 0, is_webui, progress)
song_link = song_input.split('&')[0]
orig_song_path = yt_download(song_link)
elif input_type == 'local':
orig_song_path = song_input
else:
orig_song_path = None
song_output_dir = os.path.join(output_dir, song_id)
if not os.path.exists(song_output_dir):
os.makedirs(song_output_dir)
orig_song_path = convert_to_stereo(orig_song_path)
display_progress('[~] Separating Vocals from Instrumental...', 0.1, is_webui, progress)
vocals_no_reverb, instrumental_path, main_vocals_dereverb, backup_vocals = separation_uvr(orig_song_path, song_output_dir)
return orig_song_path, vocals_no_reverb, instrumental_path, main_vocals_dereverb, backup_vocals, main_vocals_dereverb
def voice_change(voice_model, vocals_path, output_path, pitch_change, f0_method,
index_rate, filter_radius, rms_mix_rate, protect, crepe_hop_length, is_webui):
"""
Convert the input vocals using the specified RVC model.
"""
rvc_model_path, rvc_index_path = get_rvc_model(voice_model, is_webui)
device = 'cuda:0'
config = Config(device, True)
hubert_model = load_hubert(embedder_model="contentvec", embedder_model_custom=None)
cpt, version, net_g, tgt_sr, vc = get_vc(device, config.is_half, config, rvc_model_path)
rvc_infer(
rvc_index_path, index_rate, vocals_path, output_path, pitch_change, f0_method,
cpt, version, net_g, filter_radius, tgt_sr, rms_mix_rate, protect,
crepe_hop_length, vc, hubert_model
)
del hubert_model, cpt
gc.collect()
def add_audio_effects(audio_path, reverb_rm_size, reverb_wet, reverb_dry, reverb_damping):
"""
Apply a chain of audio effects (highpass, compression, reverb) to the input audio.
"""
output_path = f'{os.path.splitext(audio_path)[0]}_mixed.wav'
board = Pedalboard([
HighpassFilter(),
Compressor(ratio=4, threshold_db=-15),
Reverb(room_size=reverb_rm_size, dry_level=reverb_dry, wet_level=reverb_wet, damping=reverb_damping)
])
with AudioFile(audio_path) as f:
with AudioFile(output_path, 'w', f.samplerate, f.num_channels) as o:
while f.tell() < f.frames:
chunk = f.read(int(f.samplerate))
effected = board(chunk, f.samplerate, reset=False)
o.write(effected)
return output_path
def combine_audio(audio_paths, output_path, main_gain, backup_gain, inst_gain, output_format):
"""
Combine main vocals, backup vocals, and instrumental audio into a final mix.
"""
main_vocal_audio = AudioSegment.from_wav(audio_paths[0]) - 4 + main_gain
backup_vocal_audio = AudioSegment.from_wav(audio_paths[1]) - 6 + backup_gain
instrumental_audio = AudioSegment.from_wav(audio_paths[2]) - 7 + inst_gain
final_audio = main_vocal_audio.overlay(backup_vocal_audio).overlay(instrumental_audio)
final_audio.export(output_path, format=output_format)
def song_cover_pipeline(song_input, voice_model, pitch_change, keep_files,
is_webui=0, main_gain=0, backup_gain=0, inst_gain=0, index_rate=0.5, filter_radius=3,
rms_mix_rate=0.25, f0_method='rmvpe', crepe_hop_length=128, protect=0.33, pitch_change_all=0,
reverb_rm_size=0.15, reverb_wet=0.2, reverb_dry=0.8, reverb_damping=0.7, output_format='mp3',
progress=gr.Progress()):
"""
Main pipeline that orchestrates the AI cover song generation.
"""
try:
if not song_input or not voice_model:
raise_exception('Ensure that the song input field and voice model field is filled.', is_webui)
display_progress('[~] Starting AI Cover Generation Pipeline...', 0, is_webui, progress)
if urlparse(song_input).scheme == 'https':
input_type = 'yt'
song_id = get_youtube_video_id(song_input)
if song_id is None:
raise_exception('Invalid YouTube url.', is_webui)
else:
input_type = 'local'
song_input = song_input.strip('\"')
if os.path.exists(song_input):
song_id = get_hash(song_input)
else:
raise_exception(f'{song_input} does not exist.', is_webui)
song_dir = os.path.join(output_dir, song_id)
if not os.path.exists(song_dir):
os.makedirs(song_dir)
(orig_song_path, vocals_path, instrumentals_path,
main_vocals_path, backup_vocals_path, main_vocals_dereverb_path) = preprocess_song(
song_input, song_id, is_webui, input_type, progress
)
else:
vocals_path, main_vocals_path = None, None
paths = get_audio_paths(song_dir)
if any(path is None for path in paths) or keep_files:
(orig_song_path, vocals_path, instrumentals_path,
main_vocals_path, backup_vocals_path, main_vocals_dereverb_path) = preprocess_song(
song_input, song_id, is_webui, input_type, progress
)
else:
orig_song_path, instrumentals_path, main_vocals_dereverb_path, backup_vocals_path = paths
main_vocals_path = main_vocals_dereverb_path
pitch_change += pitch_change_all
base_song_name = os.path.splitext(os.path.basename(orig_song_path))[0]
algo_suffix = f"_{crepe_hop_length}" if f0_method == "mangio-crepe" else ""
ai_vocals_path = os.path.join(
song_dir,
f'{base_song_name}_lead_{voice_model}_p{pitch_change}_i{index_rate}_fr{filter_radius}_'
f'rms{rms_mix_rate}_pro{protect}_{f0_method}{algo_suffix}.wav'
)
ai_backing_path = os.path.join(
song_dir,
f'{base_song_name}_backing_{voice_model}_p{pitch_change}_i{index_rate}_fr{filter_radius}_'
f'rms{rms_mix_rate}_pro{protect}_{f0_method}{algo_suffix}.wav'
)
ai_cover_path = os.path.join(song_dir, f'{base_song_name} ({voice_model} Ver).{output_format}')
ai_cover_backing_path = os.path.join(song_dir, f'{base_song_name} ({voice_model} Ver With Backing).{output_format}')
if not os.path.exists(ai_vocals_path):
display_progress('[~] Converting lead voice using RVC...', 0.5, is_webui, progress)
voice_change(voice_model, main_vocals_dereverb_path, ai_vocals_path, pitch_change,
f0_method, index_rate, filter_radius, rms_mix_rate, protect, crepe_hop_length, is_webui)
display_progress('[~] Converting backing voice using RVC...', 0.65, is_webui, progress)
voice_change(voice_model, backup_vocals_path, ai_backing_path, pitch_change,
f0_method, index_rate, filter_radius, rms_mix_rate, protect, crepe_hop_length, is_webui)
display_progress('[~] Applying audio effects to Vocals...', 0.8, is_webui, progress)
ai_vocals_mixed_path = add_audio_effects(ai_vocals_path, reverb_rm_size, reverb_wet, reverb_dry, reverb_damping)
ai_backing_mixed_path = add_audio_effects(ai_backing_path, reverb_rm_size, reverb_wet, reverb_dry, reverb_damping)
if pitch_change_all != 0:
display_progress('[~] Applying overall pitch change', 0.85, is_webui, progress)
instrumentals_path = pitch_shift(instrumentals_path, pitch_change_all)
backup_vocals_path = pitch_shift(backup_vocals_path, pitch_change_all)
display_progress('[~] Combining AI Vocals and Instrumentals...', 0.9, is_webui, progress)
combine_audio([ai_vocals_mixed_path, backup_vocals_path, instrumentals_path],
ai_cover_path, main_gain, backup_gain, inst_gain, output_format)
combine_audio([ai_vocals_mixed_path, ai_backing_mixed_path, instrumentals_path],
ai_cover_backing_path, main_gain, backup_gain, inst_gain, output_format)
if not keep_files:
display_progress('[~] Removing intermediate audio files...', 0.95, is_webui, progress)
intermediate_files = [vocals_path, main_vocals_path, ai_vocals_mixed_path, ai_backing_mixed_path]
if pitch_change_all != 0:
intermediate_files += [instrumentals_path, backup_vocals_path]
for file in intermediate_files:
if file and os.path.exists(file):
os.remove(file)
return ai_cover_path, ai_cover_backing_path
except Exception as e:
raise_exception(str(e), is_webui)
if __name__ == '__main__':
parser = argparse.ArgumentParser(
description='AICoverGen: Mod.',
add_help=True
)
parser.add_argument('-i', '--song-input', type=str, required=True,
help='Link to a YouTube video or the filepath to a local mp3/wav file to create an AI cover of')
parser.add_argument('-dir', '--rvc-dirname', type=str, required=True,
help='Name of the folder in the rvc_models directory containing the RVC model file and optional index file to use')
parser.add_argument('-p', '--pitch-change', type=int, required=True,
help='Change the pitch of AI Vocals only. Generally, use 1 for male to female and -1 for vice-versa. (Octaves)')
parser.add_argument('-k', '--keep-files', action=argparse.BooleanOptionalAction,
help='Whether to keep all intermediate audio files generated in the song_output/id directory, e.g. Isolated Vocals/Instrumentals')
parser.add_argument('-ir', '--index-rate', type=float, default=0.5,
help='A decimal number e.g. 0.5, used to reduce/resolve the timbre leakage problem. If set to 1, more biased towards the timbre quality of the training dataset')
parser.add_argument('-fr', '--filter-radius', type=int, default=3,
help='A number between 0 and 7. If >=3: apply median filtering to the harvested pitch results. The value represents the filter radius and can reduce breathiness.')
parser.add_argument('-rms', '--rms-mix-rate', type=float, default=0.25,
help="A decimal number e.g. 0.25. Control how much to use the original vocal's loudness (0) or a fixed loudness (1).")
parser.add_argument('-palgo', '--pitch-detection-algo', type=str, default='rmvpe',
help='Best option is rmvpe (clarity in vocals), then mangio-crepe (smoother vocals).')
parser.add_argument('-hop', '--crepe-hop-length', type=int, default=128,
help='If pitch detection algo is mangio-crepe, controls how often it checks for pitch changes in milliseconds. Recommended: 128.')
parser.add_argument('-pro', '--protect', type=float, default=0.33,
help='A decimal number e.g. 0.33. Protect voiceless consonants and breath sounds to prevent artifacts such as tearing in electronic music.')
parser.add_argument('-mv', '--main-vol', type=int, default=0,
help='Volume change for AI main vocals in decibels. Use -3 to decrease by 3 dB and 3 to increase by 3 dB')
parser.add_argument('-bv', '--backup-vol', type=int, default=0,
help='Volume change for backup vocals in decibels')
parser.add_argument('-iv', '--inst-vol', type=int, default=0,
help='Volume change for instrumentals in decibels')
parser.add_argument('-pall', '--pitch-change-all', type=int, default=0,
help='Change the pitch/key of vocals and instrumentals. Changing this slightly reduces sound quality')
parser.add_argument('-rsize', '--reverb-size', type=float, default=0.15,
help='Reverb room size between 0 and 1')
parser.add_argument('-rwet', '--reverb-wetness', type=float, default=0.2,
help='Reverb wet level between 0 and 1')
parser.add_argument('-rdry', '--reverb-dryness', type=float, default=0.8,
help='Reverb dry level between 0 and 1')
parser.add_argument('-rdamp', '--reverb-damping', type=float, default=0.7,
help='Reverb damping between 0 and 1')
parser.add_argument('-oformat', '--output-format', type=str, default='mp3',
help='Output format of audio file. mp3 for smaller file size, wav for best quality')
args = parser.parse_args()
rvc_dir = os.path.join(rvc_models_dir, args.rvc_dirname)
if not os.path.exists(rvc_dir):
raise Exception(f'The folder {rvc_dir} does not exist.')
cover_path, cover_with_backing = song_cover_pipeline(
args.song_input, args.rvc_dirname, args.pitch_change, args.keep_files,
main_gain=args.main_vol, backup_gain=args.backup_vol, inst_gain=args.inst_vol,
index_rate=args.index_rate, filter_radius=args.filter_radius,
rms_mix_rate=args.rms_mix_rate, f0_method=args.pitch_detection_algo,
crepe_hop_length=args.crepe_hop_length, protect=args.protect,
pitch_change_all=args.pitch_change_all,
reverb_rm_size=args.reverb_size, reverb_wet=args.reverb_wetness,
reverb_dry=args.reverb_dryness, reverb_damping=args.reverb_damping,
output_format=args.output_format
)
print(f'[+] Cover generated at {cover_path}')