Spaces:
Runtime error
Runtime error
Jeongsoo1975
Initial commit: Gradio text-based speaker separation app for Hugging Face Spaces
ae9ec05
import tkinter as tk | |
from tkinter import scrolledtext, messagebox, ttk | |
import threading | |
import os | |
import torch | |
import whisper | |
import google.generativeai as genai | |
from dotenv import load_dotenv | |
import logging | |
import json | |
from datetime import datetime | |
import glob | |
import re | |
# 환경 변수 로드 | |
load_dotenv() | |
# --- 설정: .env 파일에서 API 키를 읽어옵니다 --- | |
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY") | |
# logs 폴더 생성 | |
if not os.path.exists("logs"): | |
os.makedirs("logs") | |
# output 폴더 생성 | |
if not os.path.exists("output"): | |
os.makedirs("output") | |
# data 폴더 생성 | |
if not os.path.exists("data"): | |
os.makedirs("data") | |
# 로깅 설정 | |
logging.basicConfig( | |
level=logging.INFO, | |
format='%(asctime)s - %(levelname)s - %(message)s', | |
handlers=[ | |
logging.FileHandler('logs/stt_processor.log', encoding='utf-8'), | |
logging.StreamHandler() | |
] | |
) | |
logger = logging.getLogger(__name__) | |
# ----------------------------------------- | |
class STTProcessorApp: | |
def __init__(self, root): | |
self.root = root | |
self.root.title("2인 대화 STT 처리기 (AI 화자 분리)") | |
self.root.geometry("1000x750") | |
# 모델 로딩 상태 변수 | |
self.models_loaded = False | |
self.whisper_model = None | |
self.gemini_model = None | |
# UI 요소 생성 | |
self.main_frame = tk.Frame(root, padx=10, pady=10) | |
self.main_frame.pack(fill=tk.BOTH, expand=True) | |
# 제목 | |
title_label = tk.Label(self.main_frame, text="2인 대화 STT 처리기 (AI 화자 분리)", font=("Arial", 16, "bold")) | |
title_label.pack(pady=5) | |
# 설명 | |
desc_label = tk.Label(self.main_frame, text="Whisper STT + Gemini AI 화자 분리로 2명의 대화를 자동으로 구분합니다", font=("Arial", 10)) | |
desc_label.pack(pady=2) | |
# WAV 파일 목록 프레임 | |
files_frame = tk.LabelFrame(self.main_frame, text="data 폴더의 WAV 파일 목록", padx=5, pady=5) | |
files_frame.pack(fill=tk.BOTH, expand=True, pady=5) | |
# 파일 목록과 스크롤바 | |
list_frame = tk.Frame(files_frame) | |
list_frame.pack(fill=tk.BOTH, expand=True) | |
scrollbar = tk.Scrollbar(list_frame) | |
scrollbar.pack(side=tk.RIGHT, fill=tk.Y) | |
self.file_listbox = tk.Listbox(list_frame, yscrollcommand=scrollbar.set, selectmode=tk.SINGLE) | |
self.file_listbox.pack(side=tk.LEFT, fill=tk.BOTH, expand=True) | |
scrollbar.config(command=self.file_listbox.yview) | |
# 버튼 프레임 | |
button_frame = tk.Frame(self.main_frame) | |
button_frame.pack(fill=tk.X, pady=5) | |
self.refresh_button = tk.Button(button_frame, text="파일 목록 새로고침", command=self.refresh_file_list) | |
self.refresh_button.pack(side=tk.LEFT, padx=5) | |
self.process_button = tk.Button(button_frame, text="선택된 파일 처리", command=self.start_processing, | |
state=tk.DISABLED) | |
self.process_button.pack(side=tk.LEFT, padx=5) | |
self.process_all_button = tk.Button(button_frame, text="모든 파일 처리", command=self.start_processing_all, | |
state=tk.DISABLED) | |
self.process_all_button.pack(side=tk.LEFT, padx=5) | |
# 진행률 표시 | |
progress_frame = tk.Frame(self.main_frame) | |
progress_frame.pack(fill=tk.X, pady=5) | |
tk.Label(progress_frame, text="진행률:").pack(side=tk.LEFT) | |
self.progress_var = tk.StringVar(value="대기 중") | |
tk.Label(progress_frame, textvariable=self.progress_var).pack(side=tk.LEFT, padx=10) | |
self.progress_bar = ttk.Progressbar(progress_frame, mode='determinate') | |
self.progress_bar.pack(side=tk.RIGHT, fill=tk.X, expand=True, padx=10) | |
# 상태 표시줄 | |
self.status_label = tk.Label(self.main_frame, text="준비 완료. Google API 키를 설정하고 '처리' 버튼을 누르세요.", bd=1, | |
relief=tk.SUNKEN, anchor=tk.W) | |
self.status_label.pack(side=tk.BOTTOM, fill=tk.X) | |
# 결과 출력 영역 | |
result_frame = tk.LabelFrame(self.main_frame, text="처리 결과", padx=5, pady=5) | |
result_frame.pack(fill=tk.BOTH, expand=True, pady=5) | |
self.result_text = scrolledtext.ScrolledText(result_frame, wrap=tk.WORD, state=tk.DISABLED, height=15) | |
self.result_text.pack(fill=tk.BOTH, expand=True) | |
# 초기 파일 목록 로드 | |
self.refresh_file_list() | |
def refresh_file_list(self): | |
"""data 폴더의 WAV 파일 목록을 새로고침합니다.""" | |
self.file_listbox.delete(0, tk.END) | |
wav_files = glob.glob("data/*.wav") | |
if wav_files: | |
for file_path in wav_files: | |
filename = os.path.basename(file_path) | |
self.file_listbox.insert(tk.END, filename) | |
self.process_button.config(state=tk.NORMAL) | |
self.process_all_button.config(state=tk.NORMAL) | |
logger.info(f"{len(wav_files)}개의 WAV 파일을 발견했습니다.") | |
else: | |
self.file_listbox.insert(tk.END, "WAV 파일이 없습니다. data 폴더에 WAV 파일을 넣어주세요.") | |
self.process_button.config(state=tk.DISABLED) | |
self.process_all_button.config(state=tk.DISABLED) | |
logger.warning("data 폴더에 WAV 파일이 없습니다.") | |
def update_status(self, message): | |
"""UI의 상태 메시지를 업데이트합니다.""" | |
self.status_label.config(text=message) | |
self.root.update_idletasks() | |
def update_progress(self, current, total, message=""): | |
"""진행률을 업데이트합니다.""" | |
if total > 0: | |
progress = (current / total) * 100 | |
self.progress_bar.config(value=progress) | |
if message: | |
self.progress_var.set(f"{message} ({current}/{total})") | |
else: | |
self.progress_var.set(f"{current}/{total}") | |
self.root.update_idletasks() | |
def show_result(self, content): | |
"""결과 텍스트 영역에 내용을 표시합니다.""" | |
self.result_text.config(state=tk.NORMAL) | |
self.result_text.insert(tk.END, content + "\n\n") | |
self.result_text.see(tk.END) | |
self.result_text.config(state=tk.DISABLED) | |
def load_models(self): | |
"""필요한 AI 모델들을 로드합니다.""" | |
try: | |
if not GOOGLE_API_KEY or GOOGLE_API_KEY == "your_google_api_key_here": | |
messagebox.showerror("API 키 오류", ".env 파일에 올바른 Google AI API 키를 입력해주세요.") | |
logger.error("Google API 키가 설정되지 않았습니다.") | |
return False | |
logger.info("모델 로딩을 시작합니다.") | |
self.update_status("모델 로딩 중... (최초 실행 시 시간이 걸릴 수 있습니다)") | |
# Whisper 모델 로딩 | |
self.update_status("음성 인식 모델(Whisper) 로딩 중...") | |
logger.info("Whisper 모델 로딩을 시작합니다.") | |
self.whisper_model = whisper.load_model("base") # "small", "medium", "large" 등으로 변경 가능 | |
logger.info("Whisper 모델 로딩이 완료되었습니다.") | |
# Gemini 모델 설정 | |
self.update_status("AI 화자 분리 모델(Gemini) 설정 중...") | |
logger.info("Gemini 모델 설정을 시작합니다.") | |
genai.configure(api_key=GOOGLE_API_KEY) | |
# gemini-2.0-flash: 최신 Gemini 2.0 모델, 빠르고 정확한 처리 | |
self.gemini_model = genai.GenerativeModel('gemini-2.0-flash') | |
logger.info("Gemini 2.0 Flash 모델 설정이 완료되었습니다.") | |
self.models_loaded = True | |
self.update_status("모든 모델 로딩 완료. 처리 준비 완료.") | |
logger.info("모든 모델 로딩이 완료되었습니다.") | |
return True | |
except Exception as e: | |
error_msg = f"모델을 로딩하는 중 오류가 발생했습니다: {e}" | |
messagebox.showerror("모델 로딩 오류", error_msg) | |
logger.error(error_msg) | |
self.update_status("오류: 모델 로딩 실패") | |
return False | |
def start_processing(self): | |
"""선택된 파일 처리 시작.""" | |
selection = self.file_listbox.curselection() | |
if not selection: | |
messagebox.showwarning("파일 미선택", "처리할 파일을 선택해주세요.") | |
return | |
filename = self.file_listbox.get(selection[0]) | |
if filename == "WAV 파일이 없습니다. data 폴더에 WAV 파일을 넣어주세요.": | |
return | |
self.process_files([filename]) | |
def start_processing_all(self): | |
"""모든 파일 처리 시작.""" | |
wav_files = glob.glob("data/*.wav") | |
if not wav_files: | |
messagebox.showwarning("파일 없음", "data 폴더에 처리할 WAV 파일이 없습니다.") | |
return | |
filenames = [os.path.basename(f) for f in wav_files] | |
self.process_files(filenames) | |
def process_files(self, filenames): | |
"""파일 처리 시작.""" | |
# 모델이 로드되지 않았으면 먼저 로드 | |
if not self.models_loaded: | |
if not self.load_models(): | |
return # 모델 로딩 실패 시 중단 | |
# UI 비활성화 및 처리 스레드 시작 | |
self.refresh_button.config(state=tk.DISABLED) | |
self.process_button.config(state=tk.DISABLED) | |
self.process_all_button.config(state=tk.DISABLED) | |
processing_thread = threading.Thread(target=self.process_audio_files, args=(filenames,)) | |
processing_thread.start() | |
def process_audio_files(self, filenames): | |
"""백그라운드에서 여러 오디오 파일을 처리하는 메인 로직.""" | |
try: | |
total_files = len(filenames) | |
logger.info(f"{total_files}개의 파일 처리를 시작합니다.") | |
for idx, filename in enumerate(filenames): | |
file_path = os.path.join("data", filename) | |
self.update_progress(idx, total_files, f"처리 중: {filename}") | |
result = self.process_single_audio_file(file_path, filename) | |
if result: | |
self.show_result(f"✅ {filename} 처리 완료") | |
else: | |
self.show_result(f"❌ {filename} 처리 실패") | |
self.update_progress(total_files, total_files, "완료") | |
self.update_status("모든 파일 처리 완료!") | |
logger.info("모든 파일 처리가 완료되었습니다.") | |
except Exception as e: | |
error_msg = f"파일 처리 중 오류가 발생했습니다: {e}" | |
logger.error(error_msg) | |
self.update_status(f"오류: {e}") | |
finally: | |
# UI 다시 활성화 | |
self.refresh_button.config(state=tk.NORMAL) | |
self.process_button.config(state=tk.NORMAL) | |
self.process_all_button.config(state=tk.NORMAL) | |
def process_single_audio_file(self, file_path, filename): | |
"""단일 오디오 파일을 처리합니다.""" | |
try: | |
logger.info(f"파일 처리 시작: {file_path}") | |
base_name = os.path.splitext(filename)[0] | |
# 1단계: Whisper로 음성 인식 | |
self.update_status(f"1/4: 음성 인식 진행 중: {filename}") | |
logger.info(f"음성 인식 시작: {filename}") | |
result = self.whisper_model.transcribe(file_path) | |
full_text = result['text'].strip() | |
if not full_text: | |
logger.warning(f"파일 {filename}에서 텍스트를 추출할 수 없습니다.") | |
return False | |
# 2단계: Gemini로 화자 분리 | |
self.update_status(f"2/4: AI 화자 분리 진행 중: {filename}") | |
logger.info(f"AI 화자 분리 시작: {filename}") | |
speaker_separated_text = self.separate_speakers_with_gemini(full_text) | |
# 3단계: 맞춤법 교정 | |
self.update_status(f"3/4: 맞춤법 교정 진행 중: {filename}") | |
logger.info(f"맞춤법 교정 시작: {filename}") | |
corrected_text = self.correct_spelling_with_gemini(speaker_separated_text) | |
# 4단계: 결과 저장 | |
self.update_status(f"4/4: 결과 저장 중: {filename}") | |
self.save_separated_conversations(base_name, full_text, speaker_separated_text, corrected_text, result) | |
logger.info(f"파일 처리 완료: {filename}") | |
return True | |
except Exception as e: | |
logger.error(f"파일 {filename} 처리 중 오류: {e}") | |
return False | |
def separate_speakers_with_gemini(self, text): | |
"""Gemini API를 사용하여 텍스트를 화자별로 분리합니다.""" | |
try: | |
prompt = f""" | |
당신은 2명의 화자가 나누는 대화를 분석하는 전문가입니다. | |
주어진 텍스트를 분석하여 각 발언을 화자별로 구분해주세요. | |
분석 지침: | |
1. 대화의 맥락과 내용을 기반으로 화자를 구분하세요 | |
2. 말투, 주제 전환, 질문과 답변의 패턴을 활용하세요 | |
3. 화자1과 화자2로 구분하여 표시하세요 | |
4. 각 발언 앞에 [화자1] 또는 [화자2]를 붙여주세요 | |
5. 발언이 너무 길 경우 자연스러운 지점에서 나누어주세요 | |
출력 형식: | |
[화자1] 첫 번째 발언 내용 | |
[화자2] 두 번째 발언 내용 | |
[화자1] 세 번째 발언 내용 | |
... | |
분석할 텍스트: | |
{text} | |
""" | |
response = self.gemini_model.generate_content(prompt) | |
separated_text = response.text.strip() | |
logger.info("Gemini를 통한 화자 분리가 완료되었습니다.") | |
return separated_text | |
except Exception as e: | |
logger.error(f"Gemini 화자 분리 중 오류: {e}") | |
return f"[오류] 화자 분리 실패: {str(e)}" | |
def correct_spelling_with_gemini(self, separated_text): | |
"""Gemini API를 사용하여 화자별 분리된 텍스트의 맞춤법을 교정합니다.""" | |
try: | |
prompt = f""" | |
당신은 한국어 맞춤법 교정 전문가입니다. | |
주어진 텍스트에서 맞춤법 오류, 띄어쓰기 오류, 오타를 수정해주세요. | |
교정 지침: | |
1. 자연스러운 한국어 표현으로 수정하되, 원본의 의미와 말투는 유지하세요 | |
2. [화자1], [화자2] 태그는 그대로 유지하세요 | |
3. 전문 용어나 고유명사는 가능한 정확하게 수정하세요 | |
4. 구어체 특성은 유지하되, 명백한 오타만 수정하세요 | |
5. 문맥에 맞는 올바른 단어로 교체하세요 | |
수정이 필요한 예시: | |
- "치특기" → "치트키" | |
- "실점픽" → "실전 픽" | |
- "복사부천억" → "복사 붙여넣기" | |
- "핵심같이가" → "핵심 가치가" | |
- "재활" → "재활용" | |
- "저정할" → "저장할" | |
- "플레일" → "플레어" | |
- "서벌 수" → "서버리스" | |
- "커리" → "쿼리" | |
- "전력" → "전략" | |
- "클라클라" → "클라크" | |
- "가인만" → "가입만" | |
- "M5U" → "MAU" | |
- "나온 로도" → "다운로드" | |
- "무시무치" → "무시무시" | |
- "송신유금" → "송신 요금" | |
- "10지가" → "10GB" | |
- "유금" → "요금" | |
- "전 색을" → "전 세계" | |
- "도무원은" → "도구들은" | |
- "골차품데" → "골치 아픈데" | |
- "변원해" → "변환해" | |
- "f 운영" → "서비스 운영" | |
- "오류추저개" → "오류 추적기" | |
- "f 늘려질" → "서비스가 늘어날" | |
- "캐시칭" → "캐싱" | |
- "플레이어" → "플레어" | |
- "업스테시" → "업스태시" | |
- "원시근을" → "웬지슨" | |
- "부각이릉도" → "부각들도" | |
- "컴포넌트" → "컴포넌트" | |
- "본이터링" → "모니터링" | |
- "번뜨기는" → "번뜩이는" | |
- "사용적 경험" → "사용자 경험" | |
교정할 텍스트: | |
{separated_text} | |
""" | |
response = self.gemini_model.generate_content(prompt) | |
corrected_text = response.text.strip() | |
logger.info("Gemini를 통한 맞춤법 교정이 완료되었습니다.") | |
return corrected_text | |
except Exception as e: | |
logger.error(f"Gemini 맞춤법 교정 중 오류: {e}") | |
return separated_text # 오류 발생 시 원본 반환 | |
def parse_separated_text(self, separated_text): | |
"""화자별로 분리된 텍스트를 파싱하여 구조화합니다.""" | |
conversations = { | |
"화자1": [], | |
"화자2": [] | |
} | |
# 정규표현식으로 화자별 발언 추출 | |
pattern = r'\[화자([12])\]\s*(.+?)(?=\[화자[12]\]|$)' | |
matches = re.findall(pattern, separated_text, re.DOTALL) | |
for speaker_num, content in matches: | |
speaker = f"화자{speaker_num}" | |
content = content.strip() | |
if content: | |
conversations[speaker].append(content) | |
return conversations | |
def save_separated_conversations(self, base_name, original_text, separated_text, corrected_text, whisper_result): | |
"""화자별로 분리되고 맞춤법이 교정된 대화 내용을 파일로 저장합니다.""" | |
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") | |
# 교정된 텍스트에서 화자별 대화 파싱 | |
corrected_conversations = self.parse_separated_text(corrected_text) | |
# 원본 화자별 대화 파싱 (비교용) | |
original_conversations = self.parse_separated_text(separated_text) | |
# 1. 전체 대화 저장 (원본, 화자 분리, 맞춤법 교정 포함) | |
all_txt_path = f"output/{base_name}_전체대화_{timestamp}.txt" | |
with open(all_txt_path, 'w', encoding='utf-8') as f: | |
f.write(f"파일명: {base_name}\n") | |
f.write(f"처리 시간: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n") | |
f.write(f"언어: {whisper_result.get('language', 'unknown')}\n") | |
f.write("="*50 + "\n\n") | |
f.write("원본 텍스트:\n") | |
f.write(original_text + "\n\n") | |
f.write("="*50 + "\n\n") | |
f.write("화자별 분리 결과 (원본):\n") | |
f.write(separated_text + "\n\n") | |
f.write("="*50 + "\n\n") | |
f.write("화자별 분리 결과 (맞춤법 교정):\n") | |
f.write(corrected_text + "\n") | |
# 2. 교정된 화자별 개별 파일 저장 | |
for speaker, utterances in corrected_conversations.items(): | |
if utterances: | |
speaker_txt_path = f"output/{base_name}_{speaker}_교정본_{timestamp}.txt" | |
with open(speaker_txt_path, 'w', encoding='utf-8') as f: | |
f.write(f"파일명: {base_name}\n") | |
f.write(f"화자: {speaker}\n") | |
f.write(f"처리 시간: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n") | |
f.write(f"발언 수: {len(utterances)}\n") | |
f.write("="*50 + "\n\n") | |
for idx, utterance in enumerate(utterances, 1): | |
f.write(f"{idx}. {utterance}\n\n") | |
# 3. JSON 형태로도 저장 (분석용) | |
json_path = f"output/{base_name}_data_{timestamp}.json" | |
json_data = { | |
"filename": base_name, | |
"processed_time": datetime.now().isoformat(), | |
"language": whisper_result.get("language", "unknown"), | |
"original_text": original_text, | |
"separated_text": separated_text, | |
"corrected_text": corrected_text, | |
"conversations_by_speaker_original": original_conversations, | |
"conversations_by_speaker_corrected": corrected_conversations, | |
"segments": whisper_result.get("segments", []) | |
} | |
with open(json_path, 'w', encoding='utf-8') as f: | |
json.dump(json_data, f, ensure_ascii=False, indent=2) | |
logger.info(f"결과 저장 완료: {all_txt_path}, {json_path}") | |
logger.info(f"교정된 화자별 파일도 저장되었습니다.") | |
if __name__ == "__main__": | |
root = tk.Tk() | |
app = STTProcessorApp(root) | |
root.mainloop() | |