Spaces:
Runtime error
Runtime error
import gradio as gr | |
import os | |
import logging | |
from datetime import datetime | |
# 로깅 설정 | |
logging.basicConfig( | |
level=logging.INFO, | |
format='%(asctime)s - %(levelname)s - %(message)s', | |
handlers=[ | |
logging.StreamHandler() | |
] | |
) | |
logger = logging.getLogger(__name__) | |
# 전역 변수 | |
text_processor = None | |
whisper_model = None | |
def safe_import_whisper(): | |
"""Whisper를 안전하게 import합니다.""" | |
try: | |
import whisper | |
return whisper, None | |
except Exception as e: | |
logger.error(f"Whisper import 실패: {e}") | |
return None, str(e) | |
def safe_import_processor(): | |
"""TextProcessor를 안전하게 import합니다.""" | |
try: | |
from stt_processor import TextProcessor | |
return TextProcessor, None | |
except Exception as e: | |
logger.error(f"TextProcessor import 실패: {e}") | |
return None, str(e) | |
def initialize_models(): | |
"""모델들을 초기화합니다.""" | |
global text_processor, whisper_model | |
try: | |
# 환경 변수 또는 Hugging Face Secrets에서 API 키 읽기 | |
google_api_key = os.getenv("GOOGLE_API_KEY") | |
# API 키 안전하게 검증 | |
if not google_api_key or not isinstance(google_api_key, str) or len(google_api_key.strip()) == 0: | |
return False, "❌ Google API 키가 설정되지 않았습니다. Hugging Face Spaces의 Settings에서 GOOGLE_API_KEY를 설정해주세요." | |
# Whisper 모델 로드 시도 | |
whisper_lib, whisper_error = safe_import_whisper() | |
if whisper_lib is None: | |
return False, f"❌ Whisper 라이브러리 로딩 실패: {whisper_error}" | |
logger.info("Whisper 모델을 로딩합니다...") | |
whisper_model = whisper_lib.load_model("base") | |
logger.info("Whisper 모델 로딩 완료") | |
# 텍스트 프로세서 초기화 | |
TextProcessor, processor_error = safe_import_processor() | |
if TextProcessor is None: | |
return False, f"❌ TextProcessor 로딩 실패: {processor_error}" | |
text_processor = TextProcessor(google_api_key.strip()) | |
return True, "✅ 모든 모델이 초기화되었습니다." | |
except Exception as e: | |
logger.error(f"모델 초기화 실패: {e}") | |
return False, f"❌ 초기화 실패: {str(e)}" | |
def process_audio_file(audio_file, speaker1_name, speaker2_name, progress=gr.Progress()): | |
"""업로드된 오디오 파일을 처리합니다.""" | |
global text_processor, whisper_model | |
if audio_file is None: | |
return "❌ 오디오 파일을 업로드해주세요.", "", "", "", "", "", None | |
try: | |
# 모델 초기화 (필요한 경우) | |
if whisper_model is None or text_processor is None: | |
progress(0.05, desc="모델 초기화 중...") | |
success, message = initialize_models() | |
if not success: | |
return message, "", "", "", "", "", None | |
# 오디오 파일 경로 확인 | |
audio_path = audio_file.name if hasattr(audio_file, 'name') else str(audio_file) | |
logger.info(f"오디오 파일 처리 시작: {audio_path}") | |
# 1단계: Whisper로 음성 인식 | |
progress(0.1, desc="음성을 텍스트로 변환 중...") | |
logger.info("Whisper를 통한 음성 인식 시작") | |
result = whisper_model.transcribe(audio_path) | |
full_text = result['text'].strip() | |
if not full_text: | |
return "❌ 오디오에서 텍스트를 추출할 수 없습니다.", "", "", "", "", "", None | |
language = result.get('language', 'unknown') | |
logger.info(f"음성 인식 완료. 언어: {language}, 텍스트 길이: {len(full_text)}") | |
# 2단계: AI 모델 로딩 | |
progress(0.3, desc="AI 모델 로딩 중...") | |
if not text_processor.models_loaded: | |
text_processor.load_models() | |
# 진행 상황 콜백 함수 | |
def progress_callback(status, current, total): | |
progress_value = 0.3 + (current / total) * 0.6 # 0.3~0.9 범위 | |
progress(progress_value, desc=f"{status} ({current}/{total})") | |
# 3단계: 텍스트 처리 (화자 분리 + 맞춤법 교정) | |
progress(0.4, desc="AI 화자 분리 및 맞춤법 교정 중...") | |
# 사용자 정의 화자 이름 적용 | |
custom_speaker1 = speaker1_name.strip() if speaker1_name and speaker1_name.strip() else None | |
custom_speaker2 = speaker2_name.strip() if speaker2_name and speaker2_name.strip() else None | |
text_result = text_processor.process_text( | |
full_text, | |
progress_callback=progress_callback, | |
speaker1_name=custom_speaker1, | |
speaker2_name=custom_speaker2 | |
) | |
if not text_result.get("success", False): | |
return f"❌ 텍스트 처리 실패: {text_result.get('error', 'Unknown error')}", full_text, "", "", "", "", None | |
# 결과 추출 | |
progress(0.95, desc="결과 정리 중...") | |
original_text = text_result["original_text"] | |
separated_text = text_result["separated_text"] | |
corrected_text = text_result["corrected_text"] | |
# 화자별 대화 추출 | |
conversations = text_result["conversations_by_speaker_corrected"] | |
speaker1_key = custom_speaker1 or "화자1" | |
speaker2_key = custom_speaker2 or "화자2" | |
speaker1_text = "\n\n".join([f"{i+1}. {utterance}" for i, utterance in enumerate(conversations.get(speaker1_key, []))]) | |
speaker2_text = "\n\n".join([f"{i+1}. {utterance}" for i, utterance in enumerate(conversations.get(speaker2_key, []))]) | |
# 다운로드 파일 생성 | |
download_file = None | |
try: | |
text_processor.save_results_to_files(text_result) | |
zip_path = text_processor.create_download_zip(text_result) | |
if zip_path and os.path.exists(zip_path): | |
download_file = zip_path | |
except Exception as e: | |
logger.warning(f"다운로드 파일 생성 실패: {e}") | |
progress(1.0, desc="처리 완료!") | |
status_message = f""" | |
✅ **오디오 처리 완료!** | |
- 파일명: {os.path.basename(audio_path)} | |
- 처리 시간: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} | |
- 감지된 언어: {language} | |
- 텍스트 길이: {len(full_text)}자 | |
- {speaker1_key} 발언 수: {len(conversations.get(speaker1_key, []))}개 | |
- {speaker2_key} 발언 수: {len(conversations.get(speaker2_key, []))}개 | |
""" | |
return status_message, original_text, separated_text, corrected_text, speaker1_text, speaker2_text, download_file | |
except Exception as e: | |
logger.error(f"오디오 파일 처리 중 오류: {e}") | |
return f"❌ 처리 중 오류가 발생했습니다: {str(e)}", "", "", "", "", "", None | |
def process_text_input(input_text, speaker1_name, speaker2_name, progress=gr.Progress()): | |
"""입력된 텍스트를 처리합니다.""" | |
global text_processor | |
if not input_text or not input_text.strip(): | |
return "❌ 처리할 텍스트를 입력해주세요.", "", "", "", "", "", None | |
try: | |
# 텍스트 프로세서만 초기화 | |
if text_processor is None: | |
progress(0.1, desc="텍스트 프로세서 초기화 중...") | |
google_api_key = os.getenv("GOOGLE_API_KEY") | |
if not google_api_key or not isinstance(google_api_key, str) or len(google_api_key.strip()) == 0: | |
return "❌ Google API 키가 설정되지 않았습니다.", "", "", "", "", "", None | |
TextProcessor, processor_error = safe_import_processor() | |
if TextProcessor is None: | |
return f"❌ TextProcessor 로딩 실패: {processor_error}", "", "", "", "", "", None | |
text_processor = TextProcessor(google_api_key.strip()) | |
# 모델 로딩 | |
progress(0.2, desc="AI 모델 로딩 중...") | |
if not text_processor.models_loaded: | |
text_processor.load_models() | |
# 진행 상황 콜백 함수 | |
def progress_callback(status, current, total): | |
progress_value = 0.2 + (current / total) * 0.7 # 0.2~0.9 범위 | |
progress(progress_value, desc=f"{status} ({current}/{total})") | |
# 텍스트 처리 | |
progress(0.3, desc="텍스트 처리 시작...") | |
# 사용자 정의 화자 이름 적용 | |
custom_speaker1 = speaker1_name.strip() if speaker1_name and speaker1_name.strip() else None | |
custom_speaker2 = speaker2_name.strip() if speaker2_name and speaker2_name.strip() else None | |
result = text_processor.process_text( | |
input_text, | |
progress_callback=progress_callback, | |
speaker1_name=custom_speaker1, | |
speaker2_name=custom_speaker2 | |
) | |
if not result.get("success", False): | |
return f"❌ 처리 실패: {result.get('error', 'Unknown error')}", "", "", "", "", "", None | |
# 결과 추출 | |
progress(0.95, desc="결과 정리 중...") | |
original_text = result["original_text"] | |
separated_text = result["separated_text"] | |
corrected_text = result["corrected_text"] | |
# 화자별 대화 추출 | |
conversations = result["conversations_by_speaker_corrected"] | |
speaker1_key = custom_speaker1 or "화자1" | |
speaker2_key = custom_speaker2 or "화자2" | |
speaker1_text = "\n\n".join([f"{i+1}. {utterance}" for i, utterance in enumerate(conversations.get(speaker1_key, []))]) | |
speaker2_text = "\n\n".join([f"{i+1}. {utterance}" for i, utterance in enumerate(conversations.get(speaker2_key, []))]) | |
# 다운로드 파일 생성 | |
download_file = None | |
try: | |
text_processor.save_results_to_files(result) | |
zip_path = text_processor.create_download_zip(result) | |
if zip_path and os.path.exists(zip_path): | |
download_file = zip_path | |
except Exception as e: | |
logger.warning(f"다운로드 파일 생성 실패: {e}") | |
progress(1.0, desc="처리 완료!") | |
status_message = f""" | |
✅ **텍스트 처리 완료!** | |
- 텍스트명: {result['text_name']} | |
- 처리 시간: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} | |
- {speaker1_key} 발언 수: {len(conversations.get(speaker1_key, []))}개 | |
- {speaker2_key} 발언 수: {len(conversations.get(speaker2_key, []))}개 | |
""" | |
return status_message, original_text, separated_text, corrected_text, speaker1_text, speaker2_text, download_file | |
except Exception as e: | |
logger.error(f"텍스트 처리 중 오류: {e}") | |
return f"❌ 처리 중 오류가 발생했습니다: {str(e)}", "", "", "", "", "", None | |
def create_interface(): | |
"""Gradio 인터페이스를 생성합니다.""" | |
# CSS 스타일링 | |
css = """ | |
.gradio-container { | |
font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif; | |
} | |
.status-box { | |
padding: 15px; | |
border-radius: 8px; | |
margin: 10px 0; | |
} | |
.main-header { | |
text-align: center; | |
color: #2c3e50; | |
margin-bottom: 20px; | |
} | |
.speaker-config { | |
background-color: #f8f9fa; | |
padding: 15px; | |
border-radius: 8px; | |
margin: 10px 0; | |
} | |
""" | |
with gr.Blocks(css=css, title="2인 대화 STT 처리기") as interface: | |
# 헤더 | |
gr.HTML(""" | |
<div class="main-header"> | |
<h1>🎤 2인 대화 화자 분리기 (AI)</h1> | |
<p>Whisper + Gemini 2.0 Flash AI를 사용한 음성 인식, 화자 분리 및 맞춤법 교정</p> | |
</div> | |
""") | |
with gr.Row(): | |
with gr.Column(scale=1): | |
# 화자 이름 설정 | |
gr.HTML('<div class="speaker-config">') | |
gr.Markdown("### 👥 화자 이름 설정 (선택사항)") | |
with gr.Row(): | |
speaker1_name = gr.Textbox( | |
label="화자1 이름", | |
placeholder="예: 김팀장, 홍길동 등 (비워두면 '화자1')", | |
value="", | |
scale=1 | |
) | |
speaker2_name = gr.Textbox( | |
label="화자2 이름", | |
placeholder="예: 이대리, 김영희 등 (비워두면 '화자2')", | |
value="", | |
scale=1 | |
) | |
gr.HTML('</div>') | |
# 입력 섹션 | |
with gr.Tabs(): | |
with gr.TabItem("🎤 오디오 업로드"): | |
gr.Markdown("### 🎤 오디오 파일 업로드") | |
audio_input = gr.Audio( | |
label="2인 대화 오디오 파일을 업로드하세요", | |
type="filepath" | |
) | |
audio_process_btn = gr.Button( | |
"🚀 오디오 처리 시작", | |
variant="primary", | |
size="lg" | |
) | |
with gr.TabItem("📝 텍스트 입력"): | |
gr.Markdown("### 📝 텍스트 직접 입력") | |
text_input = gr.Textbox( | |
label="2인 대화 텍스트를 입력하세요", | |
placeholder="두 명이 나누는 대화 내용을 여기에 붙여넣기하세요...\n\n예시:\n안녕하세요, 오늘 회의에 참석해주셔서 감사합니다. 네, 안녕하세요. 준비된 자료가 있나요? 네, 프레젠테이션 자료를 준비했습니다.", | |
lines=8, | |
max_lines=15 | |
) | |
text_process_btn = gr.Button( | |
"🚀 텍스트 처리 시작", | |
variant="primary", | |
size="lg" | |
) | |
# 상태 표시 | |
status_output = gr.Markdown( | |
"### 📊 처리 상태\n준비 완료. 화자 이름을 설정하고 오디오 파일을 업로드하거나 텍스트를 입력한 후 처리 버튼을 클릭하세요.", | |
elem_classes=["status-box"] | |
) | |
# 다운로드 섹션 | |
gr.Markdown("### 📥 결과 다운로드") | |
download_file = gr.File( | |
label="처리 결과 ZIP 파일", | |
visible=False | |
) | |
with gr.Column(scale=2): | |
# 결과 표시 섹션 | |
with gr.Tabs(): | |
with gr.TabItem("📝 원본 텍스트"): | |
original_output = gr.Textbox( | |
label="추출/입력된 원본 텍스트", | |
lines=10, | |
max_lines=20, | |
placeholder="처리 후 원본 텍스트가 여기에 표시됩니다..." | |
) | |
with gr.TabItem("👥 화자 분리 (원본)"): | |
separated_output = gr.Textbox( | |
label="AI 화자 분리 결과 (원본)", | |
lines=10, | |
max_lines=20, | |
placeholder="처리 후 화자별로 분리된 대화가 여기에 표시됩니다..." | |
) | |
with gr.TabItem("✏️ 화자 분리 (교정)"): | |
corrected_output = gr.Textbox( | |
label="AI 화자 분리 결과 (맞춤법 교정)", | |
lines=10, | |
max_lines=20, | |
placeholder="처리 후 맞춤법이 교정된 화자 분리 결과가 여기에 표시됩니다..." | |
) | |
with gr.TabItem("👤 화자1 대화"): | |
speaker1_output = gr.Textbox( | |
label="화자1의 모든 발언", | |
lines=10, | |
max_lines=20, | |
placeholder="처리 후 화자1의 발언들이 여기에 표시됩니다..." | |
) | |
with gr.TabItem("👤 화자2 대화"): | |
speaker2_output = gr.Textbox( | |
label="화자2의 모든 발언", | |
lines=10, | |
max_lines=20, | |
placeholder="처리 후 화자2의 발언들이 여기에 표시됩니다..." | |
) | |
# 사용법 안내 | |
gr.Markdown(""" | |
### 📖 사용법 | |
**🎤 오디오 파일 처리:** | |
1. **화자 이름 설정**: 원하는 화자 이름을 입력하세요 (예: 김팀장, 이대리) | |
2. **오디오 업로드**: WAV, MP3, MP4 등의 오디오 파일을 업로드하세요 | |
3. **처리 시작**: '🚀 오디오 처리 시작' 버튼을 클릭하세요 | |
4. **결과 확인**: 음성 인식 → 화자 분리 → 맞춤법 교정 순으로 처리됩니다 | |
5. **다운로드**: 처리 완료 후 ZIP 파일로 모든 결과를 다운로드할 수 있습니다 | |
**📝 텍스트 직접 입력:** | |
1. **화자 이름 설정**: 원하는 화자 이름을 입력하세요 | |
2. **텍스트 입력**: 2인 대화 텍스트를 입력란에 붙여넣기하세요 | |
3. **처리 시작**: '🚀 텍스트 처리 시작' 버튼을 클릭하세요 | |
4. **결과 확인**: 각 탭에서 화자 분리 결과를 확인하세요 | |
### ⚙️ 기술 정보 | |
- **음성 인식**: OpenAI Whisper (다국어 지원) | |
- **화자 분리**: Google Gemini 2.0 Flash + AI 응답 검증 | |
- **맞춤법 교정**: 고급 AI 기반 한국어 교정 | |
- **청킹 처리**: 대용량 텍스트 자동 분할 처리 | |
- **지원 형식**: WAV, MP3, MP4, M4A 등 | |
- **최적 환경**: 2인 대화, 명확한 음질 | |
### 🆕 새로운 기능 | |
- **사용자 정의 화자 이름**: '화자1', '화자2' 대신 실제 이름 사용 | |
- **다운로드 기능**: 전체 결과를 ZIP 파일로 다운로드 | |
- **AI 응답 검증**: 화자 분리 실패 시 자동 감지 및 오류 처리 | |
- **대용량 파일 지원**: 긴 오디오도 청킹으로 안정적 처리 | |
### ⚠️ 주의사항 | |
- 처리 시간은 오디오 길이에 따라 달라집니다 (보통 1-5분) | |
- Google AI API 사용량 제한이 있을 수 있습니다 | |
- 2인 대화에 최적화되어 있습니다 | |
- 음질이 좋을수록 더 정확한 결과를 얻을 수 있습니다 | |
- 배경 소음이 적고 화자 구분이 명확한 오디오를 권장합니다 | |
""") | |
# 이벤트 연결 - 다운로드 파일 포함 | |
def update_download_visibility(download_path): | |
"""다운로드 파일이 생성되면 표시합니다.""" | |
if download_path and os.path.exists(download_path): | |
return gr.File(value=download_path, visible=True) | |
else: | |
return gr.File(visible=False) | |
outputs = [ | |
status_output, | |
original_output, | |
separated_output, | |
corrected_output, | |
speaker1_output, | |
speaker2_output, | |
download_file | |
] | |
# 오디오 처리 이벤트 | |
audio_process_btn.click( | |
fn=process_audio_file, | |
inputs=[audio_input, speaker1_name, speaker2_name], | |
outputs=outputs | |
).then( | |
fn=update_download_visibility, | |
inputs=[download_file], | |
outputs=[download_file] | |
) | |
# 텍스트 처리 이벤트 | |
text_process_btn.click( | |
fn=process_text_input, | |
inputs=[text_input, speaker1_name, speaker2_name], | |
outputs=outputs | |
).then( | |
fn=update_download_visibility, | |
inputs=[download_file], | |
outputs=[download_file] | |
) | |
return interface | |
# 메인 실행 | |
if __name__ == "__main__": | |
logger.info("Gradio 앱을 시작합니다...") | |
try: | |
# 인터페이스 생성 | |
app = create_interface() | |
# Hugging Face Spaces 환경 감지 | |
is_spaces = os.getenv('SPACE_ID') is not None | |
if is_spaces: | |
# Hugging Face Spaces용 설정 | |
app.launch() | |
else: | |
# 로컬 개발용 설정 | |
app.launch( | |
server_name="0.0.0.0", | |
server_port=7860, | |
share=False | |
) | |
except Exception as e: | |
logger.error(f"앱 시작 실패: {e}") | |
# 최소한의 fallback | |
try: | |
import gradio as gr | |
with gr.Blocks() as fallback_app: | |
gr.HTML("<h1>🔧 시스템 점검 중</h1>") | |
gr.Markdown("잠시 후 다시 시도해주세요.") | |
gr.Markdown(f"**디버그 정보:** {str(e)[:200]}...") | |
if os.getenv('SPACE_ID') is not None: | |
fallback_app.launch() | |
else: | |
fallback_app.launch(server_name="0.0.0.0", server_port=7860) | |
except Exception as fallback_error: | |
logger.error(f"Fallback 앱도 실패: {fallback_error}") | |
print("🚨 시스템 점검 중입니다. 관리자에게 문의해주세요.") | |