Spaces:
Runtime error
Runtime error
| import os | |
| import shutil | |
| import numpy as np | |
| import string | |
| import random | |
| from datetime import datetime | |
| from pyannote.audio import Model, Inference | |
| from pydub import AudioSegment | |
class AudioProcessor():
    """Split audio into fixed-length segments and compare them to reference voices.

    Similarity is the cosine similarity between embeddings produced by the
    ``pyannote/embedding`` model that is loaded in ``__init__``.
    """

    def __init__(self, cache_dir="/tmp/hf_cache"):
        """Load the pyannote embedding model.

        The Hugging Face token is read from the ``HF`` environment variable,
        falling back to ``HUGGINGFACE_HUB_TOKEN`` (the name used by the error
        message).

        Args:
            cache_dir: Directory passed to ``Model.from_pretrained`` as the
                model cache; created if missing.

        Raises:
            ValueError: If no token is configured.
        """
        hf_token = os.environ.get("HF") or os.environ.get("HUGGINGFACE_HUB_TOKEN")
        if not hf_token:
            raise ValueError("HUGGINGFACE_HUB_TOKEN が設定されていません。")
        os.makedirs(cache_dir, exist_ok=True)
        # Load the pyannote model.
        model = Model.from_pretrained("pyannote/embedding", use_auth_token=hf_token, cache_dir=cache_dir)
        self.inference = Inference(model)

    @staticmethod
    def _reset_directory(directory):
        """Remove the regular files directly inside *directory*, creating it if absent."""
        if os.path.exists(directory):
            for entry in os.listdir(directory):
                entry_path = os.path.join(directory, entry)
                if os.path.isfile(entry_path):
                    os.remove(entry_path)
        else:
            os.makedirs(directory, exist_ok=True)

    @staticmethod
    def _list_segments(directory):
        """Return ``(index, path)`` for every ``<index>.<ext>`` file, in numeric order.

        Sub-directories and files whose stem is not a number are skipped, and
        the numeric sort keeps ``10.wav`` after ``2.wav``.
        """
        segments = []
        for entry in os.listdir(directory):
            entry_path = os.path.join(directory, entry)
            stem = os.path.splitext(entry)[0]
            if stem.isdigit() and os.path.isfile(entry_path):
                segments.append((int(stem), entry_path))
        segments.sort()
        return segments

    def _embed(self, path):
        """Run the embedding model on *path* and return the flattened result."""
        # NOTE(review): assumes the inference result exposes ``.data`` as an
        # array and that every file flattens to the same length -- confirm.
        return self.inference(path).data.flatten()

    def cosine_similarity(self, vec1, vec2):
        """Return the cosine similarity of two vectors.

        A zero-norm vector has no direction; 0.0 is returned for it instead of
        the NaN that dividing by a zero norm would produce.
        """
        vec1 = np.asarray(vec1)
        vec2 = np.asarray(vec2)
        norm1 = np.linalg.norm(vec1)
        norm2 = np.linalg.norm(vec2)
        if norm1 == 0 or norm2 == 0:
            return 0.0
        return np.dot(vec1 / norm1, vec2 / norm2)

    def segment_audio(self, path, target_path='/tmp/setup_voice', seg_duration=1.0):
        """Split an audio file into ``seg_duration``-second WAV segments.

        Segments are written to *target_path* as ``0.wav``, ``1.wav``, ...; a
        final short segment is padded with silence to the full length.

        Args:
            path: Audio file to split.
            target_path: Output directory; existing files in it are removed.
            seg_duration: Segment length in seconds (must convert to >= 1 ms).

        Returns:
            Tuple ``(target_path, duration_ms)`` where ``duration_ms`` is the
            length of the input audio in milliseconds.

        Raises:
            ValueError: If ``seg_duration`` is not positive.
        """
        seg_duration_ms = int(seg_duration * 1000)
        if seg_duration_ms <= 0:
            raise ValueError("seg_duration must be a positive number of seconds")

        # Clear the output directory if it already exists.
        self._reset_directory(target_path)

        base_sound = AudioSegment.from_file(path)
        duration_ms = len(base_sound)
        for i, start in enumerate(range(0, duration_ms, seg_duration_ms)):
            end = min(start + seg_duration_ms, duration_ms)
            segment = base_sound[start:end]
            # Pad with silence when the segment is shorter than requested.
            if len(segment) < seg_duration_ms:
                silence = AudioSegment.silent(duration=(seg_duration_ms - len(segment)))
                segment = segment + silence
            segment.export(os.path.join(target_path, f'{i}.wav'), format="wav")
        return target_path, duration_ms

    def calculate_similarity(self, path1, path2):
        """Return the cosine similarity (as ``float``) of the embeddings of two audio files."""
        return float(self.cosine_similarity(self._embed(path1), self._embed(path2)))

    def generate_random_string(self, length):
        """Return *length* random ASCII letters/digits."""
        letters = string.ascii_letters + string.digits
        return ''.join(random.choice(letters) for _ in range(length))

    def generate_filename(self, random_length):
        """Return ``<YYYYmmddHHMMSS>_<random>.wav`` using the current local time."""
        random_string = self.generate_random_string(random_length)
        current_time = datetime.now().strftime("%Y%m%d%H%M%S")
        return f"{current_time}_{random_string}.wav"

    def process_audio(self, reference_path, input_path, output_folder='/tmp/data/matched_segments', seg_duration=1.0, threshold=0.5):
        """Copy the segments of *input_path* that resemble *reference_path*.

        Args:
            reference_path: Audio file of the reference voice.
            input_path: Audio file to analyse.
            output_folder: Directory receiving matching segments; existing
                files in it are removed first.
            seg_duration: Segment length in seconds.
            threshold: A segment matches when its similarity is strictly
                greater than this value.

        Returns:
            Tuple ``(matched_time_ms, unmatched_time_ms)`` in milliseconds.
            Only the un-padded part of each segment is counted, so the two
            values sum to the input duration and neither is negative.
        """
        # Clear the output directory.
        self._reset_directory(output_folder)

        segmented_path, total_duration_ms = self.segment_audio(input_path, seg_duration=seg_duration)
        seg_duration_ms = int(seg_duration * 1000)
        segments = self._list_segments(segmented_path)

        matched_time_ms = 0
        if segments:
            # The reference embedding is loop-invariant: compute it once.
            reference_vec = self._embed(reference_path)
        for index, segment_file in segments:
            similarity = float(self.cosine_similarity(self._embed(segment_file), reference_vec))
            if similarity > threshold:
                shutil.copy(segment_file, output_folder)
                # The last segment is padded with silence on disk; count only
                # the audio that actually came from the input.
                remaining_ms = total_duration_ms - index * seg_duration_ms
                matched_time_ms += max(0, min(seg_duration_ms, remaining_ms))
        unmatched_time_ms = total_duration_ms - matched_time_ms
        return matched_time_ms, unmatched_time_ms

    def process_multi_audio(self, reference_pathes, input_path, output_folder='/tmp/data/matched_multi_segments', seg_duration=1.0, threshold=0.5):
        """Attribute each segment of *input_path* to its most similar reference.

        Args:
            reference_pathes: Sequence of reference audio files.
            input_path: Audio file to analyse.
            output_folder: Directory whose existing files are removed; this
                method writes nothing into it.
            seg_duration: Segment length in seconds.
            threshold: A segment is attributed to its best reference only when
                that similarity is greater than or equal to this value.

        Returns:
            List with one entry per reference: the accumulated matched time in
            seconds. Every matching segment contributes the nominal
            ``seg_duration``, including a padded final segment. An empty
            reference sequence yields an empty list.
        """
        # Clear the output directory.
        self._reset_directory(output_folder)

        # Split the input audio into segments.
        segmented_path, _total_duration_ms = self.segment_audio(input_path, seg_duration=seg_duration)

        matched_time = [0] * len(reference_pathes)
        segments = self._list_segments(segmented_path)
        if not matched_time or not segments:
            return matched_time

        # Embed every reference once instead of once per segment.
        reference_vecs = [self._embed(reference_path) for reference_path in reference_pathes]

        for _index, segment_file in segments:
            segment_vec = self._embed(segment_file)
            similarities = [self.cosine_similarity(segment_vec, ref_vec) for ref_vec in reference_vecs]
            # Pick the reference with the highest similarity for this segment.
            best_ref = int(np.argmax(similarities))
            # ``>=`` (rather than ``not <``) keeps a NaN score from counting as a match.
            if similarities[best_ref] >= threshold:
                matched_time[best_ref] += seg_duration
        return matched_time