🎤 2인 대화 화자 분리기 (AI)
Whisper + Gemini 2.0 Flash AI를 사용한 음성 인식, 화자 분리 및 맞춤법 교정
import gradio as gr import os import logging from datetime import datetime # 로깅 설정 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.StreamHandler() ] ) logger = logging.getLogger(__name__) # 전역 변수 text_processor = None whisper_model = None def safe_import_whisper(): """Whisper를 안전하게 import합니다.""" try: import whisper return whisper, None except Exception as e: logger.error(f"Whisper import 실패: {e}") return None, str(e) def safe_import_processor(): """TextProcessor를 안전하게 import합니다.""" try: from stt_processor import TextProcessor return TextProcessor, None except Exception as e: logger.error(f"TextProcessor import 실패: {e}") return None, str(e) def initialize_models(): """모델들을 초기화합니다.""" global text_processor, whisper_model try: # 환경 변수 또는 Hugging Face Secrets에서 API 키 읽기 google_api_key = os.getenv("GOOGLE_API_KEY") # API 키 안전하게 검증 if not google_api_key or not isinstance(google_api_key, str) or len(google_api_key.strip()) == 0: return False, "❌ Google API 키가 설정되지 않았습니다. Hugging Face Spaces의 Settings에서 GOOGLE_API_KEY를 설정해주세요." # Whisper 모델 로드 시도 whisper_lib, whisper_error = safe_import_whisper() if whisper_lib is None: return False, f"❌ Whisper 라이브러리 로딩 실패: {whisper_error}" logger.info("Whisper 모델을 로딩합니다...") whisper_model = whisper_lib.load_model("base") logger.info("Whisper 모델 로딩 완료") # 텍스트 프로세서 초기화 TextProcessor, processor_error = safe_import_processor() if TextProcessor is None: return False, f"❌ TextProcessor 로딩 실패: {processor_error}" text_processor = TextProcessor(google_api_key.strip()) return True, "✅ 모든 모델이 초기화되었습니다." except Exception as e: logger.error(f"모델 초기화 실패: {e}") return False, f"❌ 초기화 실패: {str(e)}" def process_audio_file(audio_file, speaker1_name, speaker2_name, progress=gr.Progress()): """업로드된 오디오 파일을 처리합니다.""" global text_processor, whisper_model if audio_file is None: return "❌ 오디오 파일을 업로드해주세요.", "", "", "", "", "", None try: # 모델 초기화 (필요한 경우) if whisper_model is None or text_processor is None: progress(0.05, desc="모델 초기화 중...") success, message = initialize_models() if not success: return message, "", "", "", "", "", None # 오디오 파일 경로 확인 audio_path = audio_file.name if hasattr(audio_file, 'name') else str(audio_file) logger.info(f"오디오 파일 처리 시작: {audio_path}") # 1단계: Whisper로 음성 인식 progress(0.1, desc="음성을 텍스트로 변환 중...") logger.info("Whisper를 통한 음성 인식 시작") result = whisper_model.transcribe(audio_path) full_text = result['text'].strip() if not full_text: return "❌ 오디오에서 텍스트를 추출할 수 없습니다.", "", "", "", "", "", None language = result.get('language', 'unknown') logger.info(f"음성 인식 완료. 언어: {language}, 텍스트 길이: {len(full_text)}") # 2단계: AI 모델 로딩 progress(0.3, desc="AI 모델 로딩 중...") if not text_processor.models_loaded: text_processor.load_models() # 진행 상황 콜백 함수 def progress_callback(status, current, total): progress_value = 0.3 + (current / total) * 0.6 # 0.3~0.9 범위 progress(progress_value, desc=f"{status} ({current}/{total})") # 3단계: 텍스트 처리 (화자 분리 + 맞춤법 교정) progress(0.4, desc="AI 화자 분리 및 맞춤법 교정 중...") # 사용자 정의 화자 이름 적용 custom_speaker1 = speaker1_name.strip() if speaker1_name and speaker1_name.strip() else None custom_speaker2 = speaker2_name.strip() if speaker2_name and speaker2_name.strip() else None text_result = text_processor.process_text( full_text, progress_callback=progress_callback, speaker1_name=custom_speaker1, speaker2_name=custom_speaker2 ) if not text_result.get("success", False): return f"❌ 텍스트 처리 실패: {text_result.get('error', 'Unknown error')}", full_text, "", "", "", "", None # 결과 추출 progress(0.95, desc="결과 정리 중...") original_text = text_result["original_text"] separated_text = text_result["separated_text"] corrected_text = text_result["corrected_text"] # 화자별 대화 추출 conversations = text_result["conversations_by_speaker_corrected"] speaker1_key = custom_speaker1 or "화자1" speaker2_key = custom_speaker2 or "화자2" speaker1_text = "\n\n".join([f"{i+1}. {utterance}" for i, utterance in enumerate(conversations.get(speaker1_key, []))]) speaker2_text = "\n\n".join([f"{i+1}. {utterance}" for i, utterance in enumerate(conversations.get(speaker2_key, []))]) # 다운로드 파일 생성 download_file = None try: text_processor.save_results_to_files(text_result) zip_path = text_processor.create_download_zip(text_result) if zip_path and os.path.exists(zip_path): download_file = zip_path except Exception as e: logger.warning(f"다운로드 파일 생성 실패: {e}") progress(1.0, desc="처리 완료!") status_message = f""" ✅ **오디오 처리 완료!** - 파일명: {os.path.basename(audio_path)} - 처리 시간: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - 감지된 언어: {language} - 텍스트 길이: {len(full_text)}자 - {speaker1_key} 발언 수: {len(conversations.get(speaker1_key, []))}개 - {speaker2_key} 발언 수: {len(conversations.get(speaker2_key, []))}개 """ return status_message, original_text, separated_text, corrected_text, speaker1_text, speaker2_text, download_file except Exception as e: logger.error(f"오디오 파일 처리 중 오류: {e}") return f"❌ 처리 중 오류가 발생했습니다: {str(e)}", "", "", "", "", "", None def process_text_input(input_text, speaker1_name, speaker2_name, progress=gr.Progress()): """입력된 텍스트를 처리합니다.""" global text_processor if not input_text or not input_text.strip(): return "❌ 처리할 텍스트를 입력해주세요.", "", "", "", "", "", None try: # 텍스트 프로세서만 초기화 if text_processor is None: progress(0.1, desc="텍스트 프로세서 초기화 중...") google_api_key = os.getenv("GOOGLE_API_KEY") if not google_api_key or not isinstance(google_api_key, str) or len(google_api_key.strip()) == 0: return "❌ Google API 키가 설정되지 않았습니다.", "", "", "", "", "", None TextProcessor, processor_error = safe_import_processor() if TextProcessor is None: return f"❌ TextProcessor 로딩 실패: {processor_error}", "", "", "", "", "", None text_processor = TextProcessor(google_api_key.strip()) # 모델 로딩 progress(0.2, desc="AI 모델 로딩 중...") if not text_processor.models_loaded: text_processor.load_models() # 진행 상황 콜백 함수 def progress_callback(status, current, total): progress_value = 0.2 + (current / total) * 0.7 # 0.2~0.9 범위 progress(progress_value, desc=f"{status} ({current}/{total})") # 텍스트 처리 progress(0.3, desc="텍스트 처리 시작...") # 사용자 정의 화자 이름 적용 custom_speaker1 = speaker1_name.strip() if speaker1_name and speaker1_name.strip() else None custom_speaker2 = speaker2_name.strip() if speaker2_name and speaker2_name.strip() else None result = text_processor.process_text( input_text, progress_callback=progress_callback, speaker1_name=custom_speaker1, speaker2_name=custom_speaker2 ) if not result.get("success", False): return f"❌ 처리 실패: {result.get('error', 'Unknown error')}", "", "", "", "", "", None # 결과 추출 progress(0.95, desc="결과 정리 중...") original_text = result["original_text"] separated_text = result["separated_text"] corrected_text = result["corrected_text"] # 화자별 대화 추출 conversations = result["conversations_by_speaker_corrected"] speaker1_key = custom_speaker1 or "화자1" speaker2_key = custom_speaker2 or "화자2" speaker1_text = "\n\n".join([f"{i+1}. {utterance}" for i, utterance in enumerate(conversations.get(speaker1_key, []))]) speaker2_text = "\n\n".join([f"{i+1}. {utterance}" for i, utterance in enumerate(conversations.get(speaker2_key, []))]) # 다운로드 파일 생성 download_file = None try: text_processor.save_results_to_files(result) zip_path = text_processor.create_download_zip(result) if zip_path and os.path.exists(zip_path): download_file = zip_path except Exception as e: logger.warning(f"다운로드 파일 생성 실패: {e}") progress(1.0, desc="처리 완료!") status_message = f""" ✅ **텍스트 처리 완료!** - 텍스트명: {result['text_name']} - 처리 시간: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - {speaker1_key} 발언 수: {len(conversations.get(speaker1_key, []))}개 - {speaker2_key} 발언 수: {len(conversations.get(speaker2_key, []))}개 """ return status_message, original_text, separated_text, corrected_text, speaker1_text, speaker2_text, download_file except Exception as e: logger.error(f"텍스트 처리 중 오류: {e}") return f"❌ 처리 중 오류가 발생했습니다: {str(e)}", "", "", "", "", "", None def create_interface(): """Gradio 인터페이스를 생성합니다.""" # CSS 스타일링 css = """ .gradio-container { font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif; } .status-box { padding: 15px; border-radius: 8px; margin: 10px 0; } .main-header { text-align: center; color: #2c3e50; margin-bottom: 20px; } .speaker-config { background-color: #f8f9fa; padding: 15px; border-radius: 8px; margin: 10px 0; } """ with gr.Blocks(css=css, title="2인 대화 STT 처리기") as interface: # 헤더 gr.HTML("""
Whisper + Gemini 2.0 Flash AI를 사용한 음성 인식, 화자 분리 및 맞춤법 교정