import tkinter as tk from tkinter import scrolledtext, messagebox, ttk import threading import os import torch import whisper import google.generativeai as genai from dotenv import load_dotenv import logging import json from datetime import datetime import glob import re # 환경 변수 로드 load_dotenv() # --- 설정: .env 파일에서 API 키를 읽어옵니다 --- GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY") # logs 폴더 생성 if not os.path.exists("logs"): os.makedirs("logs") # output 폴더 생성 if not os.path.exists("output"): os.makedirs("output") # data 폴더 생성 if not os.path.exists("data"): os.makedirs("data") # 로깅 설정 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('logs/stt_processor.log', encoding='utf-8'), logging.StreamHandler() ] ) logger = logging.getLogger(__name__) # ----------------------------------------- class STTProcessorApp: def __init__(self, root): self.root = root self.root.title("2인 대화 STT 처리기 (AI 화자 분리)") self.root.geometry("1000x750") # 모델 로딩 상태 변수 self.models_loaded = False self.whisper_model = None self.gemini_model = None # UI 요소 생성 self.main_frame = tk.Frame(root, padx=10, pady=10) self.main_frame.pack(fill=tk.BOTH, expand=True) # 제목 title_label = tk.Label(self.main_frame, text="2인 대화 STT 처리기 (AI 화자 분리)", font=("Arial", 16, "bold")) title_label.pack(pady=5) # 설명 desc_label = tk.Label(self.main_frame, text="Whisper STT + Gemini AI 화자 분리로 2명의 대화를 자동으로 구분합니다", font=("Arial", 10)) desc_label.pack(pady=2) # WAV 파일 목록 프레임 files_frame = tk.LabelFrame(self.main_frame, text="data 폴더의 WAV 파일 목록", padx=5, pady=5) files_frame.pack(fill=tk.BOTH, expand=True, pady=5) # 파일 목록과 스크롤바 list_frame = tk.Frame(files_frame) list_frame.pack(fill=tk.BOTH, expand=True) scrollbar = tk.Scrollbar(list_frame) scrollbar.pack(side=tk.RIGHT, fill=tk.Y) self.file_listbox = tk.Listbox(list_frame, yscrollcommand=scrollbar.set, selectmode=tk.SINGLE) self.file_listbox.pack(side=tk.LEFT, fill=tk.BOTH, expand=True) scrollbar.config(command=self.file_listbox.yview) # 버튼 프레임 button_frame = tk.Frame(self.main_frame) button_frame.pack(fill=tk.X, pady=5) self.refresh_button = tk.Button(button_frame, text="파일 목록 새로고침", command=self.refresh_file_list) self.refresh_button.pack(side=tk.LEFT, padx=5) self.process_button = tk.Button(button_frame, text="선택된 파일 처리", command=self.start_processing, state=tk.DISABLED) self.process_button.pack(side=tk.LEFT, padx=5) self.process_all_button = tk.Button(button_frame, text="모든 파일 처리", command=self.start_processing_all, state=tk.DISABLED) self.process_all_button.pack(side=tk.LEFT, padx=5) # 진행률 표시 progress_frame = tk.Frame(self.main_frame) progress_frame.pack(fill=tk.X, pady=5) tk.Label(progress_frame, text="진행률:").pack(side=tk.LEFT) self.progress_var = tk.StringVar(value="대기 중") tk.Label(progress_frame, textvariable=self.progress_var).pack(side=tk.LEFT, padx=10) self.progress_bar = ttk.Progressbar(progress_frame, mode='determinate') self.progress_bar.pack(side=tk.RIGHT, fill=tk.X, expand=True, padx=10) # 상태 표시줄 self.status_label = tk.Label(self.main_frame, text="준비 완료. Google API 키를 설정하고 '처리' 버튼을 누르세요.", bd=1, relief=tk.SUNKEN, anchor=tk.W) self.status_label.pack(side=tk.BOTTOM, fill=tk.X) # 결과 출력 영역 result_frame = tk.LabelFrame(self.main_frame, text="처리 결과", padx=5, pady=5) result_frame.pack(fill=tk.BOTH, expand=True, pady=5) self.result_text = scrolledtext.ScrolledText(result_frame, wrap=tk.WORD, state=tk.DISABLED, height=15) self.result_text.pack(fill=tk.BOTH, expand=True) # 초기 파일 목록 로드 self.refresh_file_list() def refresh_file_list(self): """data 폴더의 WAV 파일 목록을 새로고침합니다.""" self.file_listbox.delete(0, tk.END) wav_files = glob.glob("data/*.wav") if wav_files: for file_path in wav_files: filename = os.path.basename(file_path) self.file_listbox.insert(tk.END, filename) self.process_button.config(state=tk.NORMAL) self.process_all_button.config(state=tk.NORMAL) logger.info(f"{len(wav_files)}개의 WAV 파일을 발견했습니다.") else: self.file_listbox.insert(tk.END, "WAV 파일이 없습니다. data 폴더에 WAV 파일을 넣어주세요.") self.process_button.config(state=tk.DISABLED) self.process_all_button.config(state=tk.DISABLED) logger.warning("data 폴더에 WAV 파일이 없습니다.") def update_status(self, message): """UI의 상태 메시지를 업데이트합니다.""" self.status_label.config(text=message) self.root.update_idletasks() def update_progress(self, current, total, message=""): """진행률을 업데이트합니다.""" if total > 0: progress = (current / total) * 100 self.progress_bar.config(value=progress) if message: self.progress_var.set(f"{message} ({current}/{total})") else: self.progress_var.set(f"{current}/{total}") self.root.update_idletasks() def show_result(self, content): """결과 텍스트 영역에 내용을 표시합니다.""" self.result_text.config(state=tk.NORMAL) self.result_text.insert(tk.END, content + "\n\n") self.result_text.see(tk.END) self.result_text.config(state=tk.DISABLED) def load_models(self): """필요한 AI 모델들을 로드합니다.""" try: if not GOOGLE_API_KEY or GOOGLE_API_KEY == "your_google_api_key_here": messagebox.showerror("API 키 오류", ".env 파일에 올바른 Google AI API 키를 입력해주세요.") logger.error("Google API 키가 설정되지 않았습니다.") return False logger.info("모델 로딩을 시작합니다.") self.update_status("모델 로딩 중... (최초 실행 시 시간이 걸릴 수 있습니다)") # Whisper 모델 로딩 self.update_status("음성 인식 모델(Whisper) 로딩 중...") logger.info("Whisper 모델 로딩을 시작합니다.") self.whisper_model = whisper.load_model("base") # "small", "medium", "large" 등으로 변경 가능 logger.info("Whisper 모델 로딩이 완료되었습니다.") # Gemini 모델 설정 self.update_status("AI 화자 분리 모델(Gemini) 설정 중...") logger.info("Gemini 모델 설정을 시작합니다.") genai.configure(api_key=GOOGLE_API_KEY) # gemini-2.0-flash: 최신 Gemini 2.0 모델, 빠르고 정확한 처리 self.gemini_model = genai.GenerativeModel('gemini-2.0-flash') logger.info("Gemini 2.0 Flash 모델 설정이 완료되었습니다.") self.models_loaded = True self.update_status("모든 모델 로딩 완료. 처리 준비 완료.") logger.info("모든 모델 로딩이 완료되었습니다.") return True except Exception as e: error_msg = f"모델을 로딩하는 중 오류가 발생했습니다: {e}" messagebox.showerror("모델 로딩 오류", error_msg) logger.error(error_msg) self.update_status("오류: 모델 로딩 실패") return False def start_processing(self): """선택된 파일 처리 시작.""" selection = self.file_listbox.curselection() if not selection: messagebox.showwarning("파일 미선택", "처리할 파일을 선택해주세요.") return filename = self.file_listbox.get(selection[0]) if filename == "WAV 파일이 없습니다. data 폴더에 WAV 파일을 넣어주세요.": return self.process_files([filename]) def start_processing_all(self): """모든 파일 처리 시작.""" wav_files = glob.glob("data/*.wav") if not wav_files: messagebox.showwarning("파일 없음", "data 폴더에 처리할 WAV 파일이 없습니다.") return filenames = [os.path.basename(f) for f in wav_files] self.process_files(filenames) def process_files(self, filenames): """파일 처리 시작.""" # 모델이 로드되지 않았으면 먼저 로드 if not self.models_loaded: if not self.load_models(): return # 모델 로딩 실패 시 중단 # UI 비활성화 및 처리 스레드 시작 self.refresh_button.config(state=tk.DISABLED) self.process_button.config(state=tk.DISABLED) self.process_all_button.config(state=tk.DISABLED) processing_thread = threading.Thread(target=self.process_audio_files, args=(filenames,)) processing_thread.start() def process_audio_files(self, filenames): """백그라운드에서 여러 오디오 파일을 처리하는 메인 로직.""" try: total_files = len(filenames) logger.info(f"{total_files}개의 파일 처리를 시작합니다.") for idx, filename in enumerate(filenames): file_path = os.path.join("data", filename) self.update_progress(idx, total_files, f"처리 중: {filename}") result = self.process_single_audio_file(file_path, filename) if result: self.show_result(f"✅ {filename} 처리 완료") else: self.show_result(f"❌ {filename} 처리 실패") self.update_progress(total_files, total_files, "완료") self.update_status("모든 파일 처리 완료!") logger.info("모든 파일 처리가 완료되었습니다.") except Exception as e: error_msg = f"파일 처리 중 오류가 발생했습니다: {e}" logger.error(error_msg) self.update_status(f"오류: {e}") finally: # UI 다시 활성화 self.refresh_button.config(state=tk.NORMAL) self.process_button.config(state=tk.NORMAL) self.process_all_button.config(state=tk.NORMAL) def process_single_audio_file(self, file_path, filename): """단일 오디오 파일을 처리합니다.""" try: logger.info(f"파일 처리 시작: {file_path}") base_name = os.path.splitext(filename)[0] # 1단계: Whisper로 음성 인식 self.update_status(f"1/4: 음성 인식 진행 중: {filename}") logger.info(f"음성 인식 시작: {filename}") result = self.whisper_model.transcribe(file_path) full_text = result['text'].strip() if not full_text: logger.warning(f"파일 {filename}에서 텍스트를 추출할 수 없습니다.") return False # 2단계: Gemini로 화자 분리 self.update_status(f"2/4: AI 화자 분리 진행 중: {filename}") logger.info(f"AI 화자 분리 시작: {filename}") speaker_separated_text = self.separate_speakers_with_gemini(full_text) # 3단계: 맞춤법 교정 self.update_status(f"3/4: 맞춤법 교정 진행 중: {filename}") logger.info(f"맞춤법 교정 시작: {filename}") corrected_text = self.correct_spelling_with_gemini(speaker_separated_text) # 4단계: 결과 저장 self.update_status(f"4/4: 결과 저장 중: {filename}") self.save_separated_conversations(base_name, full_text, speaker_separated_text, corrected_text, result) logger.info(f"파일 처리 완료: {filename}") return True except Exception as e: logger.error(f"파일 {filename} 처리 중 오류: {e}") return False def separate_speakers_with_gemini(self, text): """Gemini API를 사용하여 텍스트를 화자별로 분리합니다.""" try: prompt = f""" 당신은 2명의 화자가 나누는 대화를 분석하는 전문가입니다. 주어진 텍스트를 분석하여 각 발언을 화자별로 구분해주세요. 분석 지침: 1. 대화의 맥락과 내용을 기반으로 화자를 구분하세요 2. 말투, 주제 전환, 질문과 답변의 패턴을 활용하세요 3. 화자1과 화자2로 구분하여 표시하세요 4. 각 발언 앞에 [화자1] 또는 [화자2]를 붙여주세요 5. 발언이 너무 길 경우 자연스러운 지점에서 나누어주세요 출력 형식: [화자1] 첫 번째 발언 내용 [화자2] 두 번째 발언 내용 [화자1] 세 번째 발언 내용 ... 분석할 텍스트: {text} """ response = self.gemini_model.generate_content(prompt) separated_text = response.text.strip() logger.info("Gemini를 통한 화자 분리가 완료되었습니다.") return separated_text except Exception as e: logger.error(f"Gemini 화자 분리 중 오류: {e}") return f"[오류] 화자 분리 실패: {str(e)}" def correct_spelling_with_gemini(self, separated_text): """Gemini API를 사용하여 화자별 분리된 텍스트의 맞춤법을 교정합니다.""" try: prompt = f""" 당신은 한국어 맞춤법 교정 전문가입니다. 주어진 텍스트에서 맞춤법 오류, 띄어쓰기 오류, 오타를 수정해주세요. 교정 지침: 1. 자연스러운 한국어 표현으로 수정하되, 원본의 의미와 말투는 유지하세요 2. [화자1], [화자2] 태그는 그대로 유지하세요 3. 전문 용어나 고유명사는 가능한 정확하게 수정하세요 4. 구어체 특성은 유지하되, 명백한 오타만 수정하세요 5. 문맥에 맞는 올바른 단어로 교체하세요 수정이 필요한 예시: - "치특기" → "치트키" - "실점픽" → "실전 픽" - "복사부천억" → "복사 붙여넣기" - "핵심같이가" → "핵심 가치가" - "재활" → "재활용" - "저정할" → "저장할" - "플레일" → "플레어" - "서벌 수" → "서버리스" - "커리" → "쿼리" - "전력" → "전략" - "클라클라" → "클라크" - "가인만" → "가입만" - "M5U" → "MAU" - "나온 로도" → "다운로드" - "무시무치" → "무시무시" - "송신유금" → "송신 요금" - "10지가" → "10GB" - "유금" → "요금" - "전 색을" → "전 세계" - "도무원은" → "도구들은" - "골차품데" → "골치 아픈데" - "변원해" → "변환해" - "f 운영" → "서비스 운영" - "오류추저개" → "오류 추적기" - "f 늘려질" → "서비스가 늘어날" - "캐시칭" → "캐싱" - "플레이어" → "플레어" - "업스테시" → "업스태시" - "원시근을" → "웬지슨" - "부각이릉도" → "부각들도" - "컴포넌트" → "컴포넌트" - "본이터링" → "모니터링" - "번뜨기는" → "번뜩이는" - "사용적 경험" → "사용자 경험" 교정할 텍스트: {separated_text} """ response = self.gemini_model.generate_content(prompt) corrected_text = response.text.strip() logger.info("Gemini를 통한 맞춤법 교정이 완료되었습니다.") return corrected_text except Exception as e: logger.error(f"Gemini 맞춤법 교정 중 오류: {e}") return separated_text # 오류 발생 시 원본 반환 def parse_separated_text(self, separated_text): """화자별로 분리된 텍스트를 파싱하여 구조화합니다.""" conversations = { "화자1": [], "화자2": [] } # 정규표현식으로 화자별 발언 추출 pattern = r'\[화자([12])\]\s*(.+?)(?=\[화자[12]\]|$)' matches = re.findall(pattern, separated_text, re.DOTALL) for speaker_num, content in matches: speaker = f"화자{speaker_num}" content = content.strip() if content: conversations[speaker].append(content) return conversations def save_separated_conversations(self, base_name, original_text, separated_text, corrected_text, whisper_result): """화자별로 분리되고 맞춤법이 교정된 대화 내용을 파일로 저장합니다.""" timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") # 교정된 텍스트에서 화자별 대화 파싱 corrected_conversations = self.parse_separated_text(corrected_text) # 원본 화자별 대화 파싱 (비교용) original_conversations = self.parse_separated_text(separated_text) # 1. 전체 대화 저장 (원본, 화자 분리, 맞춤법 교정 포함) all_txt_path = f"output/{base_name}_전체대화_{timestamp}.txt" with open(all_txt_path, 'w', encoding='utf-8') as f: f.write(f"파일명: {base_name}\n") f.write(f"처리 시간: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n") f.write(f"언어: {whisper_result.get('language', 'unknown')}\n") f.write("="*50 + "\n\n") f.write("원본 텍스트:\n") f.write(original_text + "\n\n") f.write("="*50 + "\n\n") f.write("화자별 분리 결과 (원본):\n") f.write(separated_text + "\n\n") f.write("="*50 + "\n\n") f.write("화자별 분리 결과 (맞춤법 교정):\n") f.write(corrected_text + "\n") # 2. 교정된 화자별 개별 파일 저장 for speaker, utterances in corrected_conversations.items(): if utterances: speaker_txt_path = f"output/{base_name}_{speaker}_교정본_{timestamp}.txt" with open(speaker_txt_path, 'w', encoding='utf-8') as f: f.write(f"파일명: {base_name}\n") f.write(f"화자: {speaker}\n") f.write(f"처리 시간: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n") f.write(f"발언 수: {len(utterances)}\n") f.write("="*50 + "\n\n") for idx, utterance in enumerate(utterances, 1): f.write(f"{idx}. {utterance}\n\n") # 3. JSON 형태로도 저장 (분석용) json_path = f"output/{base_name}_data_{timestamp}.json" json_data = { "filename": base_name, "processed_time": datetime.now().isoformat(), "language": whisper_result.get("language", "unknown"), "original_text": original_text, "separated_text": separated_text, "corrected_text": corrected_text, "conversations_by_speaker_original": original_conversations, "conversations_by_speaker_corrected": corrected_conversations, "segments": whisper_result.get("segments", []) } with open(json_path, 'w', encoding='utf-8') as f: json.dump(json_data, f, ensure_ascii=False, indent=2) logger.info(f"결과 저장 완료: {all_txt_path}, {json_path}") logger.info(f"교정된 화자별 파일도 저장되었습니다.") if __name__ == "__main__": root = tk.Tk() app = STTProcessorApp(root) root.mainloop()