neopy-neorvc / neorvc /main_cli.py
NeoPy's picture
Upload folder using huggingface_hub
8bb5911 verified
from neorvc.init.type_module import *
import torch
import os
from urllib.parse import urlparse, parse_qs
from pathlib import Path
import asyncio
import aiohttp
import aiofiles
import zipfile
import shutil
import re
import hashlib
import subprocess
import shlex
import argparse
import logging
import gc
from tqdm import tqdm
from pydub import AudioSegment
import soundfile as sf
from typing import Optional, Union
try:
from gradio.helpers import Progress as GradioProgress
except ImportError:
GradioProgress = None
# Pick the torch device once at import time: CUDA when available, CPU otherwise.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
print(f"Device: {device}")
def handle_progress(progress: Optional[Union[tqdm, 'GradioProgress']], description: Optional[str] = None, value: Optional[float] = None) -> None:
    """Forward a status update to a tqdm bar or a Gradio progress callback.

    Args:
        progress: Progress sink; ``None`` turns the call into a no-op.
        description: Label to display. Falsy values leave the label alone.
        value: Absolute position on a 0-100 scale, or ``None`` to leave the
            position alone.
    """
    if progress is None:
        return

    is_gradio = GradioProgress is not None and isinstance(progress, GradioProgress)
    if is_gradio:
        # Gradio progress objects are callables that take a 0-1 fraction.
        if description:
            progress(0, desc=description)
        if value is not None:
            progress(value / 100)
        return

    if not isinstance(progress, tqdm):
        # Unknown progress type: nothing to report to.
        return

    if description and hasattr(progress, 'set_description'):
        progress.set_description(description)
    if value is None:
        return

    # tqdm.update() is relative, so convert the absolute target into a step
    # and never move the bar backwards.
    step = value - progress.n if progress.n < value else 0
    progress.update(step)
def get_youtube_video_id(url: str, ignore_playlist: bool = True) -> str | None:
parsed = urlparse(url)
if parsed.hostname == "youtu.be":
return parsed.path.lstrip("/")
if parsed.hostname in {"www.youtube.com", "youtube.com", "music.youtube.com"}:
if not ignore_playlist and "list" in parse_qs(parsed.query):
return parse_qs(parsed.query)["list"][0]
if parsed.path == "/watch":
return parse_qs(parsed.query)["v"][0]
if parsed.path.startswith(("/embed/", "/v/")):
return parsed.path.split("/")[-1]
return None
async def yt_download(link: str, cookies_path: str = os.path.join(BASE_DIR, "neorvc", "config.txt"), progress: Optional[Union[tqdm, 'GradioProgress']] = None) -> Path:
    """Download the audio track of a YouTube video as MP3 via ``yt-dlp``.

    The result is stored as ``<OUTPUT_DIR>/<video_id>.mp3``; if that file
    already exists it is returned without invoking ``yt-dlp`` again.

    Args:
        link: YouTube URL to download.
        cookies_path: File handed to ``yt-dlp --cookies``.
        progress: Optional tqdm bar or Gradio progress object.

    Returns:
        Path to the downloaded MP3 file.

    Raises:
        FileNotFoundError: If ``cookies_path`` does not exist.
        ValueError: If no video ID can be extracted from ``link``.
        RuntimeError: If ``yt-dlp`` exits non-zero or the expected output
            file is missing afterwards.
    """
    if not os.path.exists(cookies_path):
        raise FileNotFoundError(f"Cookies file not found: {cookies_path}")

    video_id = get_youtube_video_id(link)
    if not video_id:
        raise ValueError("Invalid YouTube URL: could not extract video ID.")

    output_file = os.path.join(OUTPUT_DIR, f"{video_id}.mp3")
    if os.path.exists(output_file):
        return Path(output_file)

    handle_progress(progress, description="Downloading YouTube audio", value=10)
    cmd = [
        "yt-dlp",
        "--format", "bestaudio/best",
        "--extract-audio",
        "--audio-format", "mp3",
        "--audio-quality", "192K",
        "--cookies", str(cookies_path),
        "--output", str(output_file),
        "--no-check-certificate",
        link,
    ]
    process = await asyncio.create_subprocess_exec(
        *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
    )
    _, stderr = await process.communicate()
    if process.returncode != 0:
        # Surface yt-dlp's own diagnostics; without them a failure is
        # impossible to tell apart (bad cookies, geo-block, removed video...).
        detail = stderr.decode(errors="replace").strip() if stderr else ""
        message = f"yt-dlp failed (exit code {process.returncode})"
        raise RuntimeError(f"{message}: {detail}" if detail else message)
    if not os.path.exists(output_file):
        raise RuntimeError(f"Downloaded file not found: {output_file}")

    handle_progress(progress, value=20)
    return Path(output_file)
def sanitize_model_name(dir_name: str) -> str:
    """Validate a model directory name and return it unchanged.

    Only ASCII letters, digits, ``_`` and ``-`` are accepted, which keeps the
    name safe to use as a single path component.

    Raises:
        ValueError: If ``dir_name`` is empty or contains any other character.
    """
    # fullmatch (rather than match with ^...$) also rejects a trailing newline,
    # and the class is a-zA-Z: the former "a-zAZ" only allowed 'A' and 'Z'.
    if not dir_name or not re.fullmatch(r"[a-zA-Z0-9_-]+", dir_name):
        raise ValueError("Invalid model name")
    return dir_name
async def download_online_model(url: str, dir_name: str, progress: Optional[Union[tqdm, 'GradioProgress']] = None) -> str:
    """Download a zipped RVC model and unpack it into ``RVC_MODELS_DIR/dir_name``.

    After extraction the ``.pth`` model (larger than 40 MiB) and, if present,
    the ``.index`` file (larger than 100 KiB) are moved to the top of the model
    directory and every sub-directory is removed.

    Args:
        url: HTTP(S) URL of the zip archive. pixeldrain.com links are rewritten
            to the pixeldrain file API.
        dir_name: Name of the model directory to create.
        progress: Optional tqdm bar or Gradio progress object.

    Returns:
        A human-readable confirmation message.

    Raises:
        ValueError: On an invalid name/URL, an existing model directory, a
            non-200 HTTP response, an unreadable archive, or an archive that
            contains no suitable ``.pth`` file.
    """
    dir_name = sanitize_model_name(dir_name)
    if not url or not dir_name:
        raise ValueError("URL and model name are required")
    if not url.startswith(("http://", "https://")):
        raise ValueError("Invalid URL format")

    extraction_folder = os.path.join(RVC_MODELS_DIR, dir_name)
    if os.path.exists(extraction_folder):
        raise ValueError(f"Model directory '{dir_name}' already exists")

    # Derive the local file name from the URL *path* so query strings and
    # fragments never leak into it.
    zip_name = Path(urlparse(url).path).name
    if "pixeldrain.com" in url:
        url = f"https://pixeldrain.com/api/file/{zip_name}"
    # A URL ending in "/" has no file name; fall back to the model name.
    zip_name = zip_name or f"{dir_name}.zip"

    handle_progress(progress, description=f"Downloading model '{dir_name}'")
    zip_path = os.path.join(OUTPUT_DIR, zip_name)
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                if response.status != 200:
                    raise ValueError(f"Failed to download model: HTTP {response.status}")
                total_size = int(response.headers.get("content-length", 0))
                downloaded = 0
                async with aiofiles.open(zip_path, "wb") as f:
                    # 64 KiB chunks: far fewer writes/progress calls than 1 KiB.
                    async for chunk in response.content.iter_chunked(64 * 1024):
                        await f.write(chunk)
                        downloaded += len(chunk)
                        if total_size:
                            # Download occupies the 20-50 segment of the bar.
                            progress_value = 20 + (downloaded / total_size) * 30
                            handle_progress(progress, value=progress_value)

        handle_progress(progress, description="Extracting model")
        try:
            with zipfile.ZipFile(zip_path, "r") as zip_ref:
                zip_ref.extractall(extraction_folder)
        except (zipfile.BadZipFile, OSError) as e:
            shutil.rmtree(extraction_folder, ignore_errors=True)
            raise ValueError(f"Error extracting zip: {e}") from e
    finally:
        # Always drop the archive, including after a failed or partial
        # download; it is a temporary artefact. Best-effort only.
        try:
            os.unlink(zip_path)
        except OSError:
            pass

    model_filepath = None
    index_filepath = None
    for file_path in Path(extraction_folder).rglob("*"):
        # Size thresholds filter out stub/auxiliary files of the same type.
        if file_path.suffix == ".pth" and file_path.stat().st_size > 40 * 1024 * 1024:
            model_filepath = file_path
        if file_path.suffix == ".index" and file_path.stat().st_size > 100 * 1024:
            index_filepath = file_path

    if not model_filepath:
        shutil.rmtree(extraction_folder, ignore_errors=True)
        raise ValueError(f"No valid .pth model file found in {extraction_folder}")

    # Flatten: move the selected files to the top of the model directory...
    for filepath in (model_filepath, index_filepath):
        if filepath and filepath != Path(os.path.join(extraction_folder, filepath.name)):
            os.rename(filepath, os.path.join(extraction_folder, filepath.name))
    # ...then discard whatever folder structure the archive shipped with.
    for item in Path(extraction_folder).iterdir():
        if item.is_dir():
            shutil.rmtree(item, ignore_errors=True)

    # NOTE(review): 10 is lower than the 20-50 range used while downloading;
    # kept as in the original, confirm the intended final value.
    handle_progress(progress, value=10)
    print(f"Model '{dir_name}' downloaded")
    return f"Model '{dir_name}' downloaded"
def raise_exception(msg: str) -> None:
    """Raise a ``ValueError`` carrying ``msg``; this function never returns."""
    error = ValueError(msg)
    raise error
def get_rvc_model(voice_model: str) -> tuple[Path, Path | None]:
    """Locate the weight and index files of a voice model.

    Looks only at the direct children of ``RVC_MODELS_DIR/voice_model``. When
    several files share a suffix, the one listed last by the directory
    iteration is used.

    Returns:
        ``(pth, idx)`` where ``idx`` is ``None`` if the model has no
        ``.index`` file.

    Raises:
        ValueError: If the directory contains no ``.pth`` file.
    """
    model_dir = os.path.join(RVC_MODELS_DIR, voice_model)
    entries = list(Path(model_dir).iterdir())

    def last_with_suffix(suffix: str) -> Path | None:
        # Scan from the end so the last listed match wins.
        return next((entry for entry in reversed(entries) if entry.suffix == suffix), None)

    pth = last_with_suffix(".pth")
    idx = last_with_suffix(".index")
    if pth is None:
        raise_exception(f"No model file in {model_dir}")
    return pth, idx
def get_audio_paths(song_dir: Path) -> tuple[Path | None, Path | None, Path | None, Path | None]:
orig = inst = main_drb = backup = None
for f in song_dir.iterdir():
if f.name.endswith("_Instrumental.wav"):
inst = f
orig = Path(os.path.join(song_dir, f.name.replace("_Instrumental", "")))
elif f.name.endswith("_Vocals_Main_DeReverb.wav"):
main_drb = f
elif f.name.endswith("_Vocals_Backup.wav"):
backup = f
return orig, inst, main_drb, backup
def convert_to_stereo(path: Path) -> Path:
    """Return a stereo version of ``path``, converting with ffmpeg if needed.

    A mono file is converted into a sibling ``<stem>_stereo<suffix>`` file,
    overwriting any existing one. Files with any other channel count are
    returned untouched.

    Raises:
        subprocess.CalledProcessError: If ffmpeg exits with a non-zero status.
    """
    info = sf.info(path)
    if info.channels != 1:
        return path

    stereo = path.with_stem(f"{path.stem}_stereo")
    # Pass argv as a list: building a quoted string and re-splitting it with
    # shlex breaks on file names that contain quotes or backslashes.
    cmd = ["ffmpeg", "-y", "-loglevel", "error", "-i", str(path), "-ac", "2", str(stereo)]
    subprocess.run(cmd, check=True)
    return stereo
def get_hash(fp: Path) -> str:
    """Return the first 11 hex characters of the file's BLAKE2b digest.

    The file is read in 8 KiB chunks, so it is never loaded whole.
    """
    digest = hashlib.blake2b()
    with fp.open("rb") as stream:
        # iter() with a sentinel stops at the first empty read (EOF).
        for block in iter(lambda: stream.read(8192), b""):
            digest.update(block)
    hex_digest = digest.hexdigest()
    return hex_digest[:11]
async def preprocess_song(
    inp: str,
    sid: str,
    inp_type: str,
    progress: Optional[Union[tqdm, 'GradioProgress']] = None
) -> tuple[Path | None, Path, Path, Path, Path, Path]:
    """Fetch a song and split it into the stems used by the cover pipeline.

    Three separation passes run in sequence, each with its own model:
    instrumental/vocals, reverb removal on the vocals, then backup/main split
    on the de-reverbed vocals. All outputs live in ``UVR_OUTPUT_DIR/sid``; if
    every expected stem already exists there, separation is skipped.

    Args:
        inp: YouTube URL (``inp_type == "yt"``) or local audio file path.
        sid: Song identifier used as the output sub-directory name.
        inp_type: ``"yt"`` to download first; anything else is treated as a
            local file.
        progress: Optional tqdm bar or Gradio progress object.

    Returns:
        ``(orig, vocals, inst, vocals_no_reverb, backup, main_drb)``. ``orig``
        is the (stereo) input path for local files and ``None`` for downloads.

    Raises:
        ValueError: If a local input does not exist or its extension is not
            listed in ``AUDIO_EXTS``.
    """
    from_youtube = inp_type == "yt"
    if from_youtube:
        handle_progress(progress, description="Downloading audio")
        path = await yt_download(inp.split("&")[0], progress=progress)
    else:
        path = Path(inp.strip('"'))
        if not (path.exists() and path.suffix.lower() in AUDIO_EXTS):
            raise_exception(f"Invalid audio file: {path}")

    out_dir = os.path.join(UVR_OUTPUT_DIR, sid)
    os.makedirs(out_dir, exist_ok=True)
    separator = Separator(output_dir=out_dir, log_level=logging.WARNING)

    path = convert_to_stereo(path)
    base = path.stem

    def stem_file(tag: str) -> Path:
        # Canonical location of one stem inside the output directory.
        return Path(os.path.join(out_dir, f"{base}_{tag}.wav"))

    def claim(produced: str, target: Path) -> None:
        # Rename a separator output (joined onto out_dir) to its canonical
        # name. Presumably separate() returns names relative to out_dir --
        # TODO confirm against the Separator API.
        Path(os.path.join(out_dir, produced)).rename(target)

    inst = stem_file("Instrumental")
    vocals = stem_file("Vocals")
    vocals_no_reverb = stem_file("Vocals_NoReverb")
    backup = stem_file("Vocals_Backup")
    main_drb = stem_file("Vocals_Main_DeReverb")

    # Only local inputs are reported back as the "original" track.
    orig = None if from_youtube else path
    result = (orig, vocals, inst, vocals_no_reverb, backup, main_drb)

    # Everything after `orig` is a stem; reuse them when all are present.
    if all(stem.exists() for stem in result[1:]):
        return result

    handle_progress(progress, description="Separating vocals")
    separator.load_model(model_filename="model_bs_roformer_ep_317_sdr_12.9755.ckpt")
    voc_inst = separator.separate(str(path))
    claim(voc_inst[0], inst)
    claim(voc_inst[1], vocals)
    handle_progress(progress, value=10)

    handle_progress(progress, description="DeReverbing vocals")
    separator.load_model(model_filename="UVR-DeEcho-DeReverb.pth")
    voc_no_reverb = separator.separate(str(vocals))
    claim(voc_no_reverb[0], vocals_no_reverb)
    # The reverb residue is kept on disk but not returned.
    claim(voc_no_reverb[1], stem_file("Vocals_Reverb"))
    handle_progress(progress, value=20)

    handle_progress(progress, description="Splitting main/backup vocals")
    separator.load_model(model_filename="mel_band_roformer_karaoke_aufr33_viperx_sdr_10.1956.ckpt")
    backing_voc = separator.separate(str(vocals_no_reverb))
    claim(backing_voc[0], backup)
    claim(backing_voc[1], main_drb)
    handle_progress(progress, value=30)

    return result
def voice_change(
    model: str,
    vocals: Path,
    out: Path,
    pitch: int,
    f0: str,
    idx_rate: float,
    filt_rad: int,
    rms: float,
    prot: float,
    hop: int,
    progress: Optional[Union[tqdm, 'GradioProgress']] = None
) -> None:
    """Run RVC inference on ``vocals`` and write the result to ``out`` as WAV.

    Args:
        model: Voice model name, resolved through ``get_rvc_model``.
        vocals: Input audio to convert.
        out: Destination file for the converted audio.
        pitch: Forwarded to the inference script as ``pitch``.
        f0: Forwarded as ``f0_method``.
        idx_rate: Forwarded as ``index_rate``.
        filt_rad: Forwarded as ``filter_radius``.
        rms: Forwarded as ``volume_envelope``.
        prot: Forwarded as ``protect``.
        hop: Forwarded as ``hop_length``.
        progress: Optional tqdm bar or Gradio progress object.
    """
    pth, idx = get_rvc_model(model)
    handle_progress(progress, description="Converting voice")

    infer_options = {
        # Model files; an absent index is passed as an empty string.
        "pth_path": str(pth),
        "index_path": str(idx) if idx else "",
        "index_rate": idx_rate,
        # Audio in/out.
        "input_path": str(vocals),
        "output_path": str(out),
        "export_format": "wav",
        # Caller-tunable conversion settings.
        "pitch": pitch,
        "f0_method": f0,
        "filter_radius": filt_rad,
        "volume_envelope": rms,
        "protect": prot,
        "hop_length": hop,
        # Fixed settings: optional post-processing is switched off.
        "split_audio": False,
        "f0_autotune": False,
        "f0_autotune_strength": 0.0,
        "clean_audio": False,
        "clean_strength": 0.0,
        "f0_file": None,
        "embedder_model": "contentvec",
    }
    run_infer_script(**infer_options)

    gc.collect()
    handle_progress(progress, value=50)
    print(f"Voice conversion completed: {out}")
def combine_audio(paths: list[Path], out: Path, mg: float, bg: float, ig: float, fmt: str, progress: Optional[Union[tqdm, 'GradioProgress']] = None) -> None:
    """Mix main vocals, backup vocals and instrumental into one file.

    Args:
        paths: ``[main_vocals, backup_vocals, instrumental]`` audio files.
        out: Destination of the mixed track.
        mg: Gain in dB added to the main vocals before a fixed -4 dB offset.
        bg: Gain in dB added to the backup vocals before a fixed -6 dB offset.
        ig: Gain in dB added to the instrumental before a fixed -7 dB offset.
        fmt: Export format handed to pydub (e.g. ``"mp3"``).
        progress: Optional tqdm bar or Gradio progress object.
    """
    handle_progress(progress, description="Combining tracks")

    lead = AudioSegment.from_file(paths[0])
    lead = lead + mg - 4
    backing = AudioSegment.from_file(paths[1])
    backing = backing + bg - 6
    instrumental = AudioSegment.from_file(paths[2])
    instrumental = instrumental + ig - 7

    # Layer the other two tracks onto the main vocals.
    mix = lead.overlay(backing)
    mix = mix.overlay(instrumental)
    mix.export(out, format=fmt)

    handle_progress(progress, value=60)
    print(f"Combined audio saved: {out}")
async def song_cover_pipeline(
    song_input: str,
    voice_model: str,
    pitch_change: int,
    keep_files: bool,
    main_gain: float = 0,
    backup_gain: float = 0,
    inst_gain: float = 0,
    index_rate: float = 0.5,
    filter_radius: int = 3,
    rms_mix_rate: float = 0.25,
    f0_method: str = "rmvpe",
    crepe_hop_length: int = 128,
    protect: float = 0.33,
    output_format: str = "mp3",
    progress: Optional[Union[tqdm, 'GradioProgress']] = None
) -> Path:
    """Produce a full AI cover: separate stems, convert the lead, remix.

    Args:
        song_input: YouTube URL (any ``http*`` scheme) or local file path.
        voice_model: Name of the RVC voice model to apply.
        pitch_change: Pitch shift forwarded to the voice conversion.
        keep_files: When ``False``, intermediate stems are deleted afterwards.
        main_gain: Gain in dB for the converted main vocals.
        backup_gain: Gain in dB for the backup vocals.
        inst_gain: Gain in dB for the instrumental.
        index_rate: Forwarded to ``voice_change``.
        filter_radius: Forwarded to ``voice_change``.
        rms_mix_rate: Forwarded to ``voice_change``.
        f0_method: Forwarded to ``voice_change``.
        crepe_hop_length: Forwarded to ``voice_change``.
        protect: Forwarded to ``voice_change``.
        output_format: Format/extension of the final mix.
        progress: Optional tqdm bar or Gradio progress object.

    Returns:
        Path of the mixed cover inside ``OUTPUT_DIR``.

    Raises:
        ValueError: On missing arguments, an unrecognised YouTube URL, or a
            local file that does not exist.
    """
    if not song_input or not voice_model:
        raise_exception("Song input and voice model are required")
    handle_progress(progress, description="Starting pipeline")

    # Resolve the input into (type, song id, output base name).
    is_remote = urlparse(song_input).scheme.startswith("http")
    if is_remote:
        inp_type = "yt"
        sid = get_youtube_video_id(song_input)
        if not sid:
            raise_exception("Invalid YouTube URL")
        base_filename = sid
    else:
        inp_type = "local"
        song_input = song_input.strip('"')
        local_file = Path(song_input)
        if not local_file.exists():
            raise_exception(f"File not found: {song_input}")
        # Local songs are identified by a content hash.
        sid = get_hash(local_file)
        base_filename = local_file.stem

    song_dir = os.path.join(OUTPUT_DIR, sid)
    os.makedirs(song_dir, exist_ok=True)

    cached = get_audio_paths(Path(song_dir))
    if not keep_files and all(cached):
        # Reuse stems already present in the song directory.
        _, inst, main_drb, backup = cached
        main = main_drb
    else:
        _, _, inst, main, backup, main_drb = await preprocess_song(
            song_input, sid, inp_type, progress
        )

    ai_vocals = Path(os.path.join(OUTPUT_DIR, f"{voice_model}_Generated_{base_filename}.wav"))
    ai_cover = Path(os.path.join(OUTPUT_DIR, f"{base_filename} ({voice_model} Ver).{output_format}"))

    voice_change(
        model=voice_model,
        vocals=main_drb,
        out=ai_vocals,
        pitch=pitch_change,
        f0=f0_method,
        idx_rate=index_rate,
        filt_rad=filter_radius,
        rms=rms_mix_rate,
        prot=protect,
        hop=crepe_hop_length,
        progress=progress
    )
    combine_audio(
        [ai_vocals, backup, inst],
        ai_cover,
        main_gain,
        backup_gain,
        inst_gain,
        output_format,
        progress,
    )

    if not keep_files:
        handle_progress(progress, description="Cleaning up")
        for leftover in (main, inst, backup):
            if leftover and leftover.exists():
                leftover.unlink()
        handle_progress(progress, value=65)

    print(f"Output saved: {ai_cover}")
    return ai_cover
async def vocal_cover_pipeline(
    song_input: str,
    voice_model: str,
    pitch_change: int,
    keep_files: bool,
    main_gain: float = 0,
    backup_gain: float = 0,
    inst_gain: float = 0,
    index_rate: float = 0.5,
    filter_radius: int = 3,
    rms_mix_rate: float = 0.25,
    f0_method: str = "rmvpe",
    crepe_hop_length: int = 128,
    protect: float = 0.33,
    output_format: str = "mp3",
    progress: Optional[Union[tqdm, 'GradioProgress']] = None
) -> Path:
    """Convert an audio file with a voice model, without stem separation.

    The whole input is passed to ``voice_change`` and written to
    ``OUTPUT_DIR/Cover_<input stem>_<voice_model>.wav``.

    ``keep_files``, the three gain parameters and ``output_format`` are not
    used here; they exist so this function shares a signature with
    ``song_cover_pipeline``.

    Args:
        song_input: YouTube URL (any ``http*`` scheme) or local file path.
        voice_model: Name of the RVC voice model to apply.
        pitch_change: Pitch shift forwarded to the voice conversion.
        keep_files: Unused.
        main_gain: Unused.
        backup_gain: Unused.
        inst_gain: Unused.
        index_rate: Forwarded to ``voice_change``.
        filter_radius: Forwarded to ``voice_change``.
        rms_mix_rate: Forwarded to ``voice_change``.
        f0_method: Forwarded to ``voice_change``.
        crepe_hop_length: Forwarded to ``voice_change``.
        protect: Forwarded to ``voice_change``.
        output_format: Unused; the output is always WAV.
        progress: Optional tqdm bar or Gradio progress object.

    Returns:
        Path of the converted audio file.

    Raises:
        ValueError: On missing arguments, an unrecognised YouTube URL, or a
            local file that does not exist.
    """
    if not song_input or not voice_model:
        raise_exception("Song input and voice model are required")
    handle_progress(progress, description="Starting pipeline")

    if urlparse(song_input).scheme.startswith("http"):
        if not get_youtube_video_id(song_input):
            raise_exception("Invalid YouTube URL")
        song_input = str(await yt_download(song_input.split("&")[0], progress=progress))
    else:
        song_input = song_input.strip('"')
        # Only existence matters here: no per-song directory is used, so the
        # content hash that used to be computed (and discarded) is skipped.
        if not Path(song_input).exists():
            raise_exception(f"File not found: {song_input}")

    orig = Path(song_input)
    ai_vocals = Path(os.path.join(OUTPUT_DIR, f"Cover_{orig.stem}_{voice_model}.wav"))

    voice_change(
        model=voice_model,
        vocals=orig,
        out=ai_vocals,
        pitch=pitch_change,
        f0=f0_method,
        idx_rate=index_rate,
        filt_rad=filter_radius,
        rms=rms_mix_rate,
        prot=protect,
        hop=crepe_hop_length,
        progress=progress
    )

    print(f"Output saved: {ai_vocals}")
    return ai_vocals
def parse_arguments() -> argparse.Namespace:
    """Parse ``sys.argv`` for the ``infer`` and ``download`` sub-commands.

    Returns:
        The parsed namespace. ``command`` is ``None`` when no sub-command
        was given.
    """
    parser = argparse.ArgumentParser(description="Generate a song cover using voice conversion.")
    commands = parser.add_subparsers(dest="command", help="Available commands")

    # Each entry is (name, add_argument keyword options), in help-output order.
    infer_arguments = (
        ("song_input", {"help": "YouTube URL or local audio file path"}),
        ("voice_model", {"help": "Name of the RVC voice model"}),
        ("--pitch_change", {"type": int, "default": 0, "help": "Pitch change in semitones"}),
        ("--keep_files", {"action": "store_true", "help": "Keep intermediate files"}),
        ("--output_type", {
            "choices": ["full", "vocals"],
            "default": "full",
            "help": "Output type: full song or vocals only",
        }),
        ("--main_gain", {"type": float, "default": 0, "help": "Main vocals gain (dB)"}),
        ("--backup_gain", {"type": float, "default": 0, "help": "Backup vocals gain (dB)"}),
        ("--inst_gain", {"type": float, "default": 0, "help": "Instrumental gain (dB)"}),
        ("--index_rate", {"type": float, "default": 0.5, "help": "Index rate for voice conversion"}),
        ("--filter_radius", {"type": int, "default": 3, "help": "Filter radius for voice conversion"}),
        ("--rms_mix_rate", {"type": float, "default": 0.25, "help": "RMS mix rate"}),
        ("--f0_method", {"default": "rmvpe", "help": "F0 extraction method"}),
        ("--crepe_hop_length", {"type": int, "default": 128, "help": "CREPE hop length"}),
        ("--protect", {"type": float, "default": 0.33, "help": "Protect voiceless consonants"}),
        ("--output_format", {"default": "mp3", "help": "Output format (e.g., mp3, wav)"}),
    )
    download_arguments = (
        ("model_url", {"help": "URL for RVC model"}),
        ("voice_model", {"help": "Name of the RVC voice model"}),
    )

    infer = commands.add_parser("infer", help="RVC Inference")
    for name, options in infer_arguments:
        infer.add_argument(name, **options)

    download = commands.add_parser("download", help="RVC Model Downloader")
    for name, options in download_arguments:
        download.add_argument(name, **options)

    return parser.parse_args()
async def main() -> None:
    """CLI entry point: dispatch to the chosen sub-command under a tqdm bar."""
    args = parse_arguments()
    if not args.command:
        print("Please run with '-h' for help")
        return

    with tqdm(total=100, desc="Starting...", unit="%") as pbar:
        if args.command == "infer":
            if args.output_type == "full":
                pipeline = song_cover_pipeline
            else:
                pipeline = vocal_cover_pipeline

            # These CLI options map one-to-one onto pipeline keyword arguments.
            option_names = (
                "song_input",
                "voice_model",
                "pitch_change",
                "keep_files",
                "main_gain",
                "backup_gain",
                "inst_gain",
                "index_rate",
                "filter_radius",
                "rms_mix_rate",
                "f0_method",
                "crepe_hop_length",
                "protect",
                "output_format",
            )
            options = {name: getattr(args, name) for name in option_names}
            result = await pipeline(progress=pbar, **options)
            handle_progress(pbar, value=100)
            print(f"Completed: {result}")
        elif args.command == "download":
            await download_online_model(
                url=args.model_url,
                dir_name=args.voice_model,
                progress=pbar,
            )
            handle_progress(pbar, value=100)
            print("Download completed")
# Script entry point: run the async CLI only when executed directly, not on import.
if __name__ == "__main__":
    asyncio.run(main())