Spaces:
Running
Running
| import subprocess | |
| import os | |
| import sys | |
| import errno | |
| import shutil | |
| from mega import Mega | |
| import datetime | |
| import unicodedata | |
| import torch | |
| import glob | |
| import gradio as gr | |
| import gdown | |
| import zipfile | |
| import traceback | |
| import json | |
| import requests | |
| import wget | |
| import ffmpeg | |
| import hashlib | |
| now_dir = os.getcwd() | |
| sys.path.append(now_dir) | |
| from unidecode import unidecode | |
| import re | |
| import time | |
| from infer.modules.vc.pipeline import Pipeline | |
| VC = Pipeline | |
| from lib.infer_pack.models import ( | |
| SynthesizerTrnMs256NSFsid, | |
| SynthesizerTrnMs256NSFsid_nono, | |
| SynthesizerTrnMs768NSFsid, | |
| SynthesizerTrnMs768NSFsid_nono, | |
| ) | |
| from configs.config import Config | |
| from huggingface_hub import HfApi, list_models | |
| from huggingface_hub import login | |
| from bs4 import BeautifulSoup | |
| from sklearn.cluster import MiniBatchKMeans | |
| from dotenv import load_dotenv | |
# Load settings from a .env file before any os.getenv() lookup below.
load_dotenv()
config = Config()

# Point TEMP at a folder under the working directory, discarding any leftovers
# from a previous run.
tmp = os.path.join(now_dir, "TEMP")
shutil.rmtree(tmp, ignore_errors=True)
os.environ["TEMP"] = tmp

# Model weights and index folders are configured through the environment.
weight_root = os.getenv("weight_root")
index_root = os.getenv("index_root")
audio_root = "audios"

# Every *.pth file directly inside weight_root is offered as a model name.
names = []
for name in os.listdir(weight_root):
    if name.endswith(".pth"):
        names.append(name)

index_paths = []
indexes_list = []
audio_paths = []

# Collect *.index files, skipping the ones whose name contains "trained".
for root, dirs, files in os.walk(index_root, topdown=False):
    for name in files:
        if name.endswith(".index") and "trained" not in name:
            # Use "/" like audio_paths below; the previous hard-coded "\\"
            # separator produced unusable paths on non-Windows hosts.
            index_paths.append("%s/%s" % (root, name))

# Collect every file found under the audio folder.
for root, dirs, files in os.walk(audio_root, topdown=False):
    for name in files:
        audio_paths.append("%s/%s" % (root, name))
def calculate_md5(file_path):
    """Return the hexadecimal MD5 digest of the file at ``file_path``.

    The file is consumed in 4096-byte pieces, so it is never held in memory
    as a whole.
    """
    digest = hashlib.md5()
    with open(file_path, "rb") as stream:
        piece = stream.read(4096)
        while piece != b"":
            digest.update(piece)
            piece = stream.read(4096)
    return digest.hexdigest()
def format_title(title):
    """Return ``title`` reduced to word characters, hyphens and underscores.

    Characters other than word characters, whitespace and ``-`` are removed,
    then every space character is turned into ``_``.
    """
    stripped = re.sub(r'[^\w\s-]', '', title)
    return "_".join(stripped.split(" "))
def silentremove(filename):
    """Delete ``filename``, treating an already-missing file as success.

    Any other ``OSError`` is propagated to the caller.
    """
    try:
        os.remove(filename)
    except OSError as exc:
        if exc.errno == errno.ENOENT:
            # Nothing to delete: that is the outcome we wanted anyway.
            return
        raise
def get_md5(temp_folder):
    """Return the MD5 of the first model ``.pth`` file found under ``temp_folder``.

    Files whose name starts with ``G_``/``D_`` or contains ``_G_``/``_D_`` are
    skipped. Returns ``None`` when no other ``.pth`` file exists.
    """
    for current_dir, _subfolders, entries in os.walk(temp_folder):
        for entry in entries:
            skipped = (
                entry.startswith("G_")
                or entry.startswith("D_")
                or "_G_" in entry
                or "_D_" in entry
            )
            if entry.endswith(".pth") and not skipped:
                return calculate_md5(os.path.join(current_dir, entry))
    return None
def find_parent(search_dir, file_name):
    """Return the absolute path of the first directory holding ``file_name``.

    ``search_dir`` is walked recursively; ``None`` is returned when no
    directory contains a file with that exact name.
    """
    hits = (
        os.path.abspath(current)
        for current, _subdirs, entries in os.walk(search_dir)
        if file_name in entries
    )
    return next(hits, None)
def find_folder_parent(search_dir, folder_name):
    """Return the absolute path of the first directory containing ``folder_name``.

    ``search_dir`` is walked recursively and only sub-directory names are
    compared; ``None`` is returned when nothing matches.
    """
    hits = (
        os.path.abspath(current)
        for current, subdirs, _entries in os.walk(search_dir)
        if folder_name in subdirs
    )
    return next(hits, None)
def delete_large_files(directory_path, max_size_megabytes):
    """Remove every file directly inside ``directory_path`` that is too large.

    A file is deleted when its size, expressed in megabytes (bytes divided by
    1024 * 1024), is strictly greater than ``max_size_megabytes``.
    Sub-directories are left alone. Each deletion is announced on stdout.
    """
    bytes_per_megabyte = 1024 * 1024
    for filename in os.listdir(directory_path):
        candidate = os.path.join(directory_path, filename)
        if not os.path.isfile(candidate):
            continue
        size_in_megabytes = os.path.getsize(candidate) / bytes_per_megabyte
        if size_in_megabytes > max_size_megabytes:
            print("###################################")
            print(f"Deleting s*** {filename} (Size: {size_in_megabytes:.2f} MB)")
            os.remove(candidate)
            print("###################################")
def download_from_url(url):
    """Download ``url`` into the project's ``zips`` folder.

    The project root is the directory that contains ``pretrained_v2``. The
    download strategy is chosen from the URL: Google Drive (``gdown``),
    ``/blob/`` file links, mega.nz, ``/tree/main`` listing pages (first
    ``.zip`` link), Discord CDN, pixeldrain, and plain ``wget`` for the rest.
    Afterwards files in ``zips`` larger than ``MAX_DOWNLOAD_SIZE`` megabytes
    are removed; when that variable is unset no limit is applied.

    The working directory is always reset to the project root before
    returning, including on early exits and errors.

    Returns:
        ``"downloaded"`` when a download was performed,
        ``"too much use"`` / ``"private link"`` for the matching Google Drive
        failures, or ``None`` when the URL is empty, unsupported or the
        download failed.
    """
    parent_path = find_folder_parent(".", "pretrained_v2")
    zips_path = os.path.join(parent_path, 'zips')
    size_limit = os.getenv('MAX_DOWNLOAD_SIZE')
    print(f"Limit download size in MB {size_limit}, duplicate the space for modify the limit")

    if not url:
        return None

    def remote_file_name(link):
        # Strip query string / fragment so "model.zip?download=true" is stored
        # as "model.zip"; basename() also blocks path components.
        return os.path.basename(link.split('?')[0].split('#')[0])

    print("Downloading the file: " + f"{url}")
    try:
        # Every branch below writes into zips_path, so make sure it exists.
        os.makedirs(zips_path, exist_ok=True)

        if "drive.google.com" in url:
            if "file/d/" in url:
                file_id = url.split("file/d/")[1].split("/")[0]
            elif "id=" in url:
                file_id = url.split("id=")[1].split("&")[0]
            else:
                return None
            if not file_id:
                return None
            # gdown writes into the current working directory.
            os.chdir(zips_path)
            result = subprocess.run(
                ["gdown", f"https://drive.google.com/uc?id={file_id}", "--fuzzy"],
                capture_output=True,
                text=True,
                encoding='utf-8',
            )
            if "Too many users have viewed or downloaded this file recently" in str(result.stderr):
                return "too much use"
            if "Cannot retrieve the public link of the file." in str(result.stderr):
                return "private link"
            print(result.stderr)
        elif "/blob/" in url:
            # Only the path segment is rewritten; a file name that happens to
            # contain "blob" must stay intact.
            url = url.replace("/blob/", "/resolve/", 1)
            response = requests.get(url)
            if response.status_code != 200:
                return None
            with open(os.path.join(zips_path, remote_file_name(url)), "wb") as newfile:
                newfile.write(response.content)
        elif "mega.nz" in url:
            if "#!" in url:
                file_id = url.split("#!")[1].split("!")[0]
            elif "file/" in url:
                file_id = url.split("file/")[1].split("/")[0]
            else:
                return None
            if not file_id:
                return None
            m = Mega()
            m.download_url(url, zips_path)
        elif "/tree/main" in url:
            response = requests.get(url)
            soup = BeautifulSoup(response.content, 'html.parser')
            temp_url = ''
            for link in soup.find_all('a', href=True):
                if link['href'].endswith('.zip'):
                    temp_url = link['href']
                    break
            if not temp_url:
                print("No .zip file found on the page.")
                return None
            url = temp_url.replace("/blob/", "/resolve/", 1)
            if "huggingface.co" not in url:
                url = "https://huggingface.co" + url
            # wget saves into the current working directory.
            os.chdir(zips_path)
            wget.download(url)
        elif "cdn.discordapp.com" in url:
            file = requests.get(url)
            if file.status_code != 200:
                return None
            with open(os.path.join(zips_path, remote_file_name(url)), "wb") as newfile:
                newfile.write(file.content)
        elif "pixeldrain.com" in url:
            try:
                file_id = url.split("pixeldrain.com/u/")[1]
                print(file_id)
                response = requests.get(f"https://pixeldrain.com/api/file/{file_id}")
                if response.status_code != 200:
                    return None
                file_name = response.headers.get("Content-Disposition").split('filename=')[-1].strip('";')
                # The name comes from a remote header: keep only its last
                # component so it cannot escape zips_path.
                file_name = os.path.basename(file_name)
                with open(os.path.join(zips_path, file_name), "wb") as newfile:
                    newfile.write(response.content)
            except Exception as e:
                # Best effort: any failure here is reported as "not downloaded".
                print(e)
                return None
        else:
            os.chdir(zips_path)
            wget.download(url)

        if size_limit is not None:
            delete_large_files(zips_path, int(size_limit))
        print("Full download")
        return "downloaded"
    finally:
        # Never leave the process inside the zips folder.
        os.chdir(parent_path)
class error_message(Exception):
    """Exception that also exposes its message as the ``mensaje`` attribute."""

    def __init__(self, mensaje):
        super().__init__(mensaje)
        self.mensaje = mensaje
def _build_synthesizer(cpt):
    """Instantiate the synthesizer class matching a checkpoint's version and f0 flag.

    Returns ``None`` when the checkpoint's ``version`` is neither "v1" nor "v2".
    """
    version = cpt.get("version", "v1")
    if version == "v1":
        with_f0, without_f0 = SynthesizerTrnMs256NSFsid, SynthesizerTrnMs256NSFsid_nono
    elif version == "v2":
        with_f0, without_f0 = SynthesizerTrnMs768NSFsid, SynthesizerTrnMs768NSFsid_nono
    else:
        return None
    if cpt.get("f0", 1) == 1:
        return with_f0(*cpt["config"], is_half=config.is_half)
    return without_f0(*cpt["config"])


def get_vc(sid, to_return_protect0, to_return_protect1):
    """Load the voice model ``sid`` from ``weight_root``, or unload the current one.

    Args:
        sid: File name of the checkpoint inside ``weight_root``. An empty
            string or empty list unloads the current model instead.
        to_return_protect0: Current value of the first "protect" control.
        to_return_protect1: Current value of the second "protect" control.

    Returns:
        A 3-tuple of update dictionaries (each carrying ``"__type__": "update"``):
        the speaker-id control followed by the two protect controls.

    Raises:
        ValueError: If the checkpoint declares a version other than "v1"/"v2".

    Side effects:
        Rebinds the module globals ``n_spk``, ``tgt_sr``, ``net_g``, ``vc``,
        ``cpt``, ``version`` and, when unloading, ``hubert_model``.
    """
    global n_spk, tgt_sr, net_g, vc, cpt, version, hubert_model
    if sid == "" or sid == []:
        if hubert_model is not None:
            print("clean_empty_cache")
            # Rebinding to None drops the references just like del + reassign,
            # without raising NameError for a global that was never set.
            hubert_model = net_g = n_spk = vc = tgt_sr = None
            if torch.cuda.is_available():
                torch.cuda.empty_cache()
            # The model is instantiated once more and dropped right away;
            # presumably a workaround so the memory is really released.
            version = cpt.get("version", "v1")
            net_g = _build_synthesizer(cpt)
            del net_g, cpt
            if torch.cuda.is_available():
                torch.cuda.empty_cache()
            cpt = None
        return (
            {"visible": False, "__type__": "update"},
            {"visible": False, "__type__": "update"},
            {"visible": False, "__type__": "update"},
        )

    person = "%s/%s" % (weight_root, sid)
    print("loading %s" % person)
    # NOTE(security): torch.load unpickles the file; only load checkpoints
    # obtained from a trusted source.
    cpt = torch.load(person, map_location="cpu")
    tgt_sr = cpt["config"][-1]
    # Speaker count is taken from the embedding table shipped with the weights.
    cpt["config"][-3] = cpt["weight"]["emb_g.weight"].shape[0]
    if_f0 = cpt.get("f0", 1)
    if if_f0 == 0:
        # Without f0 the protect controls are hidden and reset to 0.5.
        to_return_protect0 = to_return_protect1 = {
            "visible": False,
            "value": 0.5,
            "__type__": "update",
        }
    else:
        to_return_protect0 = {
            "visible": True,
            "value": to_return_protect0,
            "__type__": "update",
        }
        to_return_protect1 = {
            "visible": True,
            "value": to_return_protect1,
            "__type__": "update",
        }

    version = cpt.get("version", "v1")
    net_g = _build_synthesizer(cpt)
    if net_g is None:
        # Previously this path silently reused a stale model or hit NameError.
        raise ValueError("Unsupported model version: %r" % (version,))
    del net_g.enc_q
    # strict=False: the report of missing/unexpected keys is printed.
    print(net_g.load_state_dict(cpt["weight"], strict=False))
    net_g.eval().to(config.device)
    if config.is_half:
        net_g = net_g.half()
    else:
        net_g = net_g.float()
    vc = VC(tgt_sr, config)
    n_spk = cpt["config"][-3]
    return (
        {"visible": True, "maximum": n_spk, "__type__": "update"},
        to_return_protect0,
        to_return_protect1,
    )
def load_downloaded_model(url):
    """Download a model archive from ``url`` and install its files.

    This is a generator: it yields the accumulated status text after each
    step so a UI can show progress.

    Steps: recreate the ``zips``/``unzips`` staging folders under the project
    root (the directory containing ``pretrained_v2``), download, unpack every
    ``.zip``, move model ``.pth`` files (names without ``G_``/``D_``) into
    ``weights``, and move ``added_*.index``, ``total_fea.npy*`` and
    ``events.*`` files into ``logs/<model name>``. The staging folders are
    removed on success and the working directory is reset to the project root
    in every case.

    Args:
        url: Link understood by ``download_from_url``.

    Yields:
        Status messages (newline-joined history, or a single error line).
    """
    parent_path = find_folder_parent(".", "pretrained_v2")

    def move_replacing(source, target_dir):
        # shutil.move refuses to overwrite an existing file, so clear it first.
        stale = os.path.join(target_dir, os.path.basename(source))
        if os.path.exists(stale):
            os.remove(stale)
        if os.path.exists(source):
            shutil.move(source, target_dir)

    try:
        infos = []
        zips_path = os.path.join(parent_path, 'zips')
        unzips_path = os.path.join(parent_path, 'unzips')
        weights_path = os.path.join(parent_path, 'weights')
        logs_dir = ""

        # Always start from empty staging folders.
        for staging in (zips_path, unzips_path):
            if os.path.exists(staging):
                shutil.rmtree(staging)
            os.mkdir(staging)

        download_file = download_from_url(url)
        # Report download failures directly. The previous raise/except round
        # trip looked for "too much use" in a message that never contained it.
        if download_file == "too much use":
            print("Too many users have recently viewed or downloaded this file")
            yield "Too many users have recently viewed or downloaded this file"
            return
        if download_file == "private link":
            print("Cannot get file from this private link")
            yield "Cannot get file from this private link"
            return
        if not download_file:
            print("The file could not be downloaded.")
            infos.append("The file could not be downloaded.")
            yield "\n".join(infos)
            # Nothing to unpack; continuing used to crash on os.mkdir("").
            return
        if download_file == "downloaded":
            print("It has been downloaded successfully.")
            infos.append("It has been downloaded successfully.")
            yield "\n".join(infos)

        for filename in os.listdir(zips_path):
            if filename.endswith(".zip"):
                zipfile_path = os.path.join(zips_path, filename)
                print("Proceeding with the extraction...")
                infos.append("Proceeding with the extraction...")
                shutil.unpack_archive(zipfile_path, unzips_path, 'zip')
                model_name = os.path.basename(zipfile_path)
                # Default logs folder is named after the archive; a model file
                # found below overrides it.
                logs_dir = os.path.join(parent_path, 'logs', os.path.normpath(str(model_name).replace(".zip", "")))
                yield "\n".join(infos)
            else:
                print("Unzip error.")
                infos.append("Unzip error.")
                yield "\n".join(infos)

        index_file = False
        model_file = False

        # First pass: install inference weights.
        for path, subdirs, files in os.walk(unzips_path):
            for item in files:
                item_path = os.path.join(path, item)
                if 'G_' not in item and 'D_' not in item and item.endswith('.pth'):
                    model_file = True
                    model_name = item.replace(".pth", "")
                    logs_dir = os.path.join(parent_path, 'logs', model_name)
                    if os.path.exists(logs_dir):
                        shutil.rmtree(logs_dir)
                    # makedirs: the parent "logs" folder may not exist yet.
                    os.makedirs(logs_dir)
                    os.makedirs(weights_path, exist_ok=True)
                    move_replacing(item_path, weights_path)

        if not model_file and logs_dir and not os.path.exists(logs_dir):
            os.makedirs(logs_dir)

        # Second pass: install index and training side files.
        for path, subdirs, files in os.walk(unzips_path):
            for item in files:
                item_path = os.path.join(path, item)
                if item.startswith('added_') and item.endswith('.index'):
                    index_file = True
                    if os.path.exists(item_path):
                        move_replacing(item_path, logs_dir)
                if item.startswith('total_fea.npy') or item.startswith('events.'):
                    if os.path.exists(item_path):
                        move_replacing(item_path, logs_dir)

        result = ""
        if model_file:
            if index_file:
                print("The model works for inference, and has the .index file.")
                infos.append("\n" + "The model works for inference, and has the .index file.")
                yield "\n".join(infos)
            else:
                print("The model works for inference, but it doesn't have the .index file.")
                infos.append("\n" + "The model works for inference, but it doesn't have the .index file.")
                yield "\n".join(infos)

        if not index_file and not model_file:
            print("No relevant file was found to upload.")
            infos.append("No relevant file was found to upload.")
            yield "\n".join(infos)

        if os.path.exists(zips_path):
            shutil.rmtree(zips_path)
        if os.path.exists(unzips_path):
            shutil.rmtree(unzips_path)
        return result
    except Exception as e:
        # UI boundary: log the cause and show a generic message.
        print(e)
        yield "An error occurred downloading"
    finally:
        os.chdir(parent_path)
def save_to_wav(record_button):
    """Move a recorded audio file into ``./audios`` under a timestamped name.

    ``record_button`` is the path of the recording, or ``None`` when nothing
    was recorded (then nothing happens and ``None`` is returned). Otherwise
    the new file name, ``YYYY-MM-DD_HH-MM-SS.wav``, is returned.
    """
    if record_button is None:
        return None
    stamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    new_name = stamp + '.wav'
    shutil.move(record_button, './audios/' + new_name)
    return new_name
def change_choices2():
    """Return update dicts that refresh the audio-file choices.

    Lists ``./audios`` and keeps the entries whose name ends with one of the
    known audio suffixes. The first dict carries the sorted paths (always
    with forward slashes) under ``"choices"``; the second is an empty update.
    """
    suffixes = ('wav', 'mp3', 'flac', 'ogg', 'opus',
                'm4a', 'mp4', 'aac', 'alac', 'wma',
                'aiff', 'webm', 'ac3')
    found = sorted(
        os.path.join('./audios', entry).replace('\\', '/')
        for entry in os.listdir("./audios")
        if entry.endswith(suffixes)
    )
    return {"choices": found, "__type__": "update"}, {"__type__": "update"}
# File extensions (without the leading dot) accepted as audio input.
sup_audioext = {
    "wav",
    "mp3",
    "flac",
    "ogg",
    "opus",
    "m4a",
    "mp4",
    "aac",
    "alac",
    "wma",
    "aiff",
    "webm",
    "ac3",
}
def load_downloaded_audio(url):
    """Download audio from ``url`` and move it into the ``audios`` folder.

    This is a generator: it yields the accumulated status text after each
    step so a UI can show progress. Files land in the ``zips`` folder of the
    project root (the directory containing ``pretrained_v2``) first; those
    whose extension is listed in ``sup_audioext`` are then moved to
    ``audios``, replacing a file of the same name. The working directory is
    reset to the project root in every case.

    Args:
        url: Link understood by ``download_from_url``.

    Yields:
        Status messages (newline-joined history, or a single error line).
    """
    parent_path = find_folder_parent(".", "pretrained_v2")
    try:
        infos = []
        audios_path = os.path.join(parent_path, 'audios')
        zips_path = os.path.join(parent_path, 'zips')
        os.makedirs(audios_path, exist_ok=True)
        # The download step writes into zips, which may have been removed by
        # an earlier model download.
        os.makedirs(zips_path, exist_ok=True)

        download_file = download_from_url(url)
        # Report download failures directly. The previous raise/except round
        # trip looked for "too much use" in a message that never contained it.
        if download_file == "too much use":
            print("Too many users have recently viewed or downloaded this file")
            yield "Too many users have recently viewed or downloaded this file"
            return
        if download_file == "private link":
            print("Cannot get file from this private link")
            yield "Cannot get file from this private link"
            return
        if not download_file:
            print("The file could not be downloaded.")
            infos.append("The file could not be downloaded.")
            yield "\n".join(infos)
        elif download_file == "downloaded":
            print("It has been downloaded successfully.")
            infos.append("It has been downloaded successfully.")
            yield "\n".join(infos)

        for filename in os.listdir(zips_path):
            item_path = os.path.join(zips_path, filename)
            # Compare case-insensitively so "SONG.WAV" is accepted too.
            if filename.split('.')[-1].lower() in sup_audioext:
                if os.path.exists(item_path):
                    # shutil.move refuses to overwrite an existing file.
                    existing = os.path.join(audios_path, filename)
                    if os.path.isfile(existing):
                        os.remove(existing)
                    shutil.move(item_path, audios_path)

        result = ""
        print("Audio files have been moved to the 'audios' folder.")
        infos.append("Audio files have been moved to the 'audios' folder.")
        yield "\n".join(infos)
        return result
    except Exception as e:
        # UI boundary: log the cause and show a generic message.
        print(e)
        yield "An error occurred downloading"
    finally:
        os.chdir(parent_path)
class error_message(Exception):
    """Exception that also exposes its message as the ``mensaje`` attribute."""

    def __init__(self, mensaje):
        super().__init__(mensaje)
        self.mensaje = mensaje
def _build_synthesizer(cpt):
    """Instantiate the synthesizer class matching a checkpoint's version and f0 flag.

    Returns ``None`` when the checkpoint's ``version`` is neither "v1" nor "v2".
    """
    version = cpt.get("version", "v1")
    if version == "v1":
        with_f0, without_f0 = SynthesizerTrnMs256NSFsid, SynthesizerTrnMs256NSFsid_nono
    elif version == "v2":
        with_f0, without_f0 = SynthesizerTrnMs768NSFsid, SynthesizerTrnMs768NSFsid_nono
    else:
        return None
    if cpt.get("f0", 1) == 1:
        return with_f0(*cpt["config"], is_half=config.is_half)
    return without_f0(*cpt["config"])


def get_vc(sid, to_return_protect0, to_return_protect1):
    """Load the voice model ``sid`` from ``weight_root``, or unload the current one.

    Args:
        sid: File name of the checkpoint inside ``weight_root``. An empty
            string or empty list unloads the current model instead.
        to_return_protect0: Current value of the first "protect" control.
        to_return_protect1: Current value of the second "protect" control.

    Returns:
        A 3-tuple of update dictionaries (each carrying ``"__type__": "update"``):
        the speaker-id control followed by the two protect controls.

    Raises:
        ValueError: If the checkpoint declares a version other than "v1"/"v2".

    Side effects:
        Rebinds the module globals ``n_spk``, ``tgt_sr``, ``net_g``, ``vc``,
        ``cpt``, ``version`` and, when unloading, ``hubert_model``.
    """
    global n_spk, tgt_sr, net_g, vc, cpt, version, hubert_model
    if sid == "" or sid == []:
        if hubert_model is not None:
            print("clean_empty_cache")
            # Rebinding to None drops the references just like del + reassign,
            # without raising NameError for a global that was never set.
            hubert_model = net_g = n_spk = vc = tgt_sr = None
            if torch.cuda.is_available():
                torch.cuda.empty_cache()
            # The model is instantiated once more and dropped right away;
            # presumably a workaround so the memory is really released.
            version = cpt.get("version", "v1")
            net_g = _build_synthesizer(cpt)
            del net_g, cpt
            if torch.cuda.is_available():
                torch.cuda.empty_cache()
            cpt = None
        return (
            {"visible": False, "__type__": "update"},
            {"visible": False, "__type__": "update"},
            {"visible": False, "__type__": "update"},
        )

    person = "%s/%s" % (weight_root, sid)
    print("loading %s" % person)
    # NOTE(security): torch.load unpickles the file; only load checkpoints
    # obtained from a trusted source.
    cpt = torch.load(person, map_location="cpu")
    tgt_sr = cpt["config"][-1]
    # Speaker count is taken from the embedding table shipped with the weights.
    cpt["config"][-3] = cpt["weight"]["emb_g.weight"].shape[0]
    if_f0 = cpt.get("f0", 1)
    if if_f0 == 0:
        # Without f0 the protect controls are hidden and reset to 0.5.
        to_return_protect0 = to_return_protect1 = {
            "visible": False,
            "value": 0.5,
            "__type__": "update",
        }
    else:
        to_return_protect0 = {
            "visible": True,
            "value": to_return_protect0,
            "__type__": "update",
        }
        to_return_protect1 = {
            "visible": True,
            "value": to_return_protect1,
            "__type__": "update",
        }

    version = cpt.get("version", "v1")
    net_g = _build_synthesizer(cpt)
    if net_g is None:
        # Previously this path silently reused a stale model or hit NameError.
        raise ValueError("Unsupported model version: %r" % (version,))
    del net_g.enc_q
    # strict=False: the report of missing/unexpected keys is printed.
    print(net_g.load_state_dict(cpt["weight"], strict=False))
    net_g.eval().to(config.device)
    if config.is_half:
        net_g = net_g.half()
    else:
        net_g = net_g.float()
    vc = VC(tgt_sr, config)
    n_spk = cpt["config"][-3]
    return (
        {"visible": True, "maximum": n_spk, "__type__": "update"},
        to_return_protect0,
        to_return_protect1,
    )
def download_model():
    """Lay out the "Download Model" section of the UI.

    Adds a heading and description, a URL input, a status output and a
    button that runs ``load_downloaded_model`` on the URL and streams its
    messages into the status box.
    """
    heading = "# " + "Download Model"
    gr.Markdown(value=heading)
    gr.Markdown(value="It is used to download your inference models.")
    with gr.Row():
        url_box = gr.Textbox(label="Url:")
    with gr.Row():
        status_box = gr.Textbox(label="Status:")
    with gr.Row():
        trigger = gr.Button("Download")
        trigger.click(
            fn=load_downloaded_model,
            inputs=[url_box],
            outputs=[status_box],
        )
def download_audio():
    """Lay out the "Download Audio" section of the UI.

    Adds a heading and description, a URL input, a status output and a
    button that runs ``load_downloaded_audio`` on the URL and streams its
    messages into the status box.
    """
    heading = "# " + "Download Audio"
    gr.Markdown(value=heading)
    gr.Markdown(value="Download audios of any format for use in inference (Recommended for Mobile Users).")
    with gr.Row():
        url_box = gr.Textbox(label="Url:")
    with gr.Row():
        status_box = gr.Textbox(label="Status:")
    with gr.Row():
        trigger = gr.Button("Download")
        trigger.click(
            fn=load_downloaded_audio,
            inputs=[url_box],
            outputs=[status_box],
        )
def get_edge_voice():
    """Return the voices reported by ``edge-tts -l`` as ``"<Name>-<Gender>"`` strings.

    The command output is scanned line by line: a ``Name: `` line opens a new
    entry and a ``Gender: `` line completes the current one.
    """
    listing = subprocess.run(['edge-tts', "-l"], capture_output=True, text=True)

    voices = []
    pending = {}
    for row in listing.stdout.strip().split("\n"):
        if row.startswith("Name: "):
            # A new voice begins: store the one collected so far.
            if pending:
                voices.append(pending)
            pending = {"Name": row.split(": ")[1]}
        elif row.startswith("Gender: "):
            pending["Gender"] = row.split(": ")[1]
    if pending:
        voices.append(pending)

    return [f'{voice["Name"]}-{voice["Gender"]}' for voice in voices]