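"""Gradio demo: long-form English speech transcription with NVIDIA Parakeet.

Loads the nvidia/parakeet-tdt-0.6b-v2 model via NeMo, transcribes uploaded or
recorded audio with segment- and word-level timestamps, splits very long
recordings into overlapping chunks, and exports the results as CSV, SRT, VTT,
JSON, and LRC files.
"""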
from nemo.collections.asr.models import ASRModel
import torch
import gradio as gr
import spaces
import gc
import shutil
from pathlib import Path
from pydub import AudioSegment
import numpy as np
import os
import gradio.themes as gr_themes
import csv
import json
from typing import List, Tuple

device = "cuda" if torch.cuda.is_available() else "cpu"
MODEL_NAME = "nvidia/parakeet-tdt-0.6b-v2"

model = ASRModel.from_pretrained(model_name=MODEL_NAME)
model.eval()

def start_session(request: gr.Request):
    session_hash = request.session_hash

    base_dir = Path(__file__).parent
    session_dir = base_dir / "outputs" / session_hash
    session_dir.mkdir(parents=True, exist_ok=True)
    print(f"Session with hash {session_hash} started in {session_dir}")
    return session_dir.as_posix()


def end_session(request: gr.Request):
    session_hash = request.session_hash
    base_dir = Path(__file__).parent
    session_dir = base_dir / "outputs" / session_hash
    if session_dir.exists():
        print(f"Session directory {session_dir} will be preserved.")

    print(f"Session with hash {session_hash} ended.")

def get_audio_segment(audio_path, start_second, end_second):
    if not audio_path or not Path(audio_path).exists():
        print(f"Warning: Audio path '{audio_path}' not found or invalid for clipping.")
        return None
    try:
        start_ms = int(start_second * 1000)
        end_ms = int(end_second * 1000)

        start_ms = max(0, start_ms)
        if end_ms <= start_ms:
            print(f"Warning: End time ({end_second}s) is not after start time ({start_second}s). Adjusting end time.")
            end_ms = start_ms + 100

        audio = AudioSegment.from_file(audio_path)
        clipped_audio = audio[start_ms:end_ms]

        samples = np.array(clipped_audio.get_array_of_samples())
        if clipped_audio.channels == 2:
            samples = samples.reshape((-1, 2)).mean(axis=1).astype(samples.dtype)

        frame_rate = clipped_audio.frame_rate
        if frame_rate <= 0:
            print(f"Warning: Invalid frame rate ({frame_rate}) detected for clipped audio.")
            frame_rate = audio.frame_rate

        if samples.size == 0:
            print(f"Warning: Clipped audio resulted in empty samples array ({start_second}s to {end_second}s).")
            return None

        return (frame_rate, samples)
    except FileNotFoundError:
        print(f"Error: Audio file not found at path: {audio_path}")
        return None
    except Exception as e:
        print(f"Error clipping audio {audio_path} from {start_second}s to {end_second}s: {e}")
        return None

def preprocess_audio(audio_path, session_dir):
    """
    Preprocess an audio file (resample to 16 kHz and convert to mono).

    Args:
        audio_path (str): Path to the input audio file.
        session_dir (str): Path to the session directory.

    Returns:
        tuple: (transcribe_path, info_path_name, duration_sec), or (None, None, None) if processing fails.
    """
    try:
        original_path_name = Path(audio_path).name
        audio_name = Path(audio_path).stem

        try:
            gr.Info(f"Loading audio: {original_path_name}", duration=2)
            audio = AudioSegment.from_file(audio_path)
            duration_sec = audio.duration_seconds
        except Exception as load_e:
            gr.Error(f"Failed to load audio file {original_path_name}: {load_e}", duration=None)
            return None, None, None

        resampled = False
        mono = False
        target_sr = 16000

        if audio.frame_rate != target_sr:
            try:
                audio = audio.set_frame_rate(target_sr)
                resampled = True
            except Exception as resample_e:
                gr.Error(f"Failed to resample audio: {resample_e}", duration=None)
                return None, None, None

        if audio.channels == 2:
            try:
                audio = audio.set_channels(1)
                mono = True
            except Exception as mono_e:
                gr.Error(f"Failed to convert audio to mono: {mono_e}", duration=None)
                return None, None, None
        elif audio.channels > 2:
            gr.Error(f"Audio has {audio.channels} channels. Only mono (1) or stereo (2) supported.", duration=None)
            return None, None, None

        processed_audio_path = None
        if resampled or mono:
            try:
                processed_audio_path = Path(session_dir, f"{audio_name}_resampled.wav")
                audio.export(processed_audio_path, format="wav")
                transcribe_path = processed_audio_path.as_posix()
                info_path_name = f"{original_path_name} (processed)"
            except Exception as export_e:
                gr.Error(f"Failed to export processed audio: {export_e}", duration=None)
                if processed_audio_path and os.path.exists(processed_audio_path):
                    os.remove(processed_audio_path)
                return None, None, None
        else:
            transcribe_path = audio_path
            info_path_name = original_path_name

        return transcribe_path, info_path_name, duration_sec
    except Exception as e:
        gr.Error(f"Audio preprocessing failed: {e}", duration=None)
        return None, None, None

def transcribe_audio(transcribe_path, model, duration_sec, device):
    """
    Transcribe an audio file and obtain segment- and word-level timestamps.

    Args:
        transcribe_path (str): Path to the input audio file.
        model (ASRModel): ASR model to use.
        duration_sec (float): Duration of the audio file in seconds.
        device (str): Device to run on ('cuda' or 'cpu').

    Returns:
        tuple: (vis_data, raw_times_data, word_vis_data), or (None, None, None) if processing fails.
    """
    long_audio_settings_applied = False
    try:
        if device == 'cuda':
            torch.cuda.empty_cache()
            gc.collect()

        model.to(device)
        model.to(torch.float32)
        gr.Info(f"Transcribing on {device}...", duration=2)

        if duration_sec > 480:
            try:
                gr.Info("Audio longer than 8 minutes. Applying optimized settings for long transcription.", duration=3)
                print("Applying long audio settings: Local Attention and Chunking.")
                model.change_attention_model("rel_pos_local_attn", [256, 256])
                model.change_subsampling_conv_chunking_factor(1)

                torch.cuda.empty_cache()
                gc.collect()
                long_audio_settings_applied = True
            except Exception as setting_e:
                gr.Warning(f"Could not apply long audio settings: {setting_e}", duration=5)
                print(f"Warning: Failed to apply long audio settings: {setting_e}")

        model.to(torch.bfloat16)

        if device == 'cuda':
            print(f"CUDA Memory before transcription: {torch.cuda.memory_allocated() / 1024**2:.2f} MB")

        output = model.transcribe([transcribe_path], timestamps=True)

        if not output or not isinstance(output, list) or not output[0] or not hasattr(output[0], 'timestamp') or not output[0].timestamp or 'segment' not in output[0].timestamp:
            gr.Error("Transcription failed or produced unexpected output format.", duration=None)
            return None, None, None

        if device == 'cuda':
            model.cpu()
            torch.cuda.empty_cache()
            gc.collect()

        segment_timestamps = output[0].timestamp['segment']
        vis_data = [[f"{ts['start']:.2f}", f"{ts['end']:.2f}", ts['segment']] for ts in segment_timestamps]
        raw_times_data = [[ts['start'], ts['end']] for ts in segment_timestamps]

        word_timestamps_raw = output[0].timestamp.get("word", [])
        word_vis_data = [
            [f"{w['start']:.2f}", f"{w['end']:.2f}", w["word"]]
            for w in word_timestamps_raw if isinstance(w, dict) and 'start' in w and 'end' in w and 'word' in w
        ]

        gr.Info("Transcription complete.", duration=2)
        return vis_data, raw_times_data, word_vis_data

    except torch.cuda.OutOfMemoryError as e:
        error_msg = 'CUDA out of memory. Please try a shorter audio or reduce GPU load.'
        print(f"CUDA OutOfMemoryError: {e}")
        gr.Error(error_msg, duration=None)

        if device == 'cuda':
            torch.cuda.empty_cache()
            gc.collect()
        return None, None, None

    except Exception as e:
        error_msg = f"Transcription failed: {e}"
        print(f"Error during transcription processing: {e}")
        gr.Error(error_msg, duration=None)
        return None, None, None

    finally:
        try:
            if long_audio_settings_applied:
                try:
                    print("Reverting long audio settings.")
                    model.change_attention_model("rel_pos")
                    model.change_subsampling_conv_chunking_factor(-1)
                except Exception as revert_e:
                    print(f"Warning: Failed to revert long audio settings: {revert_e}")
                    gr.Warning(f"Issue reverting model settings after long transcription: {revert_e}", duration=5)

            if device == 'cuda':
                model.cpu()
                torch.cuda.empty_cache()
                gc.collect()
        except Exception as cleanup_e:
            print(f"Error during model cleanup: {cleanup_e}")
            gr.Warning(f"Issue during model cleanup: {cleanup_e}", duration=5)

def save_transcripts(session_dir, audio_name, vis_data, word_vis_data):
    """
    Save the transcription results in several file formats (CSV, SRT, VTT, JSON, LRC).

    Args:
        session_dir (str): Path to the session directory.
        audio_name (str): Name of the audio file.
        vis_data (list): Segment-level transcription rows for display.
        word_vis_data (list): Word-level timestamp rows.

    Returns:
        tuple: Update objects for each download button.
    """
    try:
        csv_headers = ["Start (s)", "End (s)", "Segment"]
        csv_file_path = Path(session_dir, f"transcription_{audio_name}.csv")
        with open(csv_file_path, 'w', newline='', encoding='utf-8') as f:
            writer = csv.writer(f)
            writer.writerow(csv_headers)
            writer.writerows(vis_data)
        print(f"CSV transcript saved to temporary file: {csv_file_path}")

        srt_file_path = Path(session_dir, f"transcription_{audio_name}.srt")
        vtt_file_path = Path(session_dir, f"transcription_{audio_name}.vtt")
        json_file_path = Path(session_dir, f"transcription_{audio_name}.json")
        write_srt(vis_data, srt_file_path)
        write_vtt(vis_data, word_vis_data, vtt_file_path)
        write_json(vis_data, word_vis_data, json_file_path)
        print(f"SRT, VTT, JSON transcripts saved to temporary files: {srt_file_path}, {vtt_file_path}, {json_file_path}")

        lrc_file_path = Path(session_dir, f"transcription_{audio_name}.lrc")
        write_lrc(vis_data, lrc_file_path)
        print(f"LRC transcript saved to temporary file: {lrc_file_path}")

        return (
            gr.DownloadButton(value=csv_file_path.as_posix(), visible=True),
            gr.DownloadButton(value=srt_file_path.as_posix(), visible=True),
            gr.DownloadButton(value=vtt_file_path.as_posix(), visible=True),
            gr.DownloadButton(value=json_file_path.as_posix(), visible=True),
            gr.DownloadButton(value=lrc_file_path.as_posix(), visible=True)
        )
    except Exception as e:
        gr.Error(f"Failed to create transcript files: {e}", duration=None)
        print(f"Error writing transcript files: {e}")
        return tuple([gr.DownloadButton(visible=False)] * 5)

def split_audio_with_overlap(audio_path: str, session_dir: str, chunk_length_sec: int = 3600, overlap_sec: int = 30) -> List[str]:
    """
    Split an audio file into chunks of chunk_length_sec seconds, with overlap_sec
    seconds of overlap between adjacent chunks, and return the list of chunk file paths.
    """
    audio = AudioSegment.from_file(audio_path)
    duration = audio.duration_seconds
    chunk_paths = []
    start = 0
    chunk_idx = 0
    while start < duration:
        end = min(start + chunk_length_sec, duration)

        # Extend each chunk by the overlap on both sides, except at the file boundaries.
        chunk_start = max(0, start - (overlap_sec if start > 0 else 0))
        chunk_end = min(end + (overlap_sec if end < duration else 0), duration)
        chunk = audio[chunk_start * 1000:chunk_end * 1000]
        chunk_path = Path(session_dir, f"chunk_{chunk_idx:03d}.wav").as_posix()
        chunk.export(chunk_path, format="wav")
        chunk_paths.append(chunk_path)
        start += chunk_length_sec
        chunk_idx += 1
    return chunk_paths

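# Illustrative chunk boundaries with the defaults above (chunk_length_sec=3600,
# overlap_sec=30), assuming a 2.5-hour (9000 s) input:
#   chunk_000.wav covers    0 s .. 3630 s  (no leading overlap, 30 s trailing overlap)
#   chunk_001.wav covers 3570 s .. 7230 s  (30 s overlap on both sides)
#   chunk_002.wav covers 7170 s .. 9000 s  (30 s leading overlap, ends at the file)
# Each chunk therefore starts at max(0, i * chunk_length_sec - overlap_sec) in the
# original timeline, which is the offset used when the per-chunk timestamps are merged.
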
@spaces.GPU
def get_transcripts_and_raw_times(audio_path, session_dir, progress=gr.Progress(track_tqdm=True)):
    """
    Process an audio file and produce the transcription results.
    Audio longer than 3 hours is split into 1-hour chunks with overlap; each chunk is
    transcribed separately and the results are merged.
    """
    if not audio_path:
        gr.Error("No audio file path provided for transcription.", duration=None)
        return [], [], [], None, gr.DownloadButton(visible=False), gr.DownloadButton(visible=False), gr.DownloadButton(visible=False), gr.DownloadButton(visible=False), gr.DownloadButton(visible=False)

    audio_name = Path(audio_path).stem
    processed_audio_path = None
    temp_chunk_paths = []

    try:
        transcribe_path, info_path_name, duration_sec = preprocess_audio(audio_path, session_dir)
        if not transcribe_path or not duration_sec:
            return [], [], [], audio_path, gr.DownloadButton(visible=False), gr.DownloadButton(visible=False), gr.DownloadButton(visible=False), gr.DownloadButton(visible=False), gr.DownloadButton(visible=False)

        processed_audio_path = transcribe_path if transcribe_path != audio_path else None
        if duration_sec > 10800:
            gr.Info("Audio is longer than 3 hours. Splitting into 1-hour chunks with overlap for transcription.", duration=5)
            chunk_paths = split_audio_with_overlap(transcribe_path, session_dir, chunk_length_sec=3600, overlap_sec=30)
            temp_chunk_paths = chunk_paths.copy()
            all_vis_data = []
            all_raw_times_data = []
            all_word_vis_data = []
            offset = 0.0
            prev_end = 0.0
            for i, chunk_path in enumerate(progress.tqdm(chunk_paths, desc="Processing audio chunks")):
                chunk_audio = AudioSegment.from_file(chunk_path)
                chunk_duration = chunk_audio.duration_seconds

                result = transcribe_audio(chunk_path, model, chunk_duration, device)
                if not result or result[0] is None:
                    continue
                vis_data, raw_times_data, word_vis_data = result

                # Shift chunk-local timestamps to the original timeline.
                vis_data_offset = []
                raw_times_data_offset = []
                word_vis_data_offset = []
                for row in vis_data:
                    s, e, seg = float(row[0]), float(row[1]), row[2]
                    vis_data_offset.append([f"{s + offset:.2f}", f"{e + offset:.2f}", seg])
                for row in raw_times_data:
                    s, e = float(row[0]), float(row[1])
                    raw_times_data_offset.append([s + offset, e + offset])
                for row in word_vis_data:
                    s, e, w = float(row[0]), float(row[1]), row[2]
                    word_vis_data_offset.append([f"{s + offset:.2f}", f"{e + offset:.2f}", w])

                # Drop entries that fall inside the overlap already covered by the previous chunk.
                vis_data_offset = [row for row in vis_data_offset if float(row[0]) >= prev_end]
                raw_times_data_offset = [row for row in raw_times_data_offset if row[0] >= prev_end]
                word_vis_data_offset = [row for row in word_vis_data_offset if float(row[0]) >= prev_end]
                if vis_data_offset:
                    prev_end = float(vis_data_offset[-1][1])
                all_vis_data.extend(vis_data_offset)
                all_raw_times_data.extend(raw_times_data_offset)
                all_word_vis_data.extend(word_vis_data_offset)
                # The next chunk starts at max(0, (i + 1) * 3600 - 30) seconds in the original
                # timeline (1-hour chunks with a 30 s leading overlap), so use that as its offset
                # rather than accumulating chunk durations, which would double-count the overlap.
                offset = max(0.0, (i + 1) * 3600 - 30)

            button_updates = save_transcripts(session_dir, audio_name, all_vis_data, all_word_vis_data)

            for p in temp_chunk_paths:
                try:
                    os.remove(p)
                except Exception:
                    pass
            return (
                all_vis_data,
                all_raw_times_data,
                all_word_vis_data,
                audio_path,
                *button_updates
            )
        else:
            result = transcribe_audio(transcribe_path, model, duration_sec, device)
            if not result or result[0] is None:
                return [], [], [], audio_path, gr.DownloadButton(visible=False), gr.DownloadButton(visible=False), gr.DownloadButton(visible=False), gr.DownloadButton(visible=False), gr.DownloadButton(visible=False)
            vis_data, raw_times_data, word_vis_data = result
            button_updates = save_transcripts(session_dir, audio_name, vis_data, word_vis_data)
            return (
                vis_data,
                raw_times_data,
                word_vis_data,
                audio_path,
                *button_updates
            )
    finally:
        if processed_audio_path and os.path.exists(processed_audio_path):
            try:
                os.remove(processed_audio_path)
                print(f"Temporary audio file {processed_audio_path} removed.")
            except Exception as e:
                print(f"Error removing temporary audio file {processed_audio_path}: {e}")

        for p in temp_chunk_paths:
            if os.path.exists(p):
                try:
                    os.remove(p)
                except Exception:
                    pass

def play_segment(evt: gr.SelectData, raw_ts_list, current_audio_path):
    if not isinstance(raw_ts_list, list):
        print(f"Warning: raw_ts_list is not a list ({type(raw_ts_list)}). Cannot play segment.")
        return gr.Audio(value=None, label="Selected Segment")

    if not current_audio_path:
        print("No audio path available to play segment from.")
        return gr.Audio(value=None, label="Selected Segment")

    selected_index = evt.index[0]

    if selected_index < 0 or selected_index >= len(raw_ts_list):
        print(f"Invalid index {selected_index} selected for list of length {len(raw_ts_list)}.")
        return gr.Audio(value=None, label="Selected Segment")

    if not isinstance(raw_ts_list[selected_index], (list, tuple)) or len(raw_ts_list[selected_index]) != 2:
        print(f"Warning: Data at index {selected_index} is not in the expected format [start, end].")
        return gr.Audio(value=None, label="Selected Segment")

    start_time_s, end_time_s = raw_ts_list[selected_index]
    print(f"Attempting to play segment: {current_audio_path} from {start_time_s:.2f}s to {end_time_s:.2f}s")
    segment_data = get_audio_segment(current_audio_path, start_time_s, end_time_s)

    if segment_data:
        print("Segment data retrieved successfully.")
        return gr.Audio(value=segment_data, autoplay=True, label=f"Segment: {start_time_s:.2f}s - {end_time_s:.2f}s", interactive=False)
    else:
        print("Failed to get audio segment data.")
        return gr.Audio(value=None, label="Selected Segment")

def write_srt(segments, path):
    def sec2srt(t):
        h, rem = divmod(int(float(t)), 3600)
        m, s = divmod(rem, 60)
        ms = int((float(t) - int(float(t))) * 1000)
        return f"{h:02}:{m:02}:{s:02},{ms:03}"
    with open(path, "w", encoding="utf-8") as f:
        for i, seg in enumerate(segments, 1):
            f.write(f"{i}\n{sec2srt(seg[0])} --> {sec2srt(seg[1])}\n{seg[2]}\n\n")

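# For example, sec2srt inside write_srt formats 3661.5 seconds as "01:01:01,500"
# (1 h, 1 min, 1 s, 500 ms), matching the HH:MM:SS,mmm timestamps SRT expects.
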
def write_vtt(segments, words, path):
    def sec2vtt(t):
        h, rem = divmod(int(float(t)), 3600)
        m, s = divmod(rem, 60)
        ms = int((float(t) - int(float(t))) * 1000)
        return f"{h:02}:{m:02}:{s:02}.{ms:03}"

    with open(path, "w", encoding="utf-8") as f:
        f.write("WEBVTT\n\n")

        word_idx = 0
        for seg_idx, seg in enumerate(segments):
            s_start = float(seg[0])
            s_end = float(seg[1])

            # Collect the words that fall inside this segment's time range.
            segment_words = []
            temp_word_idx = word_idx
            while temp_word_idx < len(words):
                w = words[temp_word_idx]
                w_start_val = float(w[0])
                w_end_val = float(w[1])

                if w_start_val >= s_start and w_end_val <= s_end:
                    segment_words.append(w)
                    if temp_word_idx == word_idx:
                        word_idx = temp_word_idx + 1
                    temp_word_idx += 1
                elif w_start_val < s_start and w_end_val > s_start:
                    # Word straddles the segment start; skip it here.
                    temp_word_idx += 1
                elif w_start_val > s_end:
                    break
                else:
                    if temp_word_idx == word_idx:
                        word_idx = temp_word_idx + 1
                    temp_word_idx += 1

            # Emit one cue per word, highlighting the current word within its segment.
            for i, word_data in enumerate(segment_words):
                w_start = float(word_data[0])
                w_end = float(word_data[1])

                colored_text = ""
                for j, other_word_data in enumerate(segment_words):
                    if j == i:
                        colored_text += f"<c.yellow><b>{other_word_data[2]}</b></c> "
                    else:
                        colored_text += f"{other_word_data[2]} "

                f.write(f"{sec2vtt(w_start)} --> {sec2vtt(w_end)}\n{colored_text.strip()}\n\n")

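# Each word in a segment becomes its own VTT cue, with that word highlighted and the
# rest of the segment shown as context, e.g. (times are illustrative):
#   00:00:01.200 --> 00:00:01.450
#   the <c.yellow><b>quick</b></c> brown fox
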
def write_json(segments, words, path):
    result = {"segments": []}
    word_idx = 0
    for s in segments:
        s_start = float(s[0])
        s_end = float(s[1])
        s_text = s[2]
        word_list = []
        while word_idx < len(words):
            w = words[word_idx]
            w_start = float(w[0])
            w_end = float(w[1])
            if w_start >= s_start and w_end <= s_end:
                word_list.append({"start": w_start, "end": w_end, "word": w[2]})
                word_idx += 1
            elif w_end < s_start:
                word_idx += 1
            else:
                break
        result["segments"].append({
            "start": s_start,
            "end": s_end,
            "text": s_text,
            "words": word_list
        })
    with open(path, "w", encoding="utf-8") as f:
        json.dump(result, f, ensure_ascii=False, indent=2)

def write_lrc(segments, path):
    def sec2lrc(t):
        m, s = divmod(float(t), 60)
        return f"[{int(m):02}:{s:05.2f}]"
    with open(path, "w", encoding="utf-8") as f:
        for seg in segments:
            f.write(f"{sec2lrc(seg[0])}{seg[2]}\n")

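# For example, sec2lrc inside write_lrc formats 75.5 seconds as "[01:15.50]",
# the [MM:SS.xx] tag format used by LRC lyric files.
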
article = (
    "<p style='font-size: 1.1em;'>"
    "This demo showcases high-accuracy English speech transcription with "
    "<code><a href='https://huggingface.co/nvidia/parakeet-tdt-0.6b-v2' target='_blank'>parakeet-tdt-0.6b-v2</a></code> "
    "(about 600 million parameters)."
    "</p>"
    "<p><strong style='color: red; font-size: 1.2em;'>Key features:</strong></p>"
    "<ul style='font-size: 1.1em;'>"
    "  <li>Automatic punctuation and capitalization</li>"
    "  <li>Word-level timestamps (click a row in the table below to play that segment)</li>"
    "  <li>Character-level timestamp display is also supported</li>"
    "  <li>Efficient transcription of <strong>long audio</strong> via automatic chunking (handles recordings of several hours or more)</li>"
    "  <li>Robust to numbers, song lyrics, and other challenging speech</li>"
    "</ul>"
    "<p style='font-size: 1.1em;'>"
    "The model is available <strong>without license restrictions</strong> for both commercial and non-commercial use."
    "</p>"
    "<p style='text-align: center;'>"
    "<a href='https://huggingface.co/nvidia/parakeet-tdt-0.6b-v2' target='_blank'>🎙️ Model details</a> | "
    "<a href='https://arxiv.org/abs/2305.05084' target='_blank'>📄 Fast Conformer paper</a> | "
    "<a href='https://arxiv.org/abs/2304.06795' target='_blank'>📚 TDT paper</a> | "
    "<a href='https://github.com/NVIDIA/NeMo' target='_blank'>🧑‍💻 NeMo repository</a>"
    "</p>"
)

examples = [
    ["data/example-yt_saTD1u8PorI.mp3"],
]

nvidia_theme = gr_themes.Default(
    primary_hue=gr_themes.Color(
        c50="#E6F1D9", c100="#CEE3B3", c200="#B5D58C", c300="#9CC766",
        c400="#84B940", c500="#76B900", c600="#68A600", c700="#5A9200",
        c800="#4C7E00", c900="#3E6A00", c950="#2F5600"
    ),
    neutral_hue="gray",
    font=[gr_themes.GoogleFont("Inter"), "ui-sans-serif", "system-ui", "sans-serif"],
).set()

with gr.Blocks(theme=nvidia_theme) as demo:
    model_display_name = MODEL_NAME.split('/')[-1] if '/' in MODEL_NAME else MODEL_NAME
    gr.Markdown(f"<h1 style='text-align: center; margin: 0 auto;'>Long-Form Speech Transcription ({model_display_name})</h1>")
    gr.HTML(article)

    current_audio_path_state = gr.State(None)
    raw_timestamps_list_state = gr.State([])
    session_dir_state = gr.State()
    demo.load(start_session, outputs=[session_dir_state])

    with gr.Tabs():
        with gr.TabItem("Audio File"):
            file_input = gr.Audio(sources=["upload"], type="filepath", label="Upload Audio File")
            gr.Examples(examples=examples, inputs=[file_input], label="Example Audio Files (Click to Load)")
            file_transcribe_btn = gr.Button("Transcribe Uploaded File", variant="primary")

        with gr.TabItem("Microphone"):
            mic_input = gr.Audio(sources=["microphone"], type="filepath", label="Record Audio")
            mic_transcribe_btn = gr.Button("Transcribe Microphone Input", variant="primary")

    gr.Markdown("---")
    gr.Markdown("<p><strong style='color: #FF0000; font-size: 1.2em;'>Transcription Results</strong></p>")

    download_btn = gr.DownloadButton(label="Download Segment Transcript (CSV)", visible=False)
    srt_btn = gr.DownloadButton(label="Download SRT", visible=False)
    vtt_btn = gr.DownloadButton(label="Download VTT", visible=False)
    json_btn = gr.DownloadButton(label="Download JSON", visible=False)
    lrc_btn = gr.DownloadButton(label="Download LRC", visible=False)

    with gr.Tabs():
        with gr.TabItem("Segment View (Click row to play segment)"):
            vis_timestamps_df = gr.DataFrame(
                headers=["Start (s)", "End (s)", "Segment"],
                datatype=["number", "number", "str"],
                wrap=True,
            )
            selected_segment_player = gr.Audio(label="Selected Segment", interactive=False)

        with gr.TabItem("Word View"):
            word_vis_df = gr.DataFrame(
                headers=["Start (s)", "End (s)", "Word"],
                datatype=["number", "number", "str"],
                wrap=False,
            )

    mic_transcribe_btn.click(
        fn=get_transcripts_and_raw_times,
        inputs=[mic_input, session_dir_state],
        outputs=[vis_timestamps_df, raw_timestamps_list_state, word_vis_df, current_audio_path_state, download_btn, srt_btn, vtt_btn, json_btn, lrc_btn],
        api_name="transcribe_mic"
    )

    file_transcribe_btn.click(
        fn=get_transcripts_and_raw_times,
        inputs=[file_input, session_dir_state],
        outputs=[vis_timestamps_df, raw_timestamps_list_state, word_vis_df, current_audio_path_state, download_btn, srt_btn, vtt_btn, json_btn, lrc_btn],
        api_name="transcribe_file"
    )

    vis_timestamps_df.select(
        fn=play_segment,
        inputs=[raw_timestamps_list_state, current_audio_path_state],
        outputs=[selected_segment_player],
    )

    demo.unload(end_session)

if __name__ == "__main__":
    print("Launching Gradio Demo...")
    demo.queue(
        max_size=5,
        default_concurrency_limit=1
    )
    demo.launch(
        server_name="127.0.0.1",
        server_port=7860,
        share=False,
        max_threads=1
    )