Spaces:
Running
Running
File size: 4,341 Bytes
bf0e5e2 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 |
import argparse
from concurrent.futures import ThreadPoolExecutor, as_completed
from multiprocessing import cpu_count
from pathlib import Path
from typing import Any
import librosa
import pyloudnorm as pyln
import soundfile
from numpy.typing import NDArray
from tqdm import tqdm
from config import get_config
from style_bert_vits2.logging import logger
from style_bert_vits2.utils.stdout_wrapper import SAFE_STDOUT
# Block size handed to the pyloudnorm meter in normalize_audio(); the same
# value is quoted in the log message emitted when normalization is skipped.
DEFAULT_BLOCK_SIZE: float = 0.400 # seconds
class BlockSizeException(Exception):
    """Raised when loudness measurement fails with a ValueError from the meter."""
def normalize_audio(
    data: NDArray[Any], sr: int, *, target_loudness: float = -23.0
) -> NDArray[Any]:
    """
    Loudness-normalize audio to a target integrated loudness.

    The integrated loudness of ``data`` is measured with a pyloudnorm meter
    (block size ``DEFAULT_BLOCK_SIZE``) and the samples are then gain-adjusted
    to ``target_loudness``.

    Args:
        data: Audio samples to normalize.
        sr: Sampling rate of ``data``.
        target_loudness: Loudness to normalize to. Defaults to -23.0, the
            value this function previously hard-coded.

    Returns:
        The loudness-normalized samples.

    Raises:
        BlockSizeException: If the meter raises ``ValueError`` while measuring
            loudness. The caller reports this as the audio being shorter than
            ``DEFAULT_BLOCK_SIZE`` seconds.
    """
    meter = pyln.Meter(sr, block_size=DEFAULT_BLOCK_SIZE)  # create BS.1770 meter
    try:
        loudness = meter.integrated_loudness(data)
    except ValueError as e:
        # Chain explicitly so the original ValueError is kept as the cause.
        raise BlockSizeException(e) from e
    return pyln.normalize.loudness(data, loudness, target_loudness)
def resample(
    file: Path,
    input_dir: Path,
    output_dir: Path,
    target_sr: int,
    normalize: bool,
    trim: bool,
) -> None:
    """
    Load ``file``, convert it to a wav file at ``target_sr``, and save it
    inside ``output_dir`` while keeping its path relative to ``input_dir``.

    Any failure is logged as a warning and swallowed, so that a single bad
    file does not abort a batch run.

    Args:
        file: Path of the file to convert.
        input_dir: Root directory that ``file`` is expected to live under.
        output_dir: Root directory the converted wav file is written under.
        target_sr: Sampling rate passed to ``librosa.load``.
        normalize: Whether to apply loudness normalization.
        trim: Whether to trim with ``librosa.effects.trim`` (``top_db=30``).
    """
    # Loading doubles as the check that librosa can read the file.
    # Besides wav, formats such as mp3, ogg and flac can be read as well.
    try:
        wav: NDArray[Any]
        sr: int
        wav, sr = librosa.load(file, sr=target_sr)
    except Exception as e:
        logger.warning(f"Cannot load file, so skipping: {file}, {e}")
        return

    # Processing and writing are guarded separately so that their failures
    # are not misreported as load failures.
    try:
        if normalize:
            try:
                wav = normalize_audio(wav, sr)
            except BlockSizeException:
                # NOTE(review): the empty print presumably breaks the
                # progress-bar line before the log message - confirm.
                print("")
                logger.info(
                    f"Skip normalize due to less than {DEFAULT_BLOCK_SIZE} second audio: {file}"
                )
        if trim:
            wav, _ = librosa.effects.trim(wav, top_db=30)
        relative_path = file.relative_to(input_dir)
        # Whatever the original extension was, it is replaced with .wav here.
        output_path = output_dir / relative_path.with_suffix(".wav")
        output_path.parent.mkdir(parents=True, exist_ok=True)
        soundfile.write(output_path, wav, sr)
    except Exception as e:
        logger.warning(f"Cannot process or save file, so skipping: {file}, {e}")
if __name__ == "__main__":
    # Command-line defaults are taken from the project configuration.
    config = get_config()
    resample_defaults = config.resample_config

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--sr",
        type=int,
        default=resample_defaults.sampling_rate,
        help="sampling rate",
    )
    parser.add_argument(
        "--input_dir",
        "-i",
        type=str,
        default=resample_defaults.in_dir,
        help="path to source dir",
    )
    parser.add_argument(
        "--output_dir",
        "-o",
        type=str,
        default=resample_defaults.out_dir,
        help="path to target dir",
    )
    parser.add_argument(
        "--num_processes",
        type=int,
        default=4,
        help="cpu_processes",
    )
    parser.add_argument(
        "--normalize",
        action="store_true",
        default=False,
        help="loudness normalize audio",
    )
    parser.add_argument(
        "--trim",
        action="store_true",
        default=False,
        help="trim silence (start and end only)",
    )
    args = parser.parse_args()

    # 0 selects the worker count automatically: two fewer than the CPU count
    # when there are more than four CPUs, otherwise a single worker.
    processes: int
    if args.num_processes != 0:
        processes = args.num_processes
    else:
        available_cpus = cpu_count()
        processes = available_cpus - 2 if available_cpus > 4 else 1

    input_dir = Path(args.input_dir)
    output_dir = Path(args.output_dir)
    logger.info(f"Resampling {input_dir} to {output_dir}")

    sr = int(args.sr)
    normalize: bool = args.normalize
    trim: bool = args.trim

    # Collect every file; whether each one is valid audio is checked later,
    # when librosa tries to load it.
    original_files = [path for path in input_dir.rglob("*") if path.is_file()]
    if not original_files:
        message = f"No files found in {input_dir}"
        logger.error(message)
        raise ValueError(message)

    output_dir.mkdir(parents=True, exist_ok=True)

    with ThreadPoolExecutor(max_workers=processes) as executor:
        futures = [
            executor.submit(resample, file, input_dir, output_dir, sr, normalize, trim)
            for file in original_files
        ]
        # The futures are drained only to advance the progress bar; their
        # results are not inspected.
        progress = tqdm(as_completed(futures), total=len(futures), file=SAFE_STDOUT)
        for _ in progress:
            pass

    logger.info("Resampling Done!")
|