|
import os |
|
import json |
|
import time |
|
import sys |
|
import requests |
|
from pathlib import Path |
|
|
|
import argparse |
|
from typing import List, Optional, Dict |
|
import shutil |
|
import subprocess |
|
import collections |
|
import tkinter as tk |
|
from tkinter import filedialog |
|
|
|
# gradio_client is required to talk to the Hugging Face Space; degrade
# gracefully (set a flag instead of crashing) so the script can still start
# and report the problem to the user.
try:
    from gradio_client import Client, file as gradio_file
    GRADIO_CLIENT_AVAILABLE = True
except ImportError:
    GRADIO_CLIENT_AVAILABLE = False
    print("Warning: gradio_client not found. Please install it with: pip install gradio_client")
|
|
|
|
|
# Chunking parameters: each chunk sent to the Space covers at most
# CHUNK_LENGTH_SECONDS, with CHUNK_OVERLAP_SECONDS of audio repeated between
# adjacent chunks so merge_transcripts can stitch the results back together.
CHUNK_LENGTH_SECONDS = 3600
CHUNK_OVERLAP_SECONDS = 30
# Hugging Face Space hosting the transcription model (Parakeet TDT 0.6b v2).
SPACE_URL = "https://sungo-ganpare-parakeet-tdt-0-6b-v2.hf.space"
# Hard cap on generated VTT output (100 MB); write_vtt raises once exceeded.
MAX_VTT_SIZE_BYTES = 100 * 1024 * 1024
# File extensions considered for transcription when scanning input.
TARGET_AUDIO_VIDEO_EXTENSIONS = [
    '.wav', '.mp3', '.m4a', '.flac', '.ogg',
    '.mp4', '.mkv', '.mov', '.avi', '.webm'
]

# An input file is skipped when a sibling output with this extension exists.
PRIMARY_OUTPUT_EXTENSION_FOR_SKIP_CHECK = '.json'
|
|
|
|
|
def get_audio_duration_with_ffprobe(audio_path: str) -> Optional[float]:
    """Return the duration of *audio_path* in seconds via ffprobe, or None on failure."""
    try:
        if shutil.which('ffprobe') is None:
            print("Warning: ffprobe not found")
            return None
        probe_cmd = [
            'ffprobe', '-v', 'quiet',
            '-show_entries', 'format=duration',
            '-of', 'csv=p=0', audio_path,
        ]
        proc = subprocess.run(probe_cmd, capture_output=True, text=True, timeout=30)
        duration_text = proc.stdout.strip()
        if proc.returncode == 0 and duration_text:
            return float(duration_text)
        print(f"Warning: Could not get duration for {Path(audio_path).name} using ffprobe. Return code: {proc.returncode}, Error: {proc.stderr.strip()}")
        return None
    except Exception as e:
        # Covers subprocess timeouts and float() parse failures alike.
        print(f"Error getting audio duration for {Path(audio_path).name}: {e}")
        return None
|
|
|
def split_audio_with_ffmpeg(audio_path: str, output_dir_base: str, chunk_length_sec: int, overlap_sec: int) -> List[str]:
    """Split an audio file into overlapping 16 kHz mono WAV chunks with ffmpeg.

    Temporary chunks are written under ``output_dir_base/temp_chunks/<stem>``.
    Returns the chunk paths (POSIX style) in order, or an empty list on failure.
    """
    src = Path(audio_path)
    try:
        if not shutil.which('ffmpeg'):
            print(f"Error: ffmpeg not found. Cannot split {src.name}.")
            return []

        total_sec = get_audio_duration_with_ffprobe(audio_path)
        if total_sec is None:
            print(f"Could not determine duration for {src.name}. Skipping split.")
            return []

        stem = src.stem
        chunk_dir = Path(output_dir_base) / "temp_chunks" / stem
        chunk_dir.mkdir(parents=True, exist_ok=True)

        created: List[str] = []
        cursor = 0
        index = 0

        print(f"Splitting {src.name} into chunks (max {chunk_length_sec}s each)...")
        while cursor < total_sec:
            # Extend each cut by the overlap on both sides, except at the edges.
            cut_start = max(0, cursor - (overlap_sec if cursor > 0 else 0))
            nominal_end = cursor + chunk_length_sec
            cut_end = min(nominal_end + (overlap_sec if nominal_end < total_sec else 0), total_sec)
            if cut_start >= cut_end:
                break

            out_path = chunk_dir / f"{stem}_chunk_{index:03d}.wav"
            cmd = [
                'ffmpeg', '-y', '-loglevel', 'error', '-ss', str(cut_start),
                '-i', audio_path, '-t', str(cut_end - cut_start),
                '-acodec', 'pcm_s16le', '-ar', '16000', '-ac', '1', str(out_path)
            ]
            try:
                proc = subprocess.run(cmd, capture_output=True, text=True, timeout=300)
                if proc.returncode == 0:
                    created.append(out_path.as_posix())
                else:
                    print(f"  Error creating chunk {index+1} for {src.name}: {proc.stderr.strip()}")
            except subprocess.TimeoutExpired:
                print(f"  Timeout creating chunk {index+1} for {src.name}")

            cursor += chunk_length_sec
            index += 1

        if created:
            print(f"  Finished splitting {src.name} into {len(created)} chunks.")
        else:
            print(f"  No chunks created for {src.name}.")
        return created
    except Exception as e:
        print(f"Error splitting audio {src.name}: {e}")
        return []
|
|
|
|
|
|
|
class GPUQuotaExceededError(Exception):
    """Raised when the remote Space reports that its GPU quota/limit is exhausted."""
|
|
|
def process_chunk(chunk_path: str, original_audio_filename: str) -> Optional[Dict]:
    """Send one audio chunk to the Space and return its transcript dict.

    Returns the parsed transcription result (a dict, normally containing a
    "segments" key) or None on any non-quota failure.  Raises
    GPUQuotaExceededError whenever the error text suggests the Space's GPU
    quota/limit was hit, so the caller can abort the whole run.
    """
    chunk_name = Path(chunk_path).name
    if not GRADIO_CLIENT_AVAILABLE:
        print(f"Error (gradio_client unavailable) processing {chunk_name} for {original_audio_filename}")
        return None
    try:
        client = None
        # Up to 3 connection attempts, 5 s apart; quota-looking errors abort at once.
        for attempt in range(3):
            try:
                client = Client(SPACE_URL)
                break
            except Exception as e:
                error_msg = str(e).lower()
                # Heuristic substring match on error text — NOTE(review): may
                # misclassify unrelated errors that merely contain these words.
                if any(keyword in error_msg for keyword in ['gpu', 'quota', 'limit', 'exceeded', 'unavailable']):
                    print(f"  GPU quota exceeded detected: {e}")
                    raise GPUQuotaExceededError(f"GPU quota exceeded: {e}")
                print(f"  Connection attempt {attempt + 1} for {chunk_name} (from {original_audio_filename}) failed: {e}")
                if attempt < 2: time.sleep(5)
                else: raise
        if client is None: return None

        result = None
        # Try candidate endpoints in order; assumes the Space exposes the
        # transcription fn at index 1 or 0 — TODO confirm against the Space API.
        api_methods_to_try = [{"name": "fn_index=1", "fn_index": 1}, {"name": "fn_index=0", "fn_index": 0}, {"name": "default", "fn_index": None}]
        for method_info in api_methods_to_try:
            try:
                if method_info["fn_index"] is not None:
                    result = client.predict(gradio_file(chunk_path), fn_index=method_info["fn_index"])
                else:
                    result = client.predict(gradio_file(chunk_path))
                break
            except Exception as api_e:
                error_msg = str(api_e).lower()
                if any(keyword in error_msg for keyword in ['gpu', 'quota', 'limit', 'exceeded', 'unavailable', 'out of memory', 'resource']):
                    print(f"  GPU quota exceeded during API call: {api_e}")
                    raise GPUQuotaExceededError(f"GPU quota exceeded during API call: {api_e}")
                # Non-quota failure: fall through to the next candidate endpoint.
                result = None

        if result is None:
            print(f"  All API call methods failed for {chunk_name} (from {original_audio_filename})")
            return None

        # Normalize the response: accept a dict directly or a JSON string.
        if isinstance(result, dict):
            return result
        elif isinstance(result, str):
            try: return json.loads(result)
            except json.JSONDecodeError:
                print(f"  Failed to parse JSON response for {chunk_name}: {result[:100]}...")
                return None
        else:
            print(f"  Unexpected response format for {chunk_name}: {type(result)}")
            return None
    except GPUQuotaExceededError:
        # Propagate quota errors untouched so the caller can stop the run.
        raise
    except Exception as e:
        error_msg = str(e).lower()
        if any(keyword in error_msg for keyword in ['gpu', 'quota', 'limit', 'exceeded', 'unavailable', 'out of memory', 'resource']):
            print(f"GPU quota exceeded detected in general exception: {e}")
            raise GPUQuotaExceededError(f"GPU quota exceeded: {e}")
        print(f"Error sending chunk {chunk_name} (from {original_audio_filename}) to Space: {e}")
        return None
|
|
|
def merge_transcripts(chunk_results: List[Dict], overlap_sec: int, audio_filename: str) -> Dict: |
|
merged_segments = [] |
|
|
|
|
|
cumulative_offset = 0.0 |
|
|
|
for i, chunk_result in enumerate(chunk_results): |
|
if not isinstance(chunk_result, dict) or "segments" not in chunk_result: |
|
|
|
continue |
|
if "error" in chunk_result: |
|
|
|
continue |
|
|
|
chunk_start_time_in_global = cumulative_offset |
|
|
|
last_segment_end_from_this_chunk = 0.0 |
|
|
|
for seg_idx, seg in enumerate(chunk_result["segments"]): |
|
if not (isinstance(seg, dict) and "start" in seg and "end" in seg and "text" in seg): |
|
|
|
continue |
|
|
|
original_seg_start = float(seg["start"]) |
|
original_seg_end = float(seg["end"]) |
|
|
|
|
|
|
|
if i > 0 and original_seg_end < overlap_sec * 0.5: |
|
continue |
|
|
|
|
|
seg_start = original_seg_start + chunk_start_time_in_global |
|
seg_end = original_seg_end + chunk_start_time_in_global |
|
|
|
|
|
if merged_segments: |
|
last_merged_seg_end = merged_segments[-1]["end"] |
|
if seg_start < last_merged_seg_end: |
|
if seg_end <= last_merged_seg_end: |
|
continue |
|
seg_start = last_merged_seg_end |
|
|
|
if seg_start >= seg_end: continue |
|
|
|
processed_words = [] |
|
if "words" in seg and isinstance(seg["words"], list): |
|
for word_data in seg["words"]: |
|
if not (isinstance(word_data, dict) and "start" in word_data and "end" in word_data and "word" in word_data): |
|
continue |
|
w_start = float(word_data["start"]) + chunk_start_time_in_global |
|
w_end = float(word_data["end"]) + chunk_start_time_in_global |
|
|
|
|
|
w_start = max(w_start, seg_start) |
|
w_end = min(w_end, seg_end) |
|
if w_start >= w_end: continue |
|
|
|
processed_words.append({"start": round(w_start, 3), "end": round(w_end, 3), "word": word_data["word"]}) |
|
|
|
merged_segments.append({ |
|
"start": round(seg_start, 3), "end": round(seg_end, 3), |
|
"text": seg["text"], "words": processed_words |
|
}) |
|
last_segment_end_from_this_chunk = max(last_segment_end_from_this_chunk, original_seg_end) |
|
|
|
|
|
|
|
|
|
|
|
if last_segment_end_from_this_chunk > overlap_sec : |
|
cumulative_offset += max(0, last_segment_end_from_this_chunk - overlap_sec) |
|
else: |
|
cumulative_offset += (CHUNK_LENGTH_SECONDS - overlap_sec) |
|
|
|
if merged_segments: print(f" Finished merging transcripts for {audio_filename}.") |
|
else: print(f" No segments to merge for {audio_filename}.") |
|
return {"segments": merged_segments} |
|
|
|
|
|
def save_transcript(result: Dict, output_path_stem_str: str, audio_filename: str):
    """Write the merged transcript as .json, .srt, .vtt and .lrc next to the stem path."""
    stem_path = Path(output_path_stem_str)

    # Flatten the result dict into (start, end, text) and (start, end, word)
    # tuples for the individual format writers.
    seg_tuples = []
    word_tuples = []
    if isinstance(result.get("segments"), list):
        for seg in result["segments"]:
            if not (isinstance(seg, dict) and "start" in seg and "end" in seg and "text" in seg):
                continue
            seg_tuples.append((seg["start"], seg["end"], seg["text"]))
            if isinstance(seg.get("words"), list):
                word_tuples.extend(
                    (w["start"], w["end"], w["word"])
                    for w in seg["words"]
                    if isinstance(w, dict) and "start" in w and "end" in w and "word" in w
                )

    if not seg_tuples:
        print(f"  No segments to write for {audio_filename}. Output files will be empty or not created.")

    json_path = stem_path.with_suffix(".json")
    write_json_output(seg_tuples, word_tuples, json_path)
    print(f"  Transcript saved: {json_path.name}")

    srt_path = stem_path.with_suffix(".srt")
    write_srt(seg_tuples, srt_path)
    print(f"  Transcript saved: {srt_path.name}")

    vtt_path = stem_path.with_suffix(".vtt")
    try:
        write_vtt(seg_tuples, word_tuples, vtt_path)
        print(f"  Transcript saved: {vtt_path.name}")
    except ValueError as e:
        # The VTT writer enforces a size cap; report and remove the partial file.
        print(f"  Error saving VTT for {audio_filename} ({vtt_path.name}): {e}")
        if vtt_path.exists():
            try: vtt_path.unlink()
            except OSError as ose: print(f"  Could not delete incomplete VTT file {vtt_path}: {ose}")

    lrc_path = stem_path.with_suffix(".lrc")
    write_lrc(seg_tuples, lrc_path)
    print(f"  Transcript saved: {lrc_path.name}")
|
|
|
|
|
def write_srt(segments: List, path: Path):
    """Write segments as a SubRip (.srt) file; empty input yields an empty file."""
    def fmt_time(t: float) -> str:
        whole = int(t)
        hours, rem = divmod(whole, 3600)
        minutes, seconds = divmod(rem, 60)
        millis = int((t - whole) * 1000)
        return f"{hours:02}:{minutes:02}:{seconds:02},{millis:03}"

    with open(path, "w", encoding="utf-8") as out:
        if not segments:
            out.write("")
        for idx, seg in enumerate(segments, 1):
            out.write(f"{idx}\n{fmt_time(float(seg[0]))} --> {fmt_time(float(seg[1]))}\n{seg[2]}\n\n")
|
|
|
def write_vtt(segments: List, words: List, path: Path):
    """Write a WebVTT file with karaoke-style word highlighting.

    When word timestamps are available, each segment is expanded into per-word
    cues in which the current word is styled ``.current``, earlier words
    ``.past`` and later words ``.future``; extra cues keep the full line
    visible across lead-in, inter-word and tail gaps (> 0.05 s).  Without word
    data, one plain cue per segment is written.

    Raises:
        ValueError: when the output grows past MAX_VTT_SIZE_BYTES (the caller
            is expected to delete the partial file).

    Note: an earlier revision also pre-built a ``segment_word_map`` from a
    shared word iterator, but the map was never read; that dead pass has been
    removed — words are matched to segments by time-span overlap below.
    """
    def sec2vtt(t_float: float) -> str:
        # hh:mm:ss.mmm, truncating (not rounding) to the millisecond.
        h, rem = divmod(int(t_float), 3600); m, s = divmod(rem, 60)
        ms = int((t_float - int(t_float)) * 1000)
        return f"{h:02}:{m:02}:{s:02}.{ms:03}"

    with open(path, "w", encoding="utf-8") as f:
        f.write("WEBVTT\n\n")
        if not segments: return

        # Cue styling used by the <c.*> payload tags below.
        f.write("STYLE\n")
        f.write("::cue(.current) { color: #ffff00; font-weight: bold; }\n")
        f.write("::cue(.past) { color: #888888; }\n")
        f.write("::cue(.future) { color: #ffffff; }\n")
        f.write("::cue(.line) { background: rgba(0,0,0,0.7); padding: 4px; }\n\n")

        use_word_timestamps = bool(words)

        if not use_word_timestamps:
            # Plain per-segment cues; no highlighting possible.
            for i, seg_data in enumerate(segments, 1):
                f.write(f"NOTE Segment {i}\n")
                f.write(f"{sec2vtt(float(seg_data[0]))} --> {sec2vtt(float(seg_data[1]))}\n{seg_data[2]}\n\n")
                if f.tell() > MAX_VTT_SIZE_BYTES:
                    raise ValueError(f"VTT file size limit ({MAX_VTT_SIZE_BYTES/1024/1024:.1f}MB) exceeded for {path.name}")
            return

        for seg_idx, seg_data in enumerate(segments):
            seg_start, seg_end, seg_text_full = float(seg_data[0]), float(seg_data[1]), seg_data[2]

            # A word belongs to this segment when their time spans overlap.
            current_segment_words = [
                word_data for word_data in words
                if max(seg_start, float(word_data[0])) < min(seg_end, float(word_data[1]))
            ]
            current_segment_words.sort(key=lambda x: float(x[0]))

            if not current_segment_words:
                f.write(f"{sec2vtt(seg_start)} --> {sec2vtt(seg_end)}\n{seg_text_full}\n\n")
                if f.tell() > MAX_VTT_SIZE_BYTES: raise ValueError(f"VTT size limit for {path.name}")
                continue

            all_words_text_in_segment = [w[2] for w in current_segment_words]

            # Lead-in cue before the first word, if there is a noticeable gap.
            first_word_actual_start = float(current_segment_words[0][0])
            if seg_start < first_word_actual_start - 0.05:
                f.write(f"{sec2vtt(seg_start)} --> {sec2vtt(first_word_actual_start)}\n")
                f.write(f'<c.line>{" ".join(f"<c.future>{w_txt}</c>" for w_txt in all_words_text_in_segment)}</c>\n\n')
                if f.tell() > MAX_VTT_SIZE_BYTES: raise ValueError(f"VTT size limit for {path.name}")

            for local_idx, word_data in enumerate(current_segment_words):
                w_s, w_e, w_txt = float(word_data[0]), float(word_data[1]), word_data[2]

                # One cue per word: past words grey, current highlighted, rest white.
                f.write(f"{sec2vtt(w_s)} --> {sec2vtt(w_e)}\n")
                line_parts = [f'<c.past>{t}</c>' for i, t in enumerate(all_words_text_in_segment) if i < local_idx]
                line_parts.append(f'<c.current>{w_txt}</c>')
                line_parts.extend(f'<c.future>{t}</c>' for i, t in enumerate(all_words_text_in_segment) if i > local_idx)
                f.write(f'<c.line>{" ".join(line_parts)}</c>\n\n')
                if f.tell() > MAX_VTT_SIZE_BYTES: raise ValueError(f"VTT size limit for {path.name}")

                # Gap cue between consecutive words: nothing "current".
                if local_idx < len(current_segment_words) - 1:
                    next_word_actual_start = float(current_segment_words[local_idx + 1][0])
                    if w_e < next_word_actual_start - 0.05:
                        f.write(f"{sec2vtt(w_e)} --> {sec2vtt(next_word_actual_start)}\n")
                        past_part = [f'<c.past>{t}</c>' for i, t in enumerate(all_words_text_in_segment) if i <= local_idx]
                        future_part = [f'<c.future>{t}</c>' for i, t in enumerate(all_words_text_in_segment) if i > local_idx]
                        f.write(f'<c.line>{" ".join(past_part + future_part)}</c>\n\n')
                        if f.tell() > MAX_VTT_SIZE_BYTES: raise ValueError(f"VTT size limit for {path.name}")

            # Tail cue after the last word, if the segment extends past it.
            last_word_actual_end = float(current_segment_words[-1][1])
            if last_word_actual_end < seg_end - 0.05:
                f.write(f"{sec2vtt(last_word_actual_end)} --> {sec2vtt(seg_end)}\n")
                f.write(f'<c.line>{" ".join(f"<c.past>{w_txt}</c>" for w_txt in all_words_text_in_segment)}</c>\n\n')
                if f.tell() > MAX_VTT_SIZE_BYTES: raise ValueError(f"VTT size limit for {path.name}")
|
|
|
|
|
def write_json_output(segments: List, words: List, path: Path):
    """Serialize segments (with any time-overlapping words attached) as UTF-8 JSON."""
    payload = {"segments": []}
    for seg_start, seg_end, seg_text in segments:
        entry = {"start": seg_start, "end": seg_end, "text": seg_text, "words": []}
        if words:
            # A word belongs to a segment when their time spans overlap.
            entry["words"] = [
                {"start": w_start, "end": w_end, "word": w_text}
                for w_start, w_end, w_text in words
                if max(seg_start, w_start) < min(seg_end, w_end)
            ]
        payload["segments"].append(entry)

    with open(path, "w", encoding="utf-8") as fh:
        json.dump(payload, fh, ensure_ascii=False, indent=2)
|
|
|
|
|
def write_lrc(segments: List, path: Path):
    """Write segments as an LRC lyrics file, one ``[mm:ss.xx]text`` line each."""
    def timestamp(t: float) -> str:
        minutes, seconds = divmod(float(t), 60)
        return f"[{int(minutes):02d}:{seconds:05.2f}]"

    with open(path, "w", encoding="utf-8") as out:
        if not segments:
            out.write("")
        for seg in segments:
            out.write(f"{timestamp(float(seg[0]))}{seg[2]}\n")
|
|
|
def process_audio_file(input_path_str: str, output_dir_str: str):
    """Transcribe one audio/video file end to end.

    Converts non-WAV input to 16 kHz mono WAV, splits it into overlapping
    chunks, sends each chunk to the Space, merges the chunk transcripts and
    saves .json/.srt/.vtt/.lrc outputs into *output_dir_str*.  Temporary
    files are cleaned up in ``finally``.  GPUQuotaExceededError propagates
    to the caller; all other errors are reported and swallowed.
    """
    original_input_path_obj = Path(input_path_str)
    audio_filename = original_input_path_obj.name
    print(f"Processing: {audio_filename}")

    temp_wav_path_obj: Optional[Path] = None
    current_processing_input_path = input_path_str

    output_dir_path = Path(output_dir_str)

    # Per-file scratch area: <output>/temp_processing/<stem>/conversion
    base_temp_dir = output_dir_path / "temp_processing" / original_input_path_obj.stem
    temp_conversion_dir = base_temp_dir / "conversion"

    try:
        if original_input_path_obj.suffix.lower() not in ['.wav']:
            print(f"  Converting {audio_filename} to WAV...")
            temp_conversion_dir.mkdir(parents=True, exist_ok=True)
            temp_wav_path_obj = temp_conversion_dir / f"{original_input_path_obj.stem}_converted.wav"

            if not shutil.which('ffmpeg'):
                print(f"  Error: ffmpeg not found. Cannot convert {audio_filename}.")
                return

            # 16 kHz mono PCM matches the format used by the chunk splitter.
            cmd = [
                'ffmpeg', '-y', '-loglevel', 'error', '-i', input_path_str,
                '-vn', '-acodec', 'pcm_s16le', '-ar', '16000', '-ac', '1',
                temp_wav_path_obj.as_posix()
            ]
            try:
                result = subprocess.run(cmd, capture_output=True, text=True, timeout=600)
                if result.returncode == 0:
                    print(f"  Successfully converted to {temp_wav_path_obj.name}")
                    current_processing_input_path = temp_wav_path_obj.as_posix()
                else:
                    print(f"  Error converting {audio_filename} to WAV: {result.stderr.strip()}")
                    return
            except subprocess.TimeoutExpired:
                print(f"  Timeout converting {audio_filename} to WAV.")
                return
            except Exception as e_conv:
                print(f"  Exception during WAV conversion for {audio_filename}: {e_conv}")
                return

        chunk_paths = split_audio_with_ffmpeg(
            current_processing_input_path, output_dir_path.as_posix(),
            CHUNK_LENGTH_SECONDS, CHUNK_OVERLAP_SECONDS
        )

        if not chunk_paths:
            print(f"  Failed to split {audio_filename}. Skipping transcription.")
            return

        print(f"  Processing {len(chunk_paths)} chunks for {audio_filename} via API...")
        chunk_results = []
        for i, chunk_p_str in enumerate(chunk_paths):
            try:
                api_result = process_chunk(chunk_p_str, audio_filename)
                if api_result:
                    chunk_results.append(api_result)
                    print(f"  Successfully processed chunk {i+1}/{len(chunk_paths)}")
                else:
                    print(f"  Failed to process chunk {i+1}/{len(chunk_paths)}")
            except GPUQuotaExceededError as gpu_error:
                # Quota exhausted: abort this file (and, upstream, the run).
                print(f"  GPU quota exceeded while processing {audio_filename}")
                print(f"  Error: {gpu_error}")
                print(f"  GPU制限に達しました。処理を強制終了します。")
                raise

            # Throttle between chunks to be gentle on the shared Space.
            if i < len(chunk_paths) - 1:
                wait_seconds = 5
                print(f"  Waiting for {wait_seconds} seconds before processing next chunk...")
                time.sleep(wait_seconds)

        if not chunk_results:
            print(f"  No chunks successfully processed via API for {audio_filename}.")
            return

        merged_result = merge_transcripts(chunk_results, CHUNK_OVERLAP_SECONDS, audio_filename)

        output_stem_str = (output_dir_path / original_input_path_obj.stem).as_posix()
        save_transcript(merged_result, output_stem_str, audio_filename)

    except Exception as e_main_proc:
        print(f"An unexpected error occurred while processing {audio_filename}: {e_main_proc}")
        import traceback
        traceback.print_exc()
    finally:
        # Remove chunk WAVs created by split_audio_with_ffmpeg.
        chunk_temp_parent_dir = output_dir_path / "temp_chunks" / original_input_path_obj.stem
        if chunk_temp_parent_dir.exists():
            try:
                shutil.rmtree(chunk_temp_parent_dir)
            except OSError as e_del_chunk:
                print(f"  Error deleting temp chunk dir {chunk_temp_parent_dir}: {e_del_chunk}")

        # NOTE(review): this check is a no-op (body is `pass`); the conversion
        # directory is removed together with base_temp_dir below.
        if temp_conversion_dir.exists() and temp_conversion_dir.parent == base_temp_dir:
            pass

        if base_temp_dir.exists():
            try:
                shutil.rmtree(base_temp_dir)
                print(f"  Cleaned up temporary processing directory: {base_temp_dir}")
            except OSError as e_del_base:
                print(f"  Error deleting base temp dir {base_temp_dir}: {e_del_base}")
|
|
|
|
|
def main():
    """CLI entry point: collect target files, skip completed ones, process the rest."""
    parser = argparse.ArgumentParser(
        description="Transcribes audio/video files from a specified path (file or directory). "
                    "Outputs are saved in the same location as input files. "
                    "Skips already processed files (checks for .json output). "
                    "Prefers MP3 over MP4 if both exist with the same base name."
    )
    parser.add_argument(
        "input_path",
        nargs="?",
        help="Path to an input audio/video file or a directory containing such files."
    )
    args = parser.parse_args()

    # No path argument: fall back to a GUI folder-picker dialog.
    if args.input_path is None:
        root = tk.Tk()
        root.withdraw()  # hide the empty Tk root window

        input_path = filedialog.askdirectory(
            title="処理したい音声/動画ファイルのあるフォルダを選択してください",
            initialdir=os.getcwd()
        )

        if not input_path:
            print("フォルダが選択されませんでした。")
            return

        input_path_obj = Path(input_path)
    else:
        input_path_obj = Path(args.input_path)

    if not input_path_obj.exists():
        print(f"Error: Input path '{args.input_path}' does not exist.")
        return

    # Build the candidate list: a single file, or a scan of the directory.
    files_to_consider_processing = []
    if input_path_obj.is_file():
        if input_path_obj.suffix.lower() in TARGET_AUDIO_VIDEO_EXTENSIONS:
            files_to_consider_processing.append(input_path_obj)
        else:
            print(f"Input file '{input_path_obj.name}' is not a supported type. Supported: {TARGET_AUDIO_VIDEO_EXTENSIONS}")
    elif input_path_obj.is_dir():
        print(f"Scanning directory: {input_path_obj.resolve()}")

        # Group files sharing a stem so MP3/MP4 twins can be deduplicated.
        grouped_files = collections.defaultdict(list)
        for item in sorted(input_path_obj.iterdir()):
            if item.is_file() and item.suffix.lower() in TARGET_AUDIO_VIDEO_EXTENSIONS:
                grouped_files[item.stem].append(item)

        if not grouped_files:
            print(f"No supported files found in directory: {input_path_obj.resolve()}")
            return

        for base_name, file_group in grouped_files.items():
            mp3_file = next((f for f in file_group if f.suffix.lower() == '.mp3'), None)
            mp4_file = next((f for f in file_group if f.suffix.lower() == '.mp4'), None)

            # Prefer MP3 over MP4; otherwise take the first file in the group.
            chosen_file = None
            if mp3_file:
                chosen_file = mp3_file
                if mp4_file and mp4_file != mp3_file:
                    print(f"  MP3 found for '{base_name}', prioritizing '{mp3_file.name}' over '{mp4_file.name}'.")
            elif mp4_file:
                chosen_file = mp4_file
            else:
                if file_group: chosen_file = file_group[0]

            if chosen_file:
                files_to_consider_processing.append(chosen_file)
    else:
        print(f"Error: Input path '{args.input_path}' is not a valid file or directory.")
        return

    if not files_to_consider_processing:
        print("No files selected for processing.")
        return

    # Skip files whose .json transcript already exists next to them.
    actual_files_to_process = []
    print(f"\nFound {len(files_to_consider_processing)} potential file(s). Checking for existing transcripts...")
    for file_path in files_to_consider_processing:
        output_dir = file_path.parent

        expected_output_file = output_dir / f"{file_path.stem}{PRIMARY_OUTPUT_EXTENSION_FOR_SKIP_CHECK}"
        if expected_output_file.exists():
            print(f"  Skipping '{file_path.name}': Output '{expected_output_file.name}' already exists.")
        else:
            actual_files_to_process.append(file_path)

    if not actual_files_to_process:
        print("\nNo new files to process. All selected files seem to have existing transcripts.")
        return

    total_to_process_count = len(actual_files_to_process)
    print(f"\nStarting processing for {total_to_process_count} new file(s)...")
    for i, file_to_process_obj in enumerate(actual_files_to_process):
        print(f"\n--- [{i+1}/{total_to_process_count}] Processing: {file_to_process_obj.name} ---")
        output_dir_for_this_file = file_to_process_obj.parent.as_posix()
        try:
            process_audio_file(file_to_process_obj.as_posix(), output_dir_for_this_file)
            print(f"--- Finished: {file_to_process_obj.name} ---")
        except GPUQuotaExceededError as gpu_error:
            # Hard stop: further API calls would also fail on quota.
            print(f"\n=== GPU QUOTA EXCEEDED ===")
            print(f"処理を中断します。GPU制限に達しました。")
            print(f"Error details: {gpu_error}")
            sys.exit(1)

    print(f"\nAll {total_to_process_count} new file(s) processed.")
|
|
|
|
|
if __name__ == "__main__":
    # Refuse to start without the API client; every chunk upload would fail.
    if not GRADIO_CLIENT_AVAILABLE:
        print("Critical: gradio_client library is not installed. Please run: pip install gradio_client")
    else:
        main()