Spaces:
Runtime error
Runtime error
import tkinter as tk | |
from tkinter import scrolledtext, messagebox, ttk | |
import threading | |
import os | |
import whisper | |
from dotenv import load_dotenv | |
import logging | |
import glob | |
from datetime import datetime | |
from stt_processor import TextProcessor | |
# 환경 변수 로드 | |
load_dotenv() | |
# --- 설정: .env 파일에서 API 키를 읽어옵니다 --- | |
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY") | |
# 폴더 생성 | |
for folder in ["logs", "output", "data"]: | |
if not os.path.exists(folder): | |
os.makedirs(folder) | |
# 로깅 설정 | |
logging.basicConfig( | |
level=logging.INFO, | |
format='%(asctime)s - %(levelname)s - %(message)s', | |
handlers=[ | |
logging.FileHandler('logs/stt_processor.log', encoding='utf-8'), | |
logging.StreamHandler() | |
] | |
) | |
logger = logging.getLogger(__name__) | |
class STTProcessorApp: | |
def __init__(self, root): | |
self.root = root | |
self.root.title("2인 대화 STT 처리기 (AI 화자 분리)") | |
self.root.geometry("1000x750") | |
# 모델 초기화 | |
self.whisper_model = None | |
self.text_processor = None | |
# UI 구성 요소 | |
self.setup_ui() | |
# 상태 추적 | |
self.is_processing = False | |
logger.info("STT 처리기 앱이 시작되었습니다.") | |
def setup_ui(self): | |
"""UI 컴포넌트를 설정합니다.""" | |
# 상단 프레임 - 상태 정보 | |
status_frame = ttk.Frame(self.root) | |
status_frame.pack(fill=tk.X, padx=10, pady=5) | |
ttk.Label(status_frame, text="상태:").pack(side=tk.LEFT) | |
self.status_label = ttk.Label(status_frame, text="준비", foreground="green") | |
self.status_label.pack(side=tk.LEFT, padx=(5, 0)) | |
# 중앙 프레임 - 로그 출력 | |
log_frame = ttk.LabelFrame(self.root, text="처리 로그") | |
log_frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=5) | |
self.log_text = scrolledtext.ScrolledText(log_frame, height=25, wrap=tk.WORD) | |
self.log_text.pack(fill=tk.BOTH, expand=True, padx=5, pady=5) | |
# 하단 프레임 - 컨트롤 | |
control_frame = ttk.Frame(self.root) | |
control_frame.pack(fill=tk.X, padx=10, pady=5) | |
# 왼쪽: 모델 로딩 버튼 | |
self.load_models_btn = ttk.Button( | |
control_frame, | |
text="모델 로딩", | |
command=self.load_models_threaded | |
) | |
self.load_models_btn.pack(side=tk.LEFT, padx=(0, 10)) | |
# 중앙: 처리 버튼 | |
self.process_btn = ttk.Button( | |
control_frame, | |
text="오디오 파일 처리 시작", | |
command=self.process_files_threaded, | |
state=tk.DISABLED | |
) | |
self.process_btn.pack(side=tk.LEFT, padx=(0, 10)) | |
# 오른쪽: 종료 버튼 | |
ttk.Button( | |
control_frame, | |
text="종료", | |
command=self.root.quit | |
).pack(side=tk.RIGHT) | |
# 진행률 표시 | |
self.progress = ttk.Progressbar( | |
control_frame, | |
mode='indeterminate' | |
) | |
self.progress.pack(side=tk.LEFT, fill=tk.X, expand=True, padx=(0, 10)) | |
def log(self, message): | |
"""로그 메시지를 UI에 출력합니다.""" | |
def append_log(): | |
timestamp = datetime.now().strftime("%H:%M:%S") | |
self.log_text.insert(tk.END, f"[{timestamp}] {message}\n") | |
self.log_text.see(tk.END) | |
self.root.after(0, append_log) | |
logger.info(message) | |
def update_status(self, status, color="black"): | |
"""상태 라벨을 업데이트합니다.""" | |
def update(): | |
self.status_label.config(text=status, foreground=color) | |
self.root.after(0, update) | |
def load_models_threaded(self): | |
"""별도 스레드에서 모델을 로딩합니다.""" | |
threading.Thread(target=self.load_models, daemon=True).start() | |
def load_models(self): | |
"""AI 모델들을 로딩합니다.""" | |
try: | |
self.update_status("모델 로딩 중...", "orange") | |
self.log("AI 모델 로딩을 시작합니다...") | |
# API 키 검증 | |
if not GOOGLE_API_KEY or GOOGLE_API_KEY == "your_google_api_key_here": | |
raise ValueError("Google API 키가 설정되지 않았습니다. .env 파일을 확인하세요.") | |
# Whisper 모델 로딩 | |
self.log("Whisper 모델을 로딩합니다...") | |
self.whisper_model = whisper.load_model("base") | |
self.log("Whisper 모델 로딩 완료!") | |
# TextProcessor 초기화 | |
self.log("Gemini 텍스트 프로세서를 초기화합니다...") | |
self.text_processor = TextProcessor(GOOGLE_API_KEY) | |
self.text_processor.load_models() | |
self.log("모든 모델이 성공적으로 로딩되었습니다!") | |
self.update_status("준비 완료", "green") | |
# 처리 버튼 활성화 | |
def enable_button(): | |
self.process_btn.config(state=tk.NORMAL) | |
self.root.after(0, enable_button) | |
except Exception as e: | |
error_msg = f"모델 로딩 실패: {str(e)}" | |
self.log(error_msg) | |
self.update_status("모델 로딩 실패", "red") | |
messagebox.showerror("오류", error_msg) | |
def process_files_threaded(self): | |
"""별도 스레드에서 파일을 처리합니다.""" | |
if self.is_processing: | |
messagebox.showwarning("경고", "이미 처리 중입니다.") | |
return | |
threading.Thread(target=self.process_files, daemon=True).start() | |
def process_files(self): | |
"""data 폴더의 모든 WAV 파일을 처리합니다.""" | |
try: | |
self.is_processing = True | |
self.update_status("처리 중...", "orange") | |
# 진행률 표시 시작 | |
def start_progress(): | |
self.progress.start(10) | |
self.root.after(0, start_progress) | |
# WAV 파일 찾기 | |
wav_files = glob.glob("data/*.wav") | |
if not wav_files: | |
self.log("data 폴더에 WAV 파일이 없습니다.") | |
return | |
self.log(f"{len(wav_files)}개의 WAV 파일을 발견했습니다.") | |
# 각 파일 처리 | |
for i, wav_file in enumerate(wav_files): | |
self.log(f"\n=== 파일 처리 ({i+1}/{len(wav_files)}) ===") | |
success = self.process_single_audio_file(wav_file) | |
if success: | |
self.log(f"✅ {os.path.basename(wav_file)} 처리 완료") | |
else: | |
self.log(f"❌ {os.path.basename(wav_file)} 처리 실패") | |
self.log(f"\n모든 파일 처리 완료! 총 {len(wav_files)}개 파일") | |
self.update_status("처리 완료", "green") | |
except Exception as e: | |
error_msg = f"파일 처리 중 오류: {str(e)}" | |
self.log(error_msg) | |
self.update_status("처리 실패", "red") | |
finally: | |
self.is_processing = False | |
# 진행률 표시 중지 | |
def stop_progress(): | |
self.progress.stop() | |
self.root.after(0, stop_progress) | |
def process_single_audio_file(self, file_path): | |
"""단일 오디오 파일을 처리합니다.""" | |
try: | |
filename = os.path.basename(file_path) | |
base_name = os.path.splitext(filename)[0] | |
self.log(f"파일 처리 시작: {filename}") | |
# 1단계: Whisper로 음성 인식 | |
self.log("1/3: 음성 인식 진행 중...") | |
result = self.whisper_model.transcribe(file_path) | |
full_text = result['text'].strip() | |
if not full_text: | |
self.log(f"❌ 파일 {filename}에서 텍스트를 추출할 수 없습니다.") | |
return False | |
language = result.get('language', 'unknown') | |
self.log(f"음성 인식 완료 (언어: {language}, 길이: {len(full_text)}자)") | |
# 2단계: TextProcessor로 화자 분리 및 맞춤법 교정 | |
self.log("2/3: AI 화자 분리 및 맞춤법 교정 진행 중...") | |
def progress_callback(status, current, total): | |
self.log(f" → {status} ({current}/{total})") | |
text_result = self.text_processor.process_text( | |
full_text, | |
text_name=base_name, | |
progress_callback=progress_callback | |
) | |
if not text_result.get("success", False): | |
self.log(f"❌ 텍스트 처리 실패: {text_result.get('error', 'Unknown error')}") | |
return False | |
# 3단계: 결과 저장 | |
self.log("3/3: 결과 저장 중...") | |
# 기존 결과에 Whisper 정보 추가 | |
enhanced_result = text_result.copy() | |
enhanced_result.update({ | |
"base_name": base_name, | |
"language": language, | |
"whisper_segments": result.get("segments", []) | |
}) | |
# 파일 저장 | |
saved = self.text_processor.save_results_to_files(enhanced_result) | |
if saved: | |
self.log("결과 파일 저장 완료!") | |
else: | |
self.log("⚠️ 결과 파일 저장 중 일부 오류 발생") | |
return True | |
except Exception as e: | |
self.log(f"❌ 파일 {filename} 처리 중 오류: {str(e)}") | |
return False | |
def main(): | |
"""메인 함수""" | |
root = tk.Tk() | |
app = STTProcessorApp(root) | |
try: | |
root.mainloop() | |
except KeyboardInterrupt: | |
logger.info("사용자에 의해 프로그램이 종료되었습니다.") | |
except Exception as e: | |
logger.error(f"예상치 못한 오류: {e}") | |
messagebox.showerror("오류", f"예상치 못한 오류가 발생했습니다: {str(e)}") | |
if __name__ == "__main__": | |
main() | |