Jeongsoo1975
feat: 주요 개선사항 적용 - 코드 재사용, 다운로드, 사용자 정의 화자명
30ff654
import gradio as gr
import os
import logging
from datetime import datetime
# 로깅 설정
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
# 전역 변수
text_processor = None
whisper_model = None
def safe_import_whisper():
"""Whisper를 안전하게 import합니다."""
try:
import whisper
return whisper, None
except Exception as e:
logger.error(f"Whisper import 실패: {e}")
return None, str(e)
def safe_import_processor():
"""TextProcessor를 안전하게 import합니다."""
try:
from stt_processor import TextProcessor
return TextProcessor, None
except Exception as e:
logger.error(f"TextProcessor import 실패: {e}")
return None, str(e)
def initialize_models():
"""모델들을 초기화합니다."""
global text_processor, whisper_model
try:
# 환경 변수 또는 Hugging Face Secrets에서 API 키 읽기
google_api_key = os.getenv("GOOGLE_API_KEY")
# API 키 안전하게 검증
if not google_api_key or not isinstance(google_api_key, str) or len(google_api_key.strip()) == 0:
return False, "❌ Google API 키가 설정되지 않았습니다. Hugging Face Spaces의 Settings에서 GOOGLE_API_KEY를 설정해주세요."
# Whisper 모델 로드 시도
whisper_lib, whisper_error = safe_import_whisper()
if whisper_lib is None:
return False, f"❌ Whisper 라이브러리 로딩 실패: {whisper_error}"
logger.info("Whisper 모델을 로딩합니다...")
whisper_model = whisper_lib.load_model("base")
logger.info("Whisper 모델 로딩 완료")
# 텍스트 프로세서 초기화
TextProcessor, processor_error = safe_import_processor()
if TextProcessor is None:
return False, f"❌ TextProcessor 로딩 실패: {processor_error}"
text_processor = TextProcessor(google_api_key.strip())
return True, "✅ 모든 모델이 초기화되었습니다."
except Exception as e:
logger.error(f"모델 초기화 실패: {e}")
return False, f"❌ 초기화 실패: {str(e)}"
def process_audio_file(audio_file, speaker1_name, speaker2_name, progress=gr.Progress()):
"""업로드된 오디오 파일을 처리합니다."""
global text_processor, whisper_model
if audio_file is None:
return "❌ 오디오 파일을 업로드해주세요.", "", "", "", "", "", None
try:
# 모델 초기화 (필요한 경우)
if whisper_model is None or text_processor is None:
progress(0.05, desc="모델 초기화 중...")
success, message = initialize_models()
if not success:
return message, "", "", "", "", "", None
# 오디오 파일 경로 확인
audio_path = audio_file.name if hasattr(audio_file, 'name') else str(audio_file)
logger.info(f"오디오 파일 처리 시작: {audio_path}")
# 1단계: Whisper로 음성 인식
progress(0.1, desc="음성을 텍스트로 변환 중...")
logger.info("Whisper를 통한 음성 인식 시작")
result = whisper_model.transcribe(audio_path)
full_text = result['text'].strip()
if not full_text:
return "❌ 오디오에서 텍스트를 추출할 수 없습니다.", "", "", "", "", "", None
language = result.get('language', 'unknown')
logger.info(f"음성 인식 완료. 언어: {language}, 텍스트 길이: {len(full_text)}")
# 2단계: AI 모델 로딩
progress(0.3, desc="AI 모델 로딩 중...")
if not text_processor.models_loaded:
text_processor.load_models()
# 진행 상황 콜백 함수
def progress_callback(status, current, total):
progress_value = 0.3 + (current / total) * 0.6 # 0.3~0.9 범위
progress(progress_value, desc=f"{status} ({current}/{total})")
# 3단계: 텍스트 처리 (화자 분리 + 맞춤법 교정)
progress(0.4, desc="AI 화자 분리 및 맞춤법 교정 중...")
# 사용자 정의 화자 이름 적용
custom_speaker1 = speaker1_name.strip() if speaker1_name and speaker1_name.strip() else None
custom_speaker2 = speaker2_name.strip() if speaker2_name and speaker2_name.strip() else None
text_result = text_processor.process_text(
full_text,
progress_callback=progress_callback,
speaker1_name=custom_speaker1,
speaker2_name=custom_speaker2
)
if not text_result.get("success", False):
return f"❌ 텍스트 처리 실패: {text_result.get('error', 'Unknown error')}", full_text, "", "", "", "", None
# 결과 추출
progress(0.95, desc="결과 정리 중...")
original_text = text_result["original_text"]
separated_text = text_result["separated_text"]
corrected_text = text_result["corrected_text"]
# 화자별 대화 추출
conversations = text_result["conversations_by_speaker_corrected"]
speaker1_key = custom_speaker1 or "화자1"
speaker2_key = custom_speaker2 or "화자2"
speaker1_text = "\n\n".join([f"{i+1}. {utterance}" for i, utterance in enumerate(conversations.get(speaker1_key, []))])
speaker2_text = "\n\n".join([f"{i+1}. {utterance}" for i, utterance in enumerate(conversations.get(speaker2_key, []))])
# 다운로드 파일 생성
download_file = None
try:
text_processor.save_results_to_files(text_result)
zip_path = text_processor.create_download_zip(text_result)
if zip_path and os.path.exists(zip_path):
download_file = zip_path
except Exception as e:
logger.warning(f"다운로드 파일 생성 실패: {e}")
progress(1.0, desc="처리 완료!")
status_message = f"""
✅ **오디오 처리 완료!**
- 파일명: {os.path.basename(audio_path)}
- 처리 시간: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
- 감지된 언어: {language}
- 텍스트 길이: {len(full_text)}
- {speaker1_key} 발언 수: {len(conversations.get(speaker1_key, []))}
- {speaker2_key} 발언 수: {len(conversations.get(speaker2_key, []))}
"""
return status_message, original_text, separated_text, corrected_text, speaker1_text, speaker2_text, download_file
except Exception as e:
logger.error(f"오디오 파일 처리 중 오류: {e}")
return f"❌ 처리 중 오류가 발생했습니다: {str(e)}", "", "", "", "", "", None
def process_text_input(input_text, speaker1_name, speaker2_name, progress=gr.Progress()):
"""입력된 텍스트를 처리합니다."""
global text_processor
if not input_text or not input_text.strip():
return "❌ 처리할 텍스트를 입력해주세요.", "", "", "", "", "", None
try:
# 텍스트 프로세서만 초기화
if text_processor is None:
progress(0.1, desc="텍스트 프로세서 초기화 중...")
google_api_key = os.getenv("GOOGLE_API_KEY")
if not google_api_key or not isinstance(google_api_key, str) or len(google_api_key.strip()) == 0:
return "❌ Google API 키가 설정되지 않았습니다.", "", "", "", "", "", None
TextProcessor, processor_error = safe_import_processor()
if TextProcessor is None:
return f"❌ TextProcessor 로딩 실패: {processor_error}", "", "", "", "", "", None
text_processor = TextProcessor(google_api_key.strip())
# 모델 로딩
progress(0.2, desc="AI 모델 로딩 중...")
if not text_processor.models_loaded:
text_processor.load_models()
# 진행 상황 콜백 함수
def progress_callback(status, current, total):
progress_value = 0.2 + (current / total) * 0.7 # 0.2~0.9 범위
progress(progress_value, desc=f"{status} ({current}/{total})")
# 텍스트 처리
progress(0.3, desc="텍스트 처리 시작...")
# 사용자 정의 화자 이름 적용
custom_speaker1 = speaker1_name.strip() if speaker1_name and speaker1_name.strip() else None
custom_speaker2 = speaker2_name.strip() if speaker2_name and speaker2_name.strip() else None
result = text_processor.process_text(
input_text,
progress_callback=progress_callback,
speaker1_name=custom_speaker1,
speaker2_name=custom_speaker2
)
if not result.get("success", False):
return f"❌ 처리 실패: {result.get('error', 'Unknown error')}", "", "", "", "", "", None
# 결과 추출
progress(0.95, desc="결과 정리 중...")
original_text = result["original_text"]
separated_text = result["separated_text"]
corrected_text = result["corrected_text"]
# 화자별 대화 추출
conversations = result["conversations_by_speaker_corrected"]
speaker1_key = custom_speaker1 or "화자1"
speaker2_key = custom_speaker2 or "화자2"
speaker1_text = "\n\n".join([f"{i+1}. {utterance}" for i, utterance in enumerate(conversations.get(speaker1_key, []))])
speaker2_text = "\n\n".join([f"{i+1}. {utterance}" for i, utterance in enumerate(conversations.get(speaker2_key, []))])
# 다운로드 파일 생성
download_file = None
try:
text_processor.save_results_to_files(result)
zip_path = text_processor.create_download_zip(result)
if zip_path and os.path.exists(zip_path):
download_file = zip_path
except Exception as e:
logger.warning(f"다운로드 파일 생성 실패: {e}")
progress(1.0, desc="처리 완료!")
status_message = f"""
✅ **텍스트 처리 완료!**
- 텍스트명: {result['text_name']}
- 처리 시간: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
- {speaker1_key} 발언 수: {len(conversations.get(speaker1_key, []))}
- {speaker2_key} 발언 수: {len(conversations.get(speaker2_key, []))}
"""
return status_message, original_text, separated_text, corrected_text, speaker1_text, speaker2_text, download_file
except Exception as e:
logger.error(f"텍스트 처리 중 오류: {e}")
return f"❌ 처리 중 오류가 발생했습니다: {str(e)}", "", "", "", "", "", None
def create_interface():
"""Gradio 인터페이스를 생성합니다."""
# CSS 스타일링
css = """
.gradio-container {
font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;
}
.status-box {
padding: 15px;
border-radius: 8px;
margin: 10px 0;
}
.main-header {
text-align: center;
color: #2c3e50;
margin-bottom: 20px;
}
.speaker-config {
background-color: #f8f9fa;
padding: 15px;
border-radius: 8px;
margin: 10px 0;
}
"""
with gr.Blocks(css=css, title="2인 대화 STT 처리기") as interface:
# 헤더
gr.HTML("""
<div class="main-header">
<h1>🎤 2인 대화 화자 분리기 (AI)</h1>
<p>Whisper + Gemini 2.0 Flash AI를 사용한 음성 인식, 화자 분리 및 맞춤법 교정</p>
</div>
""")
with gr.Row():
with gr.Column(scale=1):
# 화자 이름 설정
gr.HTML('<div class="speaker-config">')
gr.Markdown("### 👥 화자 이름 설정 (선택사항)")
with gr.Row():
speaker1_name = gr.Textbox(
label="화자1 이름",
placeholder="예: 김팀장, 홍길동 등 (비워두면 '화자1')",
value="",
scale=1
)
speaker2_name = gr.Textbox(
label="화자2 이름",
placeholder="예: 이대리, 김영희 등 (비워두면 '화자2')",
value="",
scale=1
)
gr.HTML('</div>')
# 입력 섹션
with gr.Tabs():
with gr.TabItem("🎤 오디오 업로드"):
gr.Markdown("### 🎤 오디오 파일 업로드")
audio_input = gr.Audio(
label="2인 대화 오디오 파일을 업로드하세요",
type="filepath"
)
audio_process_btn = gr.Button(
"🚀 오디오 처리 시작",
variant="primary",
size="lg"
)
with gr.TabItem("📝 텍스트 입력"):
gr.Markdown("### 📝 텍스트 직접 입력")
text_input = gr.Textbox(
label="2인 대화 텍스트를 입력하세요",
placeholder="두 명이 나누는 대화 내용을 여기에 붙여넣기하세요...\n\n예시:\n안녕하세요, 오늘 회의에 참석해주셔서 감사합니다. 네, 안녕하세요. 준비된 자료가 있나요? 네, 프레젠테이션 자료를 준비했습니다.",
lines=8,
max_lines=15
)
text_process_btn = gr.Button(
"🚀 텍스트 처리 시작",
variant="primary",
size="lg"
)
# 상태 표시
status_output = gr.Markdown(
"### 📊 처리 상태\n준비 완료. 화자 이름을 설정하고 오디오 파일을 업로드하거나 텍스트를 입력한 후 처리 버튼을 클릭하세요.",
elem_classes=["status-box"]
)
# 다운로드 섹션
gr.Markdown("### 📥 결과 다운로드")
download_file = gr.File(
label="처리 결과 ZIP 파일",
visible=False
)
with gr.Column(scale=2):
# 결과 표시 섹션
with gr.Tabs():
with gr.TabItem("📝 원본 텍스트"):
original_output = gr.Textbox(
label="추출/입력된 원본 텍스트",
lines=10,
max_lines=20,
placeholder="처리 후 원본 텍스트가 여기에 표시됩니다..."
)
with gr.TabItem("👥 화자 분리 (원본)"):
separated_output = gr.Textbox(
label="AI 화자 분리 결과 (원본)",
lines=10,
max_lines=20,
placeholder="처리 후 화자별로 분리된 대화가 여기에 표시됩니다..."
)
with gr.TabItem("✏️ 화자 분리 (교정)"):
corrected_output = gr.Textbox(
label="AI 화자 분리 결과 (맞춤법 교정)",
lines=10,
max_lines=20,
placeholder="처리 후 맞춤법이 교정된 화자 분리 결과가 여기에 표시됩니다..."
)
with gr.TabItem("👤 화자1 대화"):
speaker1_output = gr.Textbox(
label="화자1의 모든 발언",
lines=10,
max_lines=20,
placeholder="처리 후 화자1의 발언들이 여기에 표시됩니다..."
)
with gr.TabItem("👤 화자2 대화"):
speaker2_output = gr.Textbox(
label="화자2의 모든 발언",
lines=10,
max_lines=20,
placeholder="처리 후 화자2의 발언들이 여기에 표시됩니다..."
)
# 사용법 안내
gr.Markdown("""
### 📖 사용법
**🎤 오디오 파일 처리:**
1. **화자 이름 설정**: 원하는 화자 이름을 입력하세요 (예: 김팀장, 이대리)
2. **오디오 업로드**: WAV, MP3, MP4 등의 오디오 파일을 업로드하세요
3. **처리 시작**: '🚀 오디오 처리 시작' 버튼을 클릭하세요
4. **결과 확인**: 음성 인식 → 화자 분리 → 맞춤법 교정 순으로 처리됩니다
5. **다운로드**: 처리 완료 후 ZIP 파일로 모든 결과를 다운로드할 수 있습니다
**📝 텍스트 직접 입력:**
1. **화자 이름 설정**: 원하는 화자 이름을 입력하세요
2. **텍스트 입력**: 2인 대화 텍스트를 입력란에 붙여넣기하세요
3. **처리 시작**: '🚀 텍스트 처리 시작' 버튼을 클릭하세요
4. **결과 확인**: 각 탭에서 화자 분리 결과를 확인하세요
### ⚙️ 기술 정보
- **음성 인식**: OpenAI Whisper (다국어 지원)
- **화자 분리**: Google Gemini 2.0 Flash + AI 응답 검증
- **맞춤법 교정**: 고급 AI 기반 한국어 교정
- **청킹 처리**: 대용량 텍스트 자동 분할 처리
- **지원 형식**: WAV, MP3, MP4, M4A 등
- **최적 환경**: 2인 대화, 명확한 음질
### 🆕 새로운 기능
- **사용자 정의 화자 이름**: '화자1', '화자2' 대신 실제 이름 사용
- **다운로드 기능**: 전체 결과를 ZIP 파일로 다운로드
- **AI 응답 검증**: 화자 분리 실패 시 자동 감지 및 오류 처리
- **대용량 파일 지원**: 긴 오디오도 청킹으로 안정적 처리
### ⚠️ 주의사항
- 처리 시간은 오디오 길이에 따라 달라집니다 (보통 1-5분)
- Google AI API 사용량 제한이 있을 수 있습니다
- 2인 대화에 최적화되어 있습니다
- 음질이 좋을수록 더 정확한 결과를 얻을 수 있습니다
- 배경 소음이 적고 화자 구분이 명확한 오디오를 권장합니다
""")
# 이벤트 연결 - 다운로드 파일 포함
def update_download_visibility(download_path):
"""다운로드 파일이 생성되면 표시합니다."""
if download_path and os.path.exists(download_path):
return gr.File(value=download_path, visible=True)
else:
return gr.File(visible=False)
outputs = [
status_output,
original_output,
separated_output,
corrected_output,
speaker1_output,
speaker2_output,
download_file
]
# 오디오 처리 이벤트
audio_process_btn.click(
fn=process_audio_file,
inputs=[audio_input, speaker1_name, speaker2_name],
outputs=outputs
).then(
fn=update_download_visibility,
inputs=[download_file],
outputs=[download_file]
)
# 텍스트 처리 이벤트
text_process_btn.click(
fn=process_text_input,
inputs=[text_input, speaker1_name, speaker2_name],
outputs=outputs
).then(
fn=update_download_visibility,
inputs=[download_file],
outputs=[download_file]
)
return interface
# 메인 실행
if __name__ == "__main__":
logger.info("Gradio 앱을 시작합니다...")
try:
# 인터페이스 생성
app = create_interface()
# Hugging Face Spaces 환경 감지
is_spaces = os.getenv('SPACE_ID') is not None
if is_spaces:
# Hugging Face Spaces용 설정
app.launch()
else:
# 로컬 개발용 설정
app.launch(
server_name="0.0.0.0",
server_port=7860,
share=False
)
except Exception as e:
logger.error(f"앱 시작 실패: {e}")
# 최소한의 fallback
try:
import gradio as gr
with gr.Blocks() as fallback_app:
gr.HTML("<h1>🔧 시스템 점검 중</h1>")
gr.Markdown("잠시 후 다시 시도해주세요.")
gr.Markdown(f"**디버그 정보:** {str(e)[:200]}...")
if os.getenv('SPACE_ID') is not None:
fallback_app.launch()
else:
fallback_app.launch(server_name="0.0.0.0", server_port=7860)
except Exception as fallback_error:
logger.error(f"Fallback 앱도 실패: {fallback_error}")
print("🚨 시스템 점검 중입니다. 관리자에게 문의해주세요.")