Spaces:
Runtime error
Runtime error
import os | |
import google.generativeai as genai | |
from dotenv import load_dotenv | |
import logging | |
import json | |
from datetime import datetime | |
import re | |
import tempfile | |
import zipfile | |
# 환경 변수 로드 | |
load_dotenv() | |
# 로깅 설정 | |
logger = logging.getLogger(__name__) | |
class TextProcessor: | |
""" | |
텍스트를 AI를 통한 화자 분리 및 맞춤법 교정을 수행하는 클래스 | |
""" | |
def __init__(self, google_api_key=None, config_path="config.json"): | |
""" | |
TextProcessor 초기화 | |
Args: | |
google_api_key (str): Google AI API 키. None인 경우 환경 변수에서 읽음 | |
config_path (str): 설정 파일 경로 | |
""" | |
# API 키 안전하게 가져오기 | |
if google_api_key: | |
self.google_api_key = str(google_api_key) | |
else: | |
self.google_api_key = os.getenv("GOOGLE_API_KEY") | |
self.gemini_model = None | |
self.models_loaded = False | |
# 설정 파일 로드 | |
self.config = self.load_config(config_path) | |
# API 키 검증 - 더 안전한 체크 | |
if (self.google_api_key is None or | |
not isinstance(self.google_api_key, str) or | |
len(self.google_api_key.strip()) == 0 or | |
self.google_api_key.strip() == "your_google_api_key_here"): | |
raise ValueError("Google AI API 키가 설정되지 않았습니다. 환경 변수 GOOGLE_API_KEY를 설정하거나 매개변수로 전달하세요.") | |
def load_config(self, config_path): | |
"""설정 파일을 로드합니다.""" | |
try: | |
if os.path.exists(config_path): | |
with open(config_path, 'r', encoding='utf-8') as f: | |
config = json.load(f) | |
logger.info(f"설정 파일 로드 완료: {config_path}") | |
return config | |
else: | |
logger.warning(f"설정 파일을 찾을 수 없습니다: {config_path}. 기본 설정을 사용합니다.") | |
return self.get_default_config() | |
except Exception as e: | |
logger.error(f"설정 파일 로드 실패: {e}. 기본 설정을 사용합니다.") | |
return self.get_default_config() | |
def get_default_config(self): | |
"""기본 설정을 반환합니다.""" | |
return { | |
"models": { | |
"gemini": {"name": "gemini-2.0-flash", "temperature": 0.3} | |
}, | |
"processing": { | |
"chunk_size": 20000, | |
"enable_chunking": True, | |
"validate_ai_response": True, | |
"required_speaker_tags": ["[화자1]", "[화자2]"] | |
}, | |
"output": { | |
"save_original": True, | |
"save_separated": True, | |
"save_corrected": True, | |
"save_individual_speakers": True, | |
"save_json": True, | |
"create_download_zip": True | |
} | |
} | |
def load_models(self): | |
"""Gemini AI 모델을 로드합니다.""" | |
try: | |
logger.info("Gemini 모델 로딩을 시작합니다.") | |
# 설정에서 모델 이름 가져오기 | |
model_name = self.config.get("models", {}).get("gemini", {}).get("name", "gemini-2.0-flash") | |
# Gemini 모델 설정 | |
genai.configure(api_key=self.google_api_key) | |
self.gemini_model = genai.GenerativeModel(model_name) | |
logger.info(f"{model_name} 모델 설정이 완료되었습니다.") | |
self.models_loaded = True | |
logger.info("Gemini 모델 로딩이 완료되었습니다.") | |
return True | |
except Exception as e: | |
error_msg = f"Gemini 모델을 로딩하는 중 오류가 발생했습니다: {e}" | |
logger.error(error_msg) | |
raise Exception(error_msg) | |
def split_text_into_chunks(self, text, chunk_size=None): | |
"""텍스트를 청크로 분할합니다.""" | |
if chunk_size is None: | |
chunk_size = self.config.get("processing", {}).get("chunk_size", 20000) | |
if len(text) <= chunk_size: | |
return [text] | |
chunks = [] | |
sentences = re.split(r'[.!?。!?]\s+', text) | |
current_chunk = "" | |
for sentence in sentences: | |
if len(current_chunk) + len(sentence) <= chunk_size: | |
current_chunk += sentence + ". " | |
else: | |
if current_chunk: | |
chunks.append(current_chunk.strip()) | |
current_chunk = sentence + ". " | |
if current_chunk: | |
chunks.append(current_chunk.strip()) | |
logger.info(f"텍스트를 {len(chunks)}개 청크로 분할했습니다.") | |
return chunks | |
def validate_ai_response(self, response_text, expected_tags=None): | |
"""AI 응답의 유효성을 검증합니다.""" | |
if not self.config.get("processing", {}).get("validate_ai_response", True): | |
return True, "검증 비활성화됨" | |
if expected_tags is None: | |
expected_tags = self.config.get("processing", {}).get("required_speaker_tags", ["[화자1]", "[화자2]"]) | |
# 응답이 비어있는지 확인 | |
if not response_text or not response_text.strip(): | |
return False, "AI 응답이 비어 있습니다." | |
# 필요한 태그가 포함되어 있는지 확인 | |
found_tags = [] | |
for tag in expected_tags: | |
if tag in response_text: | |
found_tags.append(tag) | |
if not found_tags: | |
return False, f"화자 태그({', '.join(expected_tags)})가 응답에 포함되지 않았습니다." | |
if len(found_tags) < 2: | |
return False, f"최소 2개의 화자 태그가 필요하지만 {len(found_tags)}개만 발견되었습니다." | |
return True, f"검증 성공: {', '.join(found_tags)} 태그 발견" | |
def get_prompt(self, prompt_type, **kwargs): | |
"""설정에서 프롬프트를 가져와 포맷팅합니다.""" | |
prompts = self.config.get("prompts", {}) | |
if prompt_type == "speaker_separation": | |
template = prompts.get("speaker_separation", | |
"당신은 2명의 화자가 나누는 대화를 분석하는 전문가입니다. 주어진 텍스트를 화자별로 분리해주세요.\n\n분석할 텍스트:\n{text}") | |
elif prompt_type == "spell_correction": | |
template = prompts.get("spell_correction", | |
"한국어 맞춤법을 교정해주세요. [화자1], [화자2] 태그는 유지하세요.\n\n교정할 텍스트:\n{text}") | |
else: | |
raise ValueError(f"알 수 없는 프롬프트 타입: {prompt_type}") | |
return template.format(**kwargs) | |
def process_text(self, input_text, text_name=None, progress_callback=None, speaker1_name=None, speaker2_name=None): | |
""" | |
텍스트를 처리하여 화자 분리 및 맞춤법 교정을 수행합니다. | |
Args: | |
input_text (str): 처리할 텍스트 | |
text_name (str): 텍스트 이름 (선택사항) | |
progress_callback (function): 진행 상황을 알려주는 콜백 함수 | |
speaker1_name (str): 화자1의 사용자 정의 이름 | |
speaker2_name (str): 화자2의 사용자 정의 이름 | |
Returns: | |
dict: 처리 결과 딕셔너리 | |
""" | |
if not self.models_loaded: | |
self.load_models() | |
try: | |
text_name = text_name or f"text_{datetime.now().strftime('%Y%m%d_%H%M%S')}" | |
logger.info(f"텍스트 처리 시작: {text_name}") | |
# 입력 텍스트 검증 | |
if not input_text or not input_text.strip(): | |
raise ValueError("처리할 텍스트가 비어 있습니다.") | |
full_text = input_text.strip() | |
# 청킹 여부 결정 | |
enable_chunking = self.config.get("processing", {}).get("enable_chunking", True) | |
chunk_size = self.config.get("processing", {}).get("chunk_size", 20000) | |
if enable_chunking and len(full_text) > chunk_size: | |
return self.process_text_with_chunking(full_text, text_name, progress_callback, speaker1_name, speaker2_name) | |
else: | |
return self.process_text_single(full_text, text_name, progress_callback, speaker1_name, speaker2_name) | |
except Exception as e: | |
logger.error(f"텍스트 {text_name} 처리 중 오류: {e}") | |
return { | |
"text_name": text_name or "unknown", | |
"success": False, | |
"error": str(e) | |
} | |
def process_text_single(self, full_text, text_name, progress_callback, speaker1_name, speaker2_name): | |
"""단일 텍스트를 처리합니다.""" | |
# 1단계: Gemini로 화자 분리 | |
if progress_callback: | |
progress_callback("AI 화자 분리 중...", 1, 3) | |
logger.info(f"AI 화자 분리 시작: {text_name}") | |
speaker_separated_text = self.separate_speakers_with_gemini(full_text) | |
# AI 응답 검증 | |
is_valid, validation_msg = self.validate_ai_response(speaker_separated_text) | |
if not is_valid: | |
raise ValueError(f"화자 분리 실패: {validation_msg}") | |
logger.info(f"화자 분리 검증 완료: {validation_msg}") | |
# 2단계: 맞춤법 교정 | |
if progress_callback: | |
progress_callback("맞춤법 교정 중...", 2, 3) | |
logger.info(f"맞춤법 교정 시작: {text_name}") | |
corrected_text = self.correct_spelling_with_gemini(speaker_separated_text) | |
# 3단계: 결과 파싱 및 사용자 정의 이름 적용 | |
if progress_callback: | |
progress_callback("결과 정리 중...", 3, 3) | |
# 교정된 텍스트에서 화자별 대화 파싱 | |
corrected_conversations = self.parse_separated_text(corrected_text) | |
original_conversations = self.parse_separated_text(speaker_separated_text) | |
# 사용자 정의 화자 이름 적용 | |
if speaker1_name or speaker2_name: | |
corrected_conversations, corrected_text = self.apply_custom_speaker_names( | |
corrected_conversations, corrected_text, speaker1_name, speaker2_name) | |
original_conversations, speaker_separated_text = self.apply_custom_speaker_names( | |
original_conversations, speaker_separated_text, speaker1_name, speaker2_name) | |
# 결과 딕셔너리 생성 | |
processing_result = { | |
"text_name": text_name, | |
"processed_time": datetime.now().isoformat(), | |
"original_text": full_text, | |
"separated_text": speaker_separated_text, | |
"corrected_text": corrected_text, | |
"conversations_by_speaker_original": original_conversations, | |
"conversations_by_speaker_corrected": corrected_conversations, | |
"speaker1_name": speaker1_name or "화자1", | |
"speaker2_name": speaker2_name or "화자2", | |
"success": True | |
} | |
logger.info(f"텍스트 처리 완료: {text_name}") | |
return processing_result | |
def process_text_with_chunking(self, full_text, text_name, progress_callback, speaker1_name, speaker2_name): | |
"""청킹을 사용하여 대용량 텍스트를 처리합니다.""" | |
logger.info(f"대용량 텍스트 청킹 처리 시작: {text_name}") | |
chunks = self.split_text_into_chunks(full_text) | |
total_steps = len(chunks) * 2 # 화자 분리 + 맞춤법 교정 | |
current_step = 0 | |
separated_chunks = [] | |
corrected_chunks = [] | |
# 각 청크 처리 | |
for i, chunk in enumerate(chunks): | |
# 화자 분리 | |
current_step += 1 | |
if progress_callback: | |
progress_callback(f"청크 {i+1}/{len(chunks)} 화자 분리 중...", current_step, total_steps) | |
separated_chunk = self.separate_speakers_with_gemini(chunk) | |
# AI 응답 검증 | |
is_valid, validation_msg = self.validate_ai_response(separated_chunk) | |
if not is_valid: | |
logger.warning(f"청크 {i+1} 화자 분리 검증 실패: {validation_msg}") | |
# 검증 실패한 청크는 원본을 사용하되 기본 태그 추가 | |
separated_chunk = f"[화자1] {chunk}" | |
separated_chunks.append(separated_chunk) | |
# 맞춤법 교정 | |
current_step += 1 | |
if progress_callback: | |
progress_callback(f"청크 {i+1}/{len(chunks)} 맞춤법 교정 중...", current_step, total_steps) | |
corrected_chunk = self.correct_spelling_with_gemini(separated_chunk) | |
corrected_chunks.append(corrected_chunk) | |
# 청크들을 다시 합치기 | |
speaker_separated_text = "\n\n".join(separated_chunks) | |
corrected_text = "\n\n".join(corrected_chunks) | |
# 결과 파싱 및 사용자 정의 이름 적용 | |
corrected_conversations = self.parse_separated_text(corrected_text) | |
original_conversations = self.parse_separated_text(speaker_separated_text) | |
if speaker1_name or speaker2_name: | |
corrected_conversations, corrected_text = self.apply_custom_speaker_names( | |
corrected_conversations, corrected_text, speaker1_name, speaker2_name) | |
original_conversations, speaker_separated_text = self.apply_custom_speaker_names( | |
original_conversations, speaker_separated_text, speaker1_name, speaker2_name) | |
processing_result = { | |
"text_name": text_name, | |
"processed_time": datetime.now().isoformat(), | |
"original_text": full_text, | |
"separated_text": speaker_separated_text, | |
"corrected_text": corrected_text, | |
"conversations_by_speaker_original": original_conversations, | |
"conversations_by_speaker_corrected": corrected_conversations, | |
"speaker1_name": speaker1_name or "화자1", | |
"speaker2_name": speaker2_name or "화자2", | |
"chunks_processed": len(chunks), | |
"success": True | |
} | |
logger.info(f"청킹 처리 완료: {text_name} ({len(chunks)}개 청크)") | |
return processing_result | |
def apply_custom_speaker_names(self, conversations, text, speaker1_name, speaker2_name): | |
"""사용자 정의 화자 이름을 적용합니다.""" | |
updated_conversations = {} | |
updated_text = text | |
# 대화 딕셔너리 업데이트 | |
if speaker1_name: | |
updated_conversations[speaker1_name] = conversations.get("화자1", []) | |
updated_text = updated_text.replace("[화자1]", f"[{speaker1_name}]") | |
else: | |
updated_conversations["화자1"] = conversations.get("화자1", []) | |
if speaker2_name: | |
updated_conversations[speaker2_name] = conversations.get("화자2", []) | |
updated_text = updated_text.replace("[화자2]", f"[{speaker2_name}]") | |
else: | |
updated_conversations["화자2"] = conversations.get("화자2", []) | |
return updated_conversations, updated_text | |
def separate_speakers_with_gemini(self, text): | |
"""Gemini API를 사용하여 텍스트를 화자별로 분리합니다.""" | |
try: | |
prompt = self.get_prompt("speaker_separation", text=text) | |
response = self.gemini_model.generate_content(prompt) | |
separated_text = response.text.strip() | |
logger.info("Gemini를 통한 화자 분리가 완료되었습니다.") | |
return separated_text | |
except Exception as e: | |
logger.error(f"Gemini 화자 분리 중 오류: {e}") | |
return f"[오류] 화자 분리 실패: {str(e)}" | |
def correct_spelling_with_gemini(self, separated_text): | |
"""Gemini API를 사용하여 화자별 분리된 텍스트의 맞춤법을 교정합니다.""" | |
try: | |
prompt = self.get_prompt("spell_correction", text=separated_text) | |
response = self.gemini_model.generate_content(prompt) | |
corrected_text = response.text.strip() | |
logger.info("Gemini를 통한 맞춤법 교정이 완료되었습니다.") | |
return corrected_text | |
except Exception as e: | |
logger.error(f"Gemini 맞춤법 교정 중 오류: {e}") | |
return separated_text # 오류 발생 시 원본 반환 | |
def parse_separated_text(self, separated_text): | |
"""화자별로 분리된 텍스트를 파싱하여 구조화합니다.""" | |
conversations = { | |
"화자1": [], | |
"화자2": [] | |
} | |
# 정규표현식으로 화자별 발언 추출 | |
pattern = r'\[화자([12])\]\s*(.+?)(?=\[화자[12]\]|$)' | |
matches = re.findall(pattern, separated_text, re.DOTALL) | |
for speaker_num, content in matches: | |
speaker = f"화자{speaker_num}" | |
content = content.strip() | |
if content: | |
conversations[speaker].append(content) | |
return conversations | |
def create_download_zip(self, result, output_dir="output"): | |
"""처리 결과를 ZIP 파일로 생성합니다.""" | |
try: | |
if not self.config.get("output", {}).get("create_download_zip", True): | |
return None | |
base_name = result["text_name"] | |
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") | |
zip_path = os.path.join(output_dir, f"{base_name}_complete_{timestamp}.zip") | |
with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf: | |
# 전체 대화 저장 | |
all_content = self._generate_complete_text_content(result) | |
zipf.writestr(f"{base_name}_전체대화_{timestamp}.txt", all_content) | |
# 화자별 개별 파일 | |
for speaker, utterances in result['conversations_by_speaker_corrected'].items(): | |
if utterances: | |
speaker_content = self._generate_speaker_content(result, speaker, utterances) | |
zipf.writestr(f"{base_name}_{speaker}_교정본_{timestamp}.txt", speaker_content) | |
# JSON 데이터 | |
json_content = json.dumps(result, ensure_ascii=False, indent=2) | |
zipf.writestr(f"{base_name}_data_{timestamp}.json", json_content) | |
logger.info(f"ZIP 파일 생성 완료: {zip_path}") | |
return zip_path | |
except Exception as e: | |
logger.error(f"ZIP 파일 생성 중 오류: {e}") | |
return None | |
def _generate_complete_text_content(self, result): | |
"""전체 대화 텍스트 내용을 생성합니다.""" | |
content = [] | |
content.append(f"파일명: {result['text_name']}") | |
content.append(f"처리 시간: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") | |
content.append(f"화자1: {result.get('speaker1_name', '화자1')}") | |
content.append(f"화자2: {result.get('speaker2_name', '화자2')}") | |
content.append("="*50) | |
content.append("원본 텍스트:") | |
content.append(result['original_text']) | |
content.append("="*50) | |
content.append("화자별 분리 결과 (원본):") | |
content.append(result['separated_text']) | |
content.append("="*50) | |
content.append("화자별 분리 결과 (맞춤법 교정):") | |
content.append(result['corrected_text']) | |
return "\n".join(content) | |
def _generate_speaker_content(self, result, speaker, utterances): | |
"""화자별 개별 파일 내용을 생성합니다.""" | |
content = [] | |
content.append(f"파일명: {result['text_name']}") | |
content.append(f"화자: {speaker}") | |
content.append(f"처리 시간: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") | |
content.append(f"발언 수: {len(utterances)}") | |
content.append("="*50) | |
for idx, utterance in enumerate(utterances, 1): | |
content.append(f"{idx}. {utterance}") | |
content.append("") | |
return "\n".join(content) | |
def save_results_to_files(self, result, output_dir="output"): | |
"""처리 결과를 파일로 저장합니다.""" | |
if not result.get("success", False): | |
logger.error(f"결과 저장 실패: {result.get('error', 'Unknown error')}") | |
return False | |
try: | |
# output 폴더 생성 | |
if not os.path.exists(output_dir): | |
os.makedirs(output_dir) | |
base_name = result["text_name"] | |
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") | |
saved_files = [] | |
output_config = self.config.get("output", {}) | |
# 1. 전체 대화 저장 | |
if output_config.get("save_original", True) or output_config.get("save_separated", True) or output_config.get("save_corrected", True): | |
all_txt_path = f"{output_dir}/{base_name}_전체대화_{timestamp}.txt" | |
with open(all_txt_path, 'w', encoding='utf-8') as f: | |
f.write(self._generate_complete_text_content(result)) | |
saved_files.append(all_txt_path) | |
# 2. 화자별 개별 파일 저장 | |
if output_config.get("save_individual_speakers", True): | |
for speaker, utterances in result['conversations_by_speaker_corrected'].items(): | |
if utterances: | |
speaker_txt_path = f"{output_dir}/{base_name}_{speaker}_교정본_{timestamp}.txt" | |
with open(speaker_txt_path, 'w', encoding='utf-8') as f: | |
f.write(self._generate_speaker_content(result, speaker, utterances)) | |
saved_files.append(speaker_txt_path) | |
# 3. JSON 형태로도 저장 | |
if output_config.get("save_json", True): | |
json_path = f"{output_dir}/{base_name}_data_{timestamp}.json" | |
with open(json_path, 'w', encoding='utf-8') as f: | |
json.dump(result, f, ensure_ascii=False, indent=2) | |
saved_files.append(json_path) | |
# 4. ZIP 파일 생성 | |
zip_path = self.create_download_zip(result, output_dir) | |
if zip_path: | |
saved_files.append(zip_path) | |
logger.info(f"결과 파일 저장 완료: {len(saved_files)}개 파일") | |
result["saved_files"] = saved_files | |
return True | |
except Exception as e: | |
logger.error(f"결과 파일 저장 중 오류: {e}") | |
return False | |