import argparse
import logging
import os
import sys
import shutil
import zipfile
import re
import yt_dlp
from pathlib import Path
from urllib.parse import urlparse
from functools import lru_cache
import gradio as gr
import requests
from traceback import format_exc
import asyncio

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[logging.StreamHandler(), logging.FileHandler('neorvc.log')]
)
logger = logging.getLogger(__name__)

# Base directory setup
BASE_DIR = os.getcwd()
sys.path.append(BASE_DIR)

try:
    from neorvc.main_cli import song_cover_pipeline, vocal_cover_pipeline
    from rvc.rvc_cli import run_prerequisites_script
except ImportError as e:
    logger.error(f"Failed to import pipeline functions: {e}")
    raise gr.Error(f"Failed to load required modules. Please check installation: {e}")

# Run prerequisites
try:
    run_prerequisites_script(
        models=True,
        exe=False,
    )
except Exception as e:
    logger.error(f"Error running prerequisites: {e}")
    raise gr.Error(f"Failed to initialize prerequisites: {e}")

# Directory configuration
CONFIG = {
    "models_dir": "logs",
    "output_dir": "song_output",
    "max_upload_size_mb": 500
}
RVC_MODELS_DIR = os.path.join(BASE_DIR, CONFIG['models_dir'])
OUTPUT_DIR = os.path.join(BASE_DIR, CONFIG['output_dir'])
ALLOWED_DIR = os.path.abspath(BASE_DIR)
AUDIO_EXTS = ['.mp3', '.wav', '.flac', '.ogg', '.m4a']
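# Layout notes (based on how these paths are used below): each voice model is
# expected to live in its own subdirectory of RVC_MODELS_DIR containing a .pth
# weights file and, optionally, an .index file; downloaded zips and generated
# covers go under OUTPUT_DIR. ALLOWED_DIR bounds all user-supplied file paths
# (see validate_file_path).
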
# Ensure directories exist
try:
    os.makedirs(RVC_MODELS_DIR, exist_ok=True)
    os.makedirs(OUTPUT_DIR, exist_ok=True)
except OSError as e:
    logger.error(f"Error creating directories: {e}")
    raise gr.Error(f"Failed to create required directories: {e}")

# Cached so repeated UI refreshes don't re-scan the filesystem;
# update_models_list() clears this cache before re-reading the directory.
@lru_cache(maxsize=None)
def get_current_models(models_dir):
    """Retrieve list of model directories, excluding specific items."""
    try:
        models_list = [
            item for item in os.listdir(models_dir)
            if item != 'mute' and os.path.isdir(os.path.join(models_dir, item))
        ]
        return sorted(models_list)
    except OSError as e:
        logger.error(f"Error accessing models directory: {e}\n{format_exc()}")
        raise gr.Error(f"Failed to list models. Check directory permissions: {e}")

def update_models_list():
    """Update the dropdown list of available models."""
    try:
        # Drop the cached listing so newly added or deleted models are picked up.
        get_current_models.cache_clear()
        models = get_current_models(RVC_MODELS_DIR)
        if not models:
            return gr.update(choices=[], value=None), "No models found in the directory."
        return gr.update(choices=models, value=None), "Model list refreshed successfully."
    except Exception as e:
        logger.error(f"Error updating models list: {e}\n{format_exc()}")
        return gr.update(choices=[]), f"Failed to refresh models: {e}"

def sanitize_model_name(dir_name):
    """Sanitize model name to prevent invalid characters."""
    if not dir_name:
        raise gr.Error("Model name cannot be empty.")
    if not re.match(r'^[a-zA-Z0-9_-]+$', dir_name):
        raise gr.Error("Invalid model name. Use alphanumeric characters, underscores, or hyphens only.")
    return dir_name
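
# Path-containment guard: user-supplied paths (uploads, extraction targets) are
# resolved to absolute paths and rejected unless they fall inside ALLOWED_DIR,
# which keeps zip extraction and file handling from escaping the app directory.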
def validate_file_path(file_path):
    """Ensure file path is within allowed directory."""
    try:
        file_path = os.path.abspath(file_path)
        # commonpath() equals ALLOWED_DIR only when file_path is ALLOWED_DIR or one of its descendants.
        if os.path.commonpath([file_path, ALLOWED_DIR]) != ALLOWED_DIR:
            raise gr.Error("File path is outside allowed directory.")
        return file_path
    except gr.Error:
        raise
    except Exception as e:
        logger.error(f"Invalid file path: {e}\n{format_exc()}")
        raise gr.Error(f"Invalid file path: {e}")
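
# A model zip is expected to contain one RVC weights file (.pth, larger than
# ~40 MB) and, optionally, a feature-retrieval .index file; extract_zip flattens
# whatever folder structure the archive has into the model directory.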
def extract_zip(extraction_folder, zip_path, progress=gr.Progress()):
    """Extract zip file and organize model files."""
    try:
        extraction_folder = validate_file_path(extraction_folder)
        zip_path = validate_file_path(zip_path)
        os.makedirs(extraction_folder, exist_ok=True)
        progress(0.2, desc="Extracting zip file...")
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            zip_ref.extractall(extraction_folder)
        os.remove(zip_path)
    except (zipfile.BadZipFile, OSError) as e:
        shutil.rmtree(extraction_folder, ignore_errors=True)
        logger.error(f"Error extracting zip: {e}\n{format_exc()}")
        raise gr.Error(f"Failed to extract zip file. Ensure it's a valid zip: {e}")

    # Locate the model weights and (optional) index file anywhere in the extracted tree.
    index_filepath = model_filepath = None
    try:
        for root, _, files in os.walk(extraction_folder):
            for name in files:
                file_path = os.path.join(root, name)
                try:
                    # Size thresholds are heuristics to skip placeholder or partial files:
                    # a usable .index is assumed to exceed ~100 KB, a .pth ~40 MB.
                    if name.endswith('.index') and os.path.getsize(file_path) > 1024 * 100:
                        index_filepath = file_path
                    elif name.endswith('.pth') and os.path.getsize(file_path) > 1024 * 1024 * 40:
                        model_filepath = file_path
                except OSError as sub_e:
                    logger.warning(f"Error accessing file {file_path}: {sub_e}")
                    continue
    except Exception as e:
        logger.error(f"Error scanning extracted files: {e}\n{format_exc()}")
        shutil.rmtree(extraction_folder, ignore_errors=True)
        raise gr.Error(f"Failed to process extracted files: {e}")

    if not model_filepath:
        shutil.rmtree(extraction_folder, ignore_errors=True)
        raise gr.Error(f"No valid .pth model file found in {extraction_folder}. Ensure a model file (>40MB) is included.")

    try:
        # Move the found files to the top of the model directory, then drop any leftover subfolders.
        for filepath in (model_filepath, index_filepath):
            if filepath:
                new_path = os.path.join(extraction_folder, os.path.basename(filepath))
                if filepath != new_path:
                    os.rename(filepath, new_path)
        for item in os.listdir(extraction_folder):
            item_path = os.path.join(extraction_folder, item)
            if os.path.isdir(item_path):
                shutil.rmtree(item_path, ignore_errors=True)
    except OSError as e:
        logger.error(f"Error organizing extracted files: {e}\n{format_exc()}")
        shutil.rmtree(extraction_folder, ignore_errors=True)
        raise gr.Error(f"Failed to organize model files: {e}")

    progress(1.0, desc="Zip extraction completed")
    return f"Model extracted to {extraction_folder}"

def download_online_model(url, dir_name, progress=gr.Progress()):
    """Download and extract a model from a URL synchronously."""
    extraction_folder = None
    try:
        dir_name = sanitize_model_name(dir_name)
        if not url:
            raise gr.Error("URL is required.")
        if not url.startswith(('http://', 'https://')):
            raise gr.Error("Invalid URL format. Must start with http:// or https://.")
        extraction_folder = os.path.join(RVC_MODELS_DIR, dir_name)
        if os.path.exists(extraction_folder):
            raise gr.Error(f"Model directory '{dir_name}' already exists! Choose a different name.")
        progress(0.1, desc=f"Preparing to download '{dir_name}'...")
        zip_name = urlparse(url).path.split('/')[-1]
        # Pixeldrain share links don't serve the file directly; rewrite them to the direct-download API endpoint.
        if 'pixeldrain.com' in url:
            zip_name = os.path.basename(zip_name)
            url = f'https://pixeldrain.com/api/file/{zip_name}'
        zip_path = os.path.join(OUTPUT_DIR, zip_name)
        progress(0.2, desc="Downloading model...")
        response = requests.get(url, stream=True, timeout=600)
        response.raise_for_status()
        total_size = int(response.headers.get('content-length', 0))
        downloaded = 0
        with open(zip_path, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                if chunk:
                    f.write(chunk)
                    downloaded += len(chunk)
                    if total_size:
                        progress(0.2 + 0.6 * (downloaded / total_size), desc="Downloading...")
        progress(0.8, desc="Extracting model...")
        extract_zip(extraction_folder, zip_path, progress)
        progress(1.0, desc="Download completed")
        return f"Model '{dir_name}' successfully downloaded and extracted!", "Model download completed."
    except gr.Error:
        # Validation errors and extract_zip failures are already user-facing; extract_zip cleans up
        # after itself, and a pre-existing model directory must not be deleted here.
        raise
    except requests.exceptions.RequestException as e:
        logger.error(f"HTTP error during download: {e}\n{format_exc()}")
        if extraction_folder:
            shutil.rmtree(extraction_folder, ignore_errors=True)
        raise gr.Error(f"Failed to download model: {e}")
    except Exception as e:
        logger.error(f"Download failed: {e}\n{format_exc()}")
        if extraction_folder:
            shutil.rmtree(extraction_folder, ignore_errors=True)
        raise gr.Error(f"Failed to download model: {e}")

def upload_local_model(zip_file, dir_name, progress=gr.Progress()):
    """Upload and extract a local model zip file."""
    extraction_folder = None
    try:
        dir_name = sanitize_model_name(dir_name)
        if not zip_file:
            raise gr.Error("No file uploaded. Please select a zip file.")
        zip_path = zip_file.name
        if not os.path.exists(zip_path):
            raise gr.Error("Uploaded file not found. Please try again.")
        if os.path.getsize(zip_path) > CONFIG['max_upload_size_mb'] * 1024 * 1024:
            raise gr.Error(f"File size exceeds {CONFIG['max_upload_size_mb']}MB limit.")
        extraction_folder = os.path.join(RVC_MODELS_DIR, dir_name)
        if os.path.exists(extraction_folder):
            raise gr.Error(f"Model directory '{dir_name}' already exists! Choose a different name.")
        progress(0.5, desc="Processing uploaded file...")
        extract_zip(extraction_folder, zip_path, progress)
        return f"Model '{dir_name}' successfully uploaded and extracted!", "Model upload completed."
    except gr.Error:
        # Validation errors precede any extraction, and extract_zip removes its own folder on failure,
        # so there is nothing to clean up and an existing model directory must not be deleted.
        raise
    except Exception as e:
        logger.error(f"Upload failed: {e}\n{format_exc()}")
        if extraction_folder:
            shutil.rmtree(extraction_folder, ignore_errors=True)
        raise gr.Error(f"Failed to upload model: {e}")

def delete_model(model_name):
    """Delete a model directory."""
    try:
        if not model_name:
            raise gr.Error("No model selected. Please choose a model to delete.")
        model_path = os.path.join(RVC_MODELS_DIR, model_name)
        if not os.path.exists(model_path):
            raise gr.Error(f"Model '{model_name}' not found.")
        shutil.rmtree(model_path)
        return f"Model '{model_name}' deleted successfully!", "Model deletion completed."
    except OSError as e:
        logger.error(f"Failed to delete model: {e}\n{format_exc()}")
        raise gr.Error(f"Failed to delete model: {e}")

def swap_visibility():
    """Toggle visibility of YouTube link and file upload columns."""
    return gr.update(visible=True), gr.update(visible=False), gr.update(value=''), gr.update(value=None)


def process_file_upload(file):
    """Handle file upload and update UI."""
    try:
        if not file:
            raise gr.Error("No file uploaded. Please select an audio file.")
        file_path = file.name
        if os.path.splitext(file_path)[1].lower() not in AUDIO_EXTS:
            raise gr.Error(f"Unsupported file format. Supported formats: {', '.join(AUDIO_EXTS)}")
        return file_path, gr.update(value=file_path)
    except gr.Error:
        raise
    except Exception as e:
        logger.error(f"File upload processing failed: {e}\n{format_exc()}")
        raise gr.Error(f"Failed to process uploaded file: {e}")

def show_hop_slider(pitch_detection_algo):
    """Show/hide crepe hop length slider based on pitch detection algorithm."""
    return gr.update(visible=pitch_detection_algo == 'crepe')


async def run_async_pipeline(pipeline, **kwargs):
    """Run an async pipeline."""
    try:
        logger.debug(f"Running pipeline with kwargs: {kwargs}")
        return await pipeline(**kwargs)
    except Exception as e:
        logger.error(f"Pipeline execution failed: {e}\n{format_exc()}")
        raise
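
# generate_switch is the single entry point for the UI's conversion action:
# vocal_only=True selects vocal_cover_pipeline, otherwise song_cover_pipeline,
# which (judging by the gain and instrumental parameters) also mixes backup
# vocals and instrumentals. crepe_hop_length only applies when
# f0_method == 'crepe' (see show_hop_slider above).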
async def generate_switch(
    song_input, rvc_model, pitch, keep_files, main_gain,
    backup_gain, inst_gain, index_rate, filter_radius, rms_mix_rate,
    f0_method, crepe_hop_length, protect, output_format, vocal_only,
    progress=gr.Progress()
):
    """Run the appropriate pipeline based on vocal_only flag."""
    try:
        logger.info("Starting generate_switch with inputs: "
                    f"song_input={song_input}, rvc_model={rvc_model}, vocal_only={vocal_only}")
        # Validate inputs first
        if not song_input:
            raise gr.Error("Song input is required. Provide a YouTube link or file path.")
        if not rvc_model:
            raise gr.Error("Voice model is required. Select a model from the dropdown.")
        # Select pipeline
        pipeline = vocal_cover_pipeline if vocal_only else song_cover_pipeline
        logger.info(f"Selected pipeline: {'vocal_cover_pipeline' if vocal_only else 'song_cover_pipeline'}")
        # Validate song_input path if it's a local file
        song_input = validate_file_path(song_input) if os.path.exists(song_input) else song_input
        # Run the pipeline
        progress(0.1, desc="Initializing conversion...")
        result = await run_async_pipeline(
            pipeline,
            song_input=song_input,
            voice_model=rvc_model,
            pitch_change=pitch,
            keep_files=keep_files,
            main_gain=main_gain,
            backup_gain=backup_gain,
            inst_gain=inst_gain,
            index_rate=index_rate,
            filter_radius=filter_radius,
            rms_mix_rate=rms_mix_rate,
            f0_method=f0_method,
            crepe_hop_length=crepe_hop_length,
            protect=protect,
            output_format=output_format
        )
        progress(1.0, desc="Conversion completed")
        logger.info("Pipeline execution completed successfully")
        # Show a toast and return a plain status string, matching the other handlers' second output.
        gr.Info("Conversion completed successfully.")
        return result, "Conversion completed successfully."
    except gr.Error:
        raise
    except yt_dlp.utils.DownloadError as e:
        logger.error(f"YouTube download failed: {e}\n{format_exc()}")
        raise gr.Error(f"Failed to download audio from YouTube. Check the URL or cookies file: {e}")
    except Exception as e:
        logger.error(f"Conversion failed: {e}\n{format_exc()}")
        raise gr.Error(f"Failed to process conversion: {e}")