Spaces:
Runtime error
Runtime error
Jeongsoo1975
Initial commit: Gradio text-based speaker separation app for Hugging Face Spaces
ae9ec05
import os | |
import google.generativeai as genai | |
from dotenv import load_dotenv | |
import logging | |
import json | |
from datetime import datetime | |
import re | |
# 환경 변수 로드 | |
load_dotenv() | |
# 로깅 설정 | |
logger = logging.getLogger(__name__) | |
class TextProcessor: | |
""" | |
텍스트를 AI를 통한 화자 분리 및 맞춤법 교정을 수행하는 클래스 | |
""" | |
def __init__(self, google_api_key=None): | |
""" | |
TextProcessor 초기화 | |
Args: | |
google_api_key (str): Google AI API 키. None인 경우 환경 변수에서 읽음 | |
""" | |
self.google_api_key = google_api_key or os.getenv("GOOGLE_API_KEY") | |
self.gemini_model = None | |
self.models_loaded = False | |
if not self.google_api_key or self.google_api_key == "your_google_api_key_here": | |
raise ValueError("Google AI API 키가 설정되지 않았습니다. 환경 변수 GOOGLE_API_KEY를 설정하거나 매개변수로 전달하세요.") | |
def load_models(self): | |
"""Gemini AI 모델을 로드합니다.""" | |
try: | |
logger.info("Gemini 모델 로딩을 시작합니다.") | |
# Gemini 모델 설정 | |
genai.configure(api_key=self.google_api_key) | |
self.gemini_model = genai.GenerativeModel('gemini-2.0-flash') | |
logger.info("Gemini 2.0 Flash 모델 설정이 완료되었습니다.") | |
self.models_loaded = True | |
logger.info("Gemini 모델 로딩이 완료되었습니다.") | |
return True | |
except Exception as e: | |
error_msg = f"Gemini 모델을 로딩하는 중 오류가 발생했습니다: {e}" | |
logger.error(error_msg) | |
raise Exception(error_msg) | |
def process_text(self, input_text, text_name=None, progress_callback=None): | |
""" | |
텍스트를 처리하여 화자 분리 및 맞춤법 교정을 수행합니다. | |
Args: | |
input_text (str): 처리할 텍스트 | |
text_name (str): 텍스트 이름 (선택사항) | |
progress_callback (function): 진행 상황을 알려주는 콜백 함수 | |
Returns: | |
dict: 처리 결과 딕셔너리 | |
""" | |
if not self.models_loaded: | |
self.load_models() | |
try: | |
text_name = text_name or f"text_{datetime.now().strftime('%Y%m%d_%H%M%S')}" | |
logger.info(f"텍스트 처리 시작: {text_name}") | |
# 입력 텍스트 검증 | |
if not input_text or not input_text.strip(): | |
raise ValueError("처리할 텍스트가 비어 있습니다.") | |
full_text = input_text.strip() | |
# 1단계: Gemini로 화자 분리 | |
if progress_callback: | |
progress_callback("AI 화자 분리 중...", 1, 3) | |
logger.info(f"AI 화자 분리 시작: {text_name}") | |
speaker_separated_text = self.separate_speakers_with_gemini(full_text) | |
# 2단계: 맞춤법 교정 | |
if progress_callback: | |
progress_callback("맞춤법 교정 중...", 2, 3) | |
logger.info(f"맞춤법 교정 시작: {text_name}") | |
corrected_text = self.correct_spelling_with_gemini(speaker_separated_text) | |
# 3단계: 결과 파싱 | |
if progress_callback: | |
progress_callback("결과 정리 중...", 3, 3) | |
# 교정된 텍스트에서 화자별 대화 파싱 | |
corrected_conversations = self.parse_separated_text(corrected_text) | |
original_conversations = self.parse_separated_text(speaker_separated_text) | |
# 결과 딕셔너리 생성 | |
processing_result = { | |
"text_name": text_name, | |
"processed_time": datetime.now().isoformat(), | |
"original_text": full_text, | |
"separated_text": speaker_separated_text, | |
"corrected_text": corrected_text, | |
"conversations_by_speaker_original": original_conversations, | |
"conversations_by_speaker_corrected": corrected_conversations, | |
"success": True | |
} | |
logger.info(f"텍스트 처리 완료: {text_name}") | |
return processing_result | |
except Exception as e: | |
logger.error(f"텍스트 {text_name} 처리 중 오류: {e}") | |
return { | |
"text_name": text_name or "unknown", | |
"success": False, | |
"error": str(e) | |
} | |
def separate_speakers_with_gemini(self, text): | |
"""Gemini API를 사용하여 텍스트를 화자별로 분리합니다.""" | |
try: | |
prompt = f""" | |
당신은 2명의 화자가 나누는 대화를 분석하는 전문가입니다. | |
주어진 텍스트를 분석하여 각 발언을 화자별로 구분해주세요. | |
분석 지침: | |
1. 대화의 맥락과 내용을 기반으로 화자를 구분하세요 | |
2. 말투, 주제 전환, 질문과 답변의 패턴을 활용하세요 | |
3. 화자1과 화자2로 구분하여 표시하세요 | |
4. 각 발언 앞에 [화자1] 또는 [화자2]를 붙여주세요 | |
5. 발언이 너무 길 경우 자연스러운 지점에서 나누어주세요 | |
출력 형식: | |
[화자1] 첫 번째 발언 내용 | |
[화자2] 두 번째 발언 내용 | |
[화자1] 세 번째 발언 내용 | |
... | |
분석할 텍스트: | |
{text} | |
""" | |
response = self.gemini_model.generate_content(prompt) | |
separated_text = response.text.strip() | |
logger.info("Gemini를 통한 화자 분리가 완료되었습니다.") | |
return separated_text | |
except Exception as e: | |
logger.error(f"Gemini 화자 분리 중 오류: {e}") | |
return f"[오류] 화자 분리 실패: {str(e)}" | |
def correct_spelling_with_gemini(self, separated_text): | |
"""Gemini API를 사용하여 화자별 분리된 텍스트의 맞춤법을 교정합니다.""" | |
try: | |
prompt = f""" | |
당신은 한국어 맞춤법 교정 전문가입니다. | |
주어진 텍스트에서 맞춤법 오류, 띄어쓰기 오류, 오타를 수정해주세요. | |
교정 지침: | |
1. 자연스러운 한국어 표현으로 수정하되, 원본의 의미와 말투는 유지하세요 | |
2. [화자1], [화자2] 태그는 그대로 유지하세요 | |
3. 전문 용어나 고유명사는 가능한 정확하게 수정하세요 | |
4. 구어체 특성은 유지하되, 명백한 오타만 수정하세요 | |
5. 문맥에 맞는 올바른 단어로 교체하세요 | |
수정이 필요한 예시: | |
- "치특기" → "치트키" | |
- "실점픽" → "실전 픽" | |
- "복사부천억" → "복사 붙여넣기" | |
- "핵심같이가" → "핵심 가치가" | |
- "재활" → "재활용" | |
- "저정할" → "저장할" | |
- "플레일" → "플레어" | |
- "서벌 수" → "서버리스" | |
- "커리" → "쿼리" | |
- "전력" → "전략" | |
- "클라클라" → "클라크" | |
- "가인만" → "가입만" | |
- "M5U" → "MAU" | |
- "나온 로도" → "다운로드" | |
- "무시무치" → "무시무시" | |
- "송신유금" → "송신 요금" | |
- "10지가" → "10GB" | |
- "유금" → "요금" | |
- "전 색을" → "전 세계" | |
- "도무원은" → "도구들은" | |
- "골차품데" → "골치 아픈데" | |
- "변원해" → "변환해" | |
- "f 운영" → "서비스 운영" | |
- "오류추저개" → "오류 추적기" | |
- "f 늘려질" → "서비스가 늘어날" | |
- "캐시칭" → "캐싱" | |
- "플레이어" → "플레어" | |
- "업스테시" → "업스태시" | |
- "원시근을" → "웬지슨" | |
- "부각이릉도" → "부각들도" | |
- "컴포넌트" → "컴포넌트" | |
- "본이터링" → "모니터링" | |
- "번뜨기는" → "번뜩이는" | |
- "사용적 경험" → "사용자 경험" | |
교정할 텍스트: | |
{separated_text} | |
""" | |
response = self.gemini_model.generate_content(prompt) | |
corrected_text = response.text.strip() | |
logger.info("Gemini를 통한 맞춤법 교정이 완료되었습니다.") | |
return corrected_text | |
except Exception as e: | |
logger.error(f"Gemini 맞춤법 교정 중 오류: {e}") | |
return separated_text # 오류 발생 시 원본 반환 | |
def parse_separated_text(self, separated_text): | |
"""화자별로 분리된 텍스트를 파싱하여 구조화합니다.""" | |
conversations = { | |
"화자1": [], | |
"화자2": [] | |
} | |
# 정규표현식으로 화자별 발언 추출 | |
pattern = r'\[화자([12])\]\s*(.+?)(?=\[화자[12]\]|$)' | |
matches = re.findall(pattern, separated_text, re.DOTALL) | |
for speaker_num, content in matches: | |
speaker = f"화자{speaker_num}" | |
content = content.strip() | |
if content: | |
conversations[speaker].append(content) | |
return conversations | |
def save_results_to_files(self, result, output_dir="output"): | |
"""처리 결과를 파일로 저장합니다.""" | |
if not result.get("success", False): | |
logger.error(f"결과 저장 실패: {result.get('error', 'Unknown error')}") | |
return False | |
try: | |
# output 폴더 생성 | |
if not os.path.exists(output_dir): | |
os.makedirs(output_dir) | |
base_name = result["base_name"] | |
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") | |
# 1. 전체 대화 저장 (원본, 화자 분리, 맞춤법 교정 포함) | |
all_txt_path = f"{output_dir}/{base_name}_전체대화_{timestamp}.txt" | |
with open(all_txt_path, 'w', encoding='utf-8') as f: | |
f.write(f"파일명: {base_name}\n") | |
f.write(f"처리 시간: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n") | |
f.write(f"언어: {result['language']}\n") | |
f.write("="*50 + "\n\n") | |
f.write("원본 텍스트:\n") | |
f.write(result['original_text'] + "\n\n") | |
f.write("="*50 + "\n\n") | |
f.write("화자별 분리 결과 (원본):\n") | |
f.write(result['separated_text'] + "\n\n") | |
f.write("="*50 + "\n\n") | |
f.write("화자별 분리 결과 (맞춤법 교정):\n") | |
f.write(result['corrected_text'] + "\n") | |
# 2. 교정된 화자별 개별 파일 저장 | |
for speaker, utterances in result['conversations_by_speaker_corrected'].items(): | |
if utterances: | |
speaker_txt_path = f"{output_dir}/{base_name}_{speaker}_교정본_{timestamp}.txt" | |
with open(speaker_txt_path, 'w', encoding='utf-8') as f: | |
f.write(f"파일명: {base_name}\n") | |
f.write(f"화자: {speaker}\n") | |
f.write(f"처리 시간: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n") | |
f.write(f"발언 수: {len(utterances)}\n") | |
f.write("="*50 + "\n\n") | |
for idx, utterance in enumerate(utterances, 1): | |
f.write(f"{idx}. {utterance}\n\n") | |
# 3. JSON 형태로도 저장 (분석용) | |
json_path = f"{output_dir}/{base_name}_data_{timestamp}.json" | |
with open(json_path, 'w', encoding='utf-8') as f: | |
json.dump(result, f, ensure_ascii=False, indent=2) | |
logger.info(f"결과 파일 저장 완료: {output_dir}") | |
return True | |
except Exception as e: | |
logger.error(f"결과 파일 저장 중 오류: {e}") | |
return False |