audio_summarizer / stt_processor.py
Jeongsoo1975
feat: 주요 개선사항 적용 - 코드 재사용, 다운로드, 사용자 정의 화자명
30ff654
import os
import google.generativeai as genai
from dotenv import load_dotenv
import logging
import json
from datetime import datetime
import re
import tempfile
import zipfile
# 환경 변수 로드
load_dotenv()
# 로깅 설정
logger = logging.getLogger(__name__)
class TextProcessor:
"""
텍스트를 AI를 통한 화자 분리 및 맞춤법 교정을 수행하는 클래스
"""
def __init__(self, google_api_key=None, config_path="config.json"):
"""
TextProcessor 초기화
Args:
google_api_key (str): Google AI API 키. None인 경우 환경 변수에서 읽음
config_path (str): 설정 파일 경로
"""
# API 키 안전하게 가져오기
if google_api_key:
self.google_api_key = str(google_api_key)
else:
self.google_api_key = os.getenv("GOOGLE_API_KEY")
self.gemini_model = None
self.models_loaded = False
# 설정 파일 로드
self.config = self.load_config(config_path)
# API 키 검증 - 더 안전한 체크
if (self.google_api_key is None or
not isinstance(self.google_api_key, str) or
len(self.google_api_key.strip()) == 0 or
self.google_api_key.strip() == "your_google_api_key_here"):
raise ValueError("Google AI API 키가 설정되지 않았습니다. 환경 변수 GOOGLE_API_KEY를 설정하거나 매개변수로 전달하세요.")
def load_config(self, config_path):
"""설정 파일을 로드합니다."""
try:
if os.path.exists(config_path):
with open(config_path, 'r', encoding='utf-8') as f:
config = json.load(f)
logger.info(f"설정 파일 로드 완료: {config_path}")
return config
else:
logger.warning(f"설정 파일을 찾을 수 없습니다: {config_path}. 기본 설정을 사용합니다.")
return self.get_default_config()
except Exception as e:
logger.error(f"설정 파일 로드 실패: {e}. 기본 설정을 사용합니다.")
return self.get_default_config()
def get_default_config(self):
"""기본 설정을 반환합니다."""
return {
"models": {
"gemini": {"name": "gemini-2.0-flash", "temperature": 0.3}
},
"processing": {
"chunk_size": 20000,
"enable_chunking": True,
"validate_ai_response": True,
"required_speaker_tags": ["[화자1]", "[화자2]"]
},
"output": {
"save_original": True,
"save_separated": True,
"save_corrected": True,
"save_individual_speakers": True,
"save_json": True,
"create_download_zip": True
}
}
def load_models(self):
"""Gemini AI 모델을 로드합니다."""
try:
logger.info("Gemini 모델 로딩을 시작합니다.")
# 설정에서 모델 이름 가져오기
model_name = self.config.get("models", {}).get("gemini", {}).get("name", "gemini-2.0-flash")
# Gemini 모델 설정
genai.configure(api_key=self.google_api_key)
self.gemini_model = genai.GenerativeModel(model_name)
logger.info(f"{model_name} 모델 설정이 완료되었습니다.")
self.models_loaded = True
logger.info("Gemini 모델 로딩이 완료되었습니다.")
return True
except Exception as e:
error_msg = f"Gemini 모델을 로딩하는 중 오류가 발생했습니다: {e}"
logger.error(error_msg)
raise Exception(error_msg)
def split_text_into_chunks(self, text, chunk_size=None):
"""텍스트를 청크로 분할합니다."""
if chunk_size is None:
chunk_size = self.config.get("processing", {}).get("chunk_size", 20000)
if len(text) <= chunk_size:
return [text]
chunks = []
sentences = re.split(r'[.!?。!?]\s+', text)
current_chunk = ""
for sentence in sentences:
if len(current_chunk) + len(sentence) <= chunk_size:
current_chunk += sentence + ". "
else:
if current_chunk:
chunks.append(current_chunk.strip())
current_chunk = sentence + ". "
if current_chunk:
chunks.append(current_chunk.strip())
logger.info(f"텍스트를 {len(chunks)}개 청크로 분할했습니다.")
return chunks
def validate_ai_response(self, response_text, expected_tags=None):
"""AI 응답의 유효성을 검증합니다."""
if not self.config.get("processing", {}).get("validate_ai_response", True):
return True, "검증 비활성화됨"
if expected_tags is None:
expected_tags = self.config.get("processing", {}).get("required_speaker_tags", ["[화자1]", "[화자2]"])
# 응답이 비어있는지 확인
if not response_text or not response_text.strip():
return False, "AI 응답이 비어 있습니다."
# 필요한 태그가 포함되어 있는지 확인
found_tags = []
for tag in expected_tags:
if tag in response_text:
found_tags.append(tag)
if not found_tags:
return False, f"화자 태그({', '.join(expected_tags)})가 응답에 포함되지 않았습니다."
if len(found_tags) < 2:
return False, f"최소 2개의 화자 태그가 필요하지만 {len(found_tags)}개만 발견되었습니다."
return True, f"검증 성공: {', '.join(found_tags)} 태그 발견"
def get_prompt(self, prompt_type, **kwargs):
"""설정에서 프롬프트를 가져와 포맷팅합니다."""
prompts = self.config.get("prompts", {})
if prompt_type == "speaker_separation":
template = prompts.get("speaker_separation",
"당신은 2명의 화자가 나누는 대화를 분석하는 전문가입니다. 주어진 텍스트를 화자별로 분리해주세요.\n\n분석할 텍스트:\n{text}")
elif prompt_type == "spell_correction":
template = prompts.get("spell_correction",
"한국어 맞춤법을 교정해주세요. [화자1], [화자2] 태그는 유지하세요.\n\n교정할 텍스트:\n{text}")
else:
raise ValueError(f"알 수 없는 프롬프트 타입: {prompt_type}")
return template.format(**kwargs)
def process_text(self, input_text, text_name=None, progress_callback=None, speaker1_name=None, speaker2_name=None):
"""
텍스트를 처리하여 화자 분리 및 맞춤법 교정을 수행합니다.
Args:
input_text (str): 처리할 텍스트
text_name (str): 텍스트 이름 (선택사항)
progress_callback (function): 진행 상황을 알려주는 콜백 함수
speaker1_name (str): 화자1의 사용자 정의 이름
speaker2_name (str): 화자2의 사용자 정의 이름
Returns:
dict: 처리 결과 딕셔너리
"""
if not self.models_loaded:
self.load_models()
try:
text_name = text_name or f"text_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
logger.info(f"텍스트 처리 시작: {text_name}")
# 입력 텍스트 검증
if not input_text or not input_text.strip():
raise ValueError("처리할 텍스트가 비어 있습니다.")
full_text = input_text.strip()
# 청킹 여부 결정
enable_chunking = self.config.get("processing", {}).get("enable_chunking", True)
chunk_size = self.config.get("processing", {}).get("chunk_size", 20000)
if enable_chunking and len(full_text) > chunk_size:
return self.process_text_with_chunking(full_text, text_name, progress_callback, speaker1_name, speaker2_name)
else:
return self.process_text_single(full_text, text_name, progress_callback, speaker1_name, speaker2_name)
except Exception as e:
logger.error(f"텍스트 {text_name} 처리 중 오류: {e}")
return {
"text_name": text_name or "unknown",
"success": False,
"error": str(e)
}
def process_text_single(self, full_text, text_name, progress_callback, speaker1_name, speaker2_name):
"""단일 텍스트를 처리합니다."""
# 1단계: Gemini로 화자 분리
if progress_callback:
progress_callback("AI 화자 분리 중...", 1, 3)
logger.info(f"AI 화자 분리 시작: {text_name}")
speaker_separated_text = self.separate_speakers_with_gemini(full_text)
# AI 응답 검증
is_valid, validation_msg = self.validate_ai_response(speaker_separated_text)
if not is_valid:
raise ValueError(f"화자 분리 실패: {validation_msg}")
logger.info(f"화자 분리 검증 완료: {validation_msg}")
# 2단계: 맞춤법 교정
if progress_callback:
progress_callback("맞춤법 교정 중...", 2, 3)
logger.info(f"맞춤법 교정 시작: {text_name}")
corrected_text = self.correct_spelling_with_gemini(speaker_separated_text)
# 3단계: 결과 파싱 및 사용자 정의 이름 적용
if progress_callback:
progress_callback("결과 정리 중...", 3, 3)
# 교정된 텍스트에서 화자별 대화 파싱
corrected_conversations = self.parse_separated_text(corrected_text)
original_conversations = self.parse_separated_text(speaker_separated_text)
# 사용자 정의 화자 이름 적용
if speaker1_name or speaker2_name:
corrected_conversations, corrected_text = self.apply_custom_speaker_names(
corrected_conversations, corrected_text, speaker1_name, speaker2_name)
original_conversations, speaker_separated_text = self.apply_custom_speaker_names(
original_conversations, speaker_separated_text, speaker1_name, speaker2_name)
# 결과 딕셔너리 생성
processing_result = {
"text_name": text_name,
"processed_time": datetime.now().isoformat(),
"original_text": full_text,
"separated_text": speaker_separated_text,
"corrected_text": corrected_text,
"conversations_by_speaker_original": original_conversations,
"conversations_by_speaker_corrected": corrected_conversations,
"speaker1_name": speaker1_name or "화자1",
"speaker2_name": speaker2_name or "화자2",
"success": True
}
logger.info(f"텍스트 처리 완료: {text_name}")
return processing_result
def process_text_with_chunking(self, full_text, text_name, progress_callback, speaker1_name, speaker2_name):
"""청킹을 사용하여 대용량 텍스트를 처리합니다."""
logger.info(f"대용량 텍스트 청킹 처리 시작: {text_name}")
chunks = self.split_text_into_chunks(full_text)
total_steps = len(chunks) * 2 # 화자 분리 + 맞춤법 교정
current_step = 0
separated_chunks = []
corrected_chunks = []
# 각 청크 처리
for i, chunk in enumerate(chunks):
# 화자 분리
current_step += 1
if progress_callback:
progress_callback(f"청크 {i+1}/{len(chunks)} 화자 분리 중...", current_step, total_steps)
separated_chunk = self.separate_speakers_with_gemini(chunk)
# AI 응답 검증
is_valid, validation_msg = self.validate_ai_response(separated_chunk)
if not is_valid:
logger.warning(f"청크 {i+1} 화자 분리 검증 실패: {validation_msg}")
# 검증 실패한 청크는 원본을 사용하되 기본 태그 추가
separated_chunk = f"[화자1] {chunk}"
separated_chunks.append(separated_chunk)
# 맞춤법 교정
current_step += 1
if progress_callback:
progress_callback(f"청크 {i+1}/{len(chunks)} 맞춤법 교정 중...", current_step, total_steps)
corrected_chunk = self.correct_spelling_with_gemini(separated_chunk)
corrected_chunks.append(corrected_chunk)
# 청크들을 다시 합치기
speaker_separated_text = "\n\n".join(separated_chunks)
corrected_text = "\n\n".join(corrected_chunks)
# 결과 파싱 및 사용자 정의 이름 적용
corrected_conversations = self.parse_separated_text(corrected_text)
original_conversations = self.parse_separated_text(speaker_separated_text)
if speaker1_name or speaker2_name:
corrected_conversations, corrected_text = self.apply_custom_speaker_names(
corrected_conversations, corrected_text, speaker1_name, speaker2_name)
original_conversations, speaker_separated_text = self.apply_custom_speaker_names(
original_conversations, speaker_separated_text, speaker1_name, speaker2_name)
processing_result = {
"text_name": text_name,
"processed_time": datetime.now().isoformat(),
"original_text": full_text,
"separated_text": speaker_separated_text,
"corrected_text": corrected_text,
"conversations_by_speaker_original": original_conversations,
"conversations_by_speaker_corrected": corrected_conversations,
"speaker1_name": speaker1_name or "화자1",
"speaker2_name": speaker2_name or "화자2",
"chunks_processed": len(chunks),
"success": True
}
logger.info(f"청킹 처리 완료: {text_name} ({len(chunks)}개 청크)")
return processing_result
def apply_custom_speaker_names(self, conversations, text, speaker1_name, speaker2_name):
"""사용자 정의 화자 이름을 적용합니다."""
updated_conversations = {}
updated_text = text
# 대화 딕셔너리 업데이트
if speaker1_name:
updated_conversations[speaker1_name] = conversations.get("화자1", [])
updated_text = updated_text.replace("[화자1]", f"[{speaker1_name}]")
else:
updated_conversations["화자1"] = conversations.get("화자1", [])
if speaker2_name:
updated_conversations[speaker2_name] = conversations.get("화자2", [])
updated_text = updated_text.replace("[화자2]", f"[{speaker2_name}]")
else:
updated_conversations["화자2"] = conversations.get("화자2", [])
return updated_conversations, updated_text
def separate_speakers_with_gemini(self, text):
"""Gemini API를 사용하여 텍스트를 화자별로 분리합니다."""
try:
prompt = self.get_prompt("speaker_separation", text=text)
response = self.gemini_model.generate_content(prompt)
separated_text = response.text.strip()
logger.info("Gemini를 통한 화자 분리가 완료되었습니다.")
return separated_text
except Exception as e:
logger.error(f"Gemini 화자 분리 중 오류: {e}")
return f"[오류] 화자 분리 실패: {str(e)}"
def correct_spelling_with_gemini(self, separated_text):
"""Gemini API를 사용하여 화자별 분리된 텍스트의 맞춤법을 교정합니다."""
try:
prompt = self.get_prompt("spell_correction", text=separated_text)
response = self.gemini_model.generate_content(prompt)
corrected_text = response.text.strip()
logger.info("Gemini를 통한 맞춤법 교정이 완료되었습니다.")
return corrected_text
except Exception as e:
logger.error(f"Gemini 맞춤법 교정 중 오류: {e}")
return separated_text # 오류 발생 시 원본 반환
def parse_separated_text(self, separated_text):
"""화자별로 분리된 텍스트를 파싱하여 구조화합니다."""
conversations = {
"화자1": [],
"화자2": []
}
# 정규표현식으로 화자별 발언 추출
pattern = r'\[화자([12])\]\s*(.+?)(?=\[화자[12]\]|$)'
matches = re.findall(pattern, separated_text, re.DOTALL)
for speaker_num, content in matches:
speaker = f"화자{speaker_num}"
content = content.strip()
if content:
conversations[speaker].append(content)
return conversations
def create_download_zip(self, result, output_dir="output"):
"""처리 결과를 ZIP 파일로 생성합니다."""
try:
if not self.config.get("output", {}).get("create_download_zip", True):
return None
base_name = result["text_name"]
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
zip_path = os.path.join(output_dir, f"{base_name}_complete_{timestamp}.zip")
with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
# 전체 대화 저장
all_content = self._generate_complete_text_content(result)
zipf.writestr(f"{base_name}_전체대화_{timestamp}.txt", all_content)
# 화자별 개별 파일
for speaker, utterances in result['conversations_by_speaker_corrected'].items():
if utterances:
speaker_content = self._generate_speaker_content(result, speaker, utterances)
zipf.writestr(f"{base_name}_{speaker}_교정본_{timestamp}.txt", speaker_content)
# JSON 데이터
json_content = json.dumps(result, ensure_ascii=False, indent=2)
zipf.writestr(f"{base_name}_data_{timestamp}.json", json_content)
logger.info(f"ZIP 파일 생성 완료: {zip_path}")
return zip_path
except Exception as e:
logger.error(f"ZIP 파일 생성 중 오류: {e}")
return None
def _generate_complete_text_content(self, result):
"""전체 대화 텍스트 내용을 생성합니다."""
content = []
content.append(f"파일명: {result['text_name']}")
content.append(f"처리 시간: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
content.append(f"화자1: {result.get('speaker1_name', '화자1')}")
content.append(f"화자2: {result.get('speaker2_name', '화자2')}")
content.append("="*50)
content.append("원본 텍스트:")
content.append(result['original_text'])
content.append("="*50)
content.append("화자별 분리 결과 (원본):")
content.append(result['separated_text'])
content.append("="*50)
content.append("화자별 분리 결과 (맞춤법 교정):")
content.append(result['corrected_text'])
return "\n".join(content)
def _generate_speaker_content(self, result, speaker, utterances):
"""화자별 개별 파일 내용을 생성합니다."""
content = []
content.append(f"파일명: {result['text_name']}")
content.append(f"화자: {speaker}")
content.append(f"처리 시간: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
content.append(f"발언 수: {len(utterances)}")
content.append("="*50)
for idx, utterance in enumerate(utterances, 1):
content.append(f"{idx}. {utterance}")
content.append("")
return "\n".join(content)
def save_results_to_files(self, result, output_dir="output"):
"""처리 결과를 파일로 저장합니다."""
if not result.get("success", False):
logger.error(f"결과 저장 실패: {result.get('error', 'Unknown error')}")
return False
try:
# output 폴더 생성
if not os.path.exists(output_dir):
os.makedirs(output_dir)
base_name = result["text_name"]
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
saved_files = []
output_config = self.config.get("output", {})
# 1. 전체 대화 저장
if output_config.get("save_original", True) or output_config.get("save_separated", True) or output_config.get("save_corrected", True):
all_txt_path = f"{output_dir}/{base_name}_전체대화_{timestamp}.txt"
with open(all_txt_path, 'w', encoding='utf-8') as f:
f.write(self._generate_complete_text_content(result))
saved_files.append(all_txt_path)
# 2. 화자별 개별 파일 저장
if output_config.get("save_individual_speakers", True):
for speaker, utterances in result['conversations_by_speaker_corrected'].items():
if utterances:
speaker_txt_path = f"{output_dir}/{base_name}_{speaker}_교정본_{timestamp}.txt"
with open(speaker_txt_path, 'w', encoding='utf-8') as f:
f.write(self._generate_speaker_content(result, speaker, utterances))
saved_files.append(speaker_txt_path)
# 3. JSON 형태로도 저장
if output_config.get("save_json", True):
json_path = f"{output_dir}/{base_name}_data_{timestamp}.json"
with open(json_path, 'w', encoding='utf-8') as f:
json.dump(result, f, ensure_ascii=False, indent=2)
saved_files.append(json_path)
# 4. ZIP 파일 생성
zip_path = self.create_download_zip(result, output_dir)
if zip_path:
saved_files.append(zip_path)
logger.info(f"결과 파일 저장 완료: {len(saved_files)}개 파일")
result["saved_files"] = saved_files
return True
except Exception as e:
logger.error(f"결과 파일 저장 중 오류: {e}")
return False