import whisper import google.generativeai as genai import os import json from datetime import datetime import re def test_speaker_separation(): """Gemini를 사용한 화자 분리 테스트""" # API 키 로드 from dotenv import load_dotenv load_dotenv() GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY") if not GOOGLE_API_KEY or GOOGLE_API_KEY == "your_google_api_key_here": print("ERROR: Please set GOOGLE_API_KEY in .env file") return print("Loading models...") try: # Whisper 모델 로드 whisper_model = whisper.load_model("base") print("Whisper model loaded!") # Gemini 모델 설정 genai.configure(api_key=GOOGLE_API_KEY) # gemini-2.0-flash: 최신 Gemini 2.0 모델, 빠르고 정확한 화자 분리 gemini_model = genai.GenerativeModel('gemini-2.0-flash') print("Gemini 2.0 Flash model configured!") # WAV 파일 찾기 wav_files = [] if os.path.exists("data"): for file in os.listdir("data"): if file.endswith(".wav"): wav_files.append(os.path.join("data", file)) if not wav_files: print("No WAV files found in data folder.") return print(f"Found {len(wav_files)} WAV file(s)") for wav_file in wav_files[:1]: # 첫 번째 파일만 테스트 print(f"\nProcessing: {os.path.basename(wav_file)}") # 1단계: 음성 인식 print("Step 1: Speech recognition...") result = whisper_model.transcribe(wav_file) full_text = result['text'].strip() print(f"Language detected: {result['language']}") print(f"Text length: {len(full_text)} characters") print(f"Text preview: {full_text[:200]}...") # 2단계: 화자 분리 print("\nStep 2: Speaker separation with Gemini...") prompt = f""" 당신은 2명의 화자가 나누는 대화를 분석하는 전문가입니다. 주어진 텍스트를 분석하여 각 발언을 화자별로 구분해주세요. 분석 지침: 1. 대화의 맥락과 내용을 기반으로 화자를 구분하세요 2. 말투, 주제 전환, 질문과 답변의 패턴을 활용하세요 3. 화자1과 화자2로 구분하여 표시하세요 4. 각 발언 앞에 [화자1] 또는 [화자2]를 붙여주세요 5. 발언이 너무 길 경우 자연스러운 지점에서 나누어주세요 출력 형식: [화자1] 첫 번째 발언 내용 [화자2] 두 번째 발언 내용 [화자1] 세 번째 발언 내용 ... 분석할 텍스트: {full_text} """ response = gemini_model.generate_content(prompt) separated_text = response.text.strip() print("Speaker separation completed!") # 3단계: 맞춤법 교정 print("\nStep 3: Spell checking with Gemini...") spelling_prompt = f""" 당신은 한국어 맞춤법 교정 전문가입니다. 주어진 텍스트에서 맞춤법 오류, 띄어쓰기 오류, 오타를 수정해주세요. 교정 지침: 1. 자연스러운 한국어 표현으로 수정하되, 원본의 의미와 말투는 유지하세요 2. [화자1], [화자2] 태그는 그대로 유지하세요 3. 전문 용어나 고유명사는 가능한 정확하게 수정하세요 4. 구어체 특성은 유지하되, 명백한 오타만 수정하세요 5. 문맥에 맞는 올바른 단어로 교체하세요 교정할 텍스트: {separated_text} """ corrected_response = gemini_model.generate_content(spelling_prompt) corrected_text = corrected_response.text.strip() print("Spell checking completed!") # 4단계: 결과 저장 print("\nStep 4: Saving results...") base_name = os.path.splitext(os.path.basename(wav_file))[0] timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") # output 폴더 생성 if not os.path.exists("output"): os.makedirs("output") # 전체 결과 저장 (원본 + 분리 + 교정) result_path = f"output/{base_name}_complete_result_{timestamp}.txt" with open(result_path, 'w', encoding='utf-8') as f: f.write(f"Filename: {base_name}\n") f.write(f"Processing time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n") f.write(f"Language: {result['language']}\n") f.write("="*50 + "\n\n") f.write("Original text:\n") f.write(full_text + "\n\n") f.write("="*50 + "\n\n") f.write("Speaker separated text (original):\n") f.write(separated_text + "\n\n") f.write("="*50 + "\n\n") f.write("Speaker separated text (spell corrected):\n") f.write(corrected_text + "\n") # 교정된 텍스트에서 화자별 분리 결과 파싱 corrected_conversations = {"화자1": [], "화자2": []} pattern = r'\[화자([12])\]\s*(.+?)(?=\[화자[12]\]|$)' matches = re.findall(pattern, corrected_text, re.DOTALL) for speaker_num, content in matches: speaker = f"화자{speaker_num}" content = content.strip() if content: corrected_conversations[speaker].append(content) # 원본 화자별 분리 결과도 파싱 (비교용) original_conversations = {"화자1": [], "화자2": []} matches = re.findall(pattern, separated_text, re.DOTALL) for speaker_num, content in matches: speaker = f"화자{speaker_num}" content = content.strip() if content: original_conversations[speaker].append(content) # 교정된 화자별 개별 파일 저장 for speaker, utterances in corrected_conversations.items(): if utterances: speaker_path = f"output/{base_name}_{speaker}_교정본_{timestamp}.txt" with open(speaker_path, 'w', encoding='utf-8') as f: f.write(f"Filename: {base_name}\n") f.write(f"Speaker: {speaker}\n") f.write(f"Processing time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n") f.write(f"Number of utterances: {len(utterances)}\n") f.write("="*50 + "\n\n") for idx, utterance in enumerate(utterances, 1): f.write(f"{idx}. {utterance}\n\n") # JSON 저장 (원본과 교정본 모두 포함) json_path = f"output/{base_name}_complete_data_{timestamp}.json" json_data = { "filename": base_name, "processed_time": datetime.now().isoformat(), "language": result['language'], "original_text": full_text, "separated_text": separated_text, "corrected_text": corrected_text, "conversations_by_speaker_original": original_conversations, "conversations_by_speaker_corrected": corrected_conversations, "segments": result.get("segments", []) } with open(json_path, 'w', encoding='utf-8') as f: json.dump(json_data, f, ensure_ascii=False, indent=2) print(f"Results saved:") print(f" - Complete result: {result_path}") print(f" - JSON data: {json_path}") for speaker in corrected_conversations: if corrected_conversations[speaker]: print(f" - {speaker} (교정본): {len(corrected_conversations[speaker])} utterances") print("\nProcessing completed successfully!") print("✓ Speech recognition with Whisper") print("✓ Speaker separation with Gemini 2.0") print("✓ Spell checking with Gemini 2.0") print("✓ Results saved (original + corrected versions)") except Exception as e: print(f"Error occurred: {e}") import traceback traceback.print_exc() if __name__ == "__main__": test_speaker_separation()