import os import google.generativeai as genai from dotenv import load_dotenv import logging import json from datetime import datetime import re import tempfile import zipfile # 환경 변수 로드 load_dotenv() # 로깅 설정 logger = logging.getLogger(__name__) class TextProcessor: """ 텍스트를 AI를 통한 화자 분리 및 맞춤법 교정을 수행하는 클래스 """ def __init__(self, google_api_key=None, config_path="config.json"): """ TextProcessor 초기화 Args: google_api_key (str): Google AI API 키. None인 경우 환경 변수에서 읽음 config_path (str): 설정 파일 경로 """ # API 키 안전하게 가져오기 if google_api_key: self.google_api_key = str(google_api_key) else: self.google_api_key = os.getenv("GOOGLE_API_KEY") self.gemini_model = None self.models_loaded = False # 설정 파일 로드 self.config = self.load_config(config_path) # API 키 검증 - 더 안전한 체크 if (self.google_api_key is None or not isinstance(self.google_api_key, str) or len(self.google_api_key.strip()) == 0 or self.google_api_key.strip() == "your_google_api_key_here"): raise ValueError("Google AI API 키가 설정되지 않았습니다. 환경 변수 GOOGLE_API_KEY를 설정하거나 매개변수로 전달하세요.") def load_config(self, config_path): """설정 파일을 로드합니다.""" try: if os.path.exists(config_path): with open(config_path, 'r', encoding='utf-8') as f: config = json.load(f) logger.info(f"설정 파일 로드 완료: {config_path}") return config else: logger.warning(f"설정 파일을 찾을 수 없습니다: {config_path}. 기본 설정을 사용합니다.") return self.get_default_config() except Exception as e: logger.error(f"설정 파일 로드 실패: {e}. 기본 설정을 사용합니다.") return self.get_default_config() def get_default_config(self): """기본 설정을 반환합니다.""" return { "models": { "gemini": {"name": "gemini-2.0-flash", "temperature": 0.3} }, "processing": { "chunk_size": 20000, "enable_chunking": True, "validate_ai_response": True, "required_speaker_tags": ["[화자1]", "[화자2]"] }, "output": { "save_original": True, "save_separated": True, "save_corrected": True, "save_individual_speakers": True, "save_json": True, "create_download_zip": True } } def load_models(self): """Gemini AI 모델을 로드합니다.""" try: logger.info("Gemini 모델 로딩을 시작합니다.") # 설정에서 모델 이름 가져오기 model_name = self.config.get("models", {}).get("gemini", {}).get("name", "gemini-2.0-flash") # Gemini 모델 설정 genai.configure(api_key=self.google_api_key) self.gemini_model = genai.GenerativeModel(model_name) logger.info(f"{model_name} 모델 설정이 완료되었습니다.") self.models_loaded = True logger.info("Gemini 모델 로딩이 완료되었습니다.") return True except Exception as e: error_msg = f"Gemini 모델을 로딩하는 중 오류가 발생했습니다: {e}" logger.error(error_msg) raise Exception(error_msg) def split_text_into_chunks(self, text, chunk_size=None): """텍스트를 청크로 분할합니다.""" if chunk_size is None: chunk_size = self.config.get("processing", {}).get("chunk_size", 20000) if len(text) <= chunk_size: return [text] chunks = [] sentences = re.split(r'[.!?。!?]\s+', text) current_chunk = "" for sentence in sentences: if len(current_chunk) + len(sentence) <= chunk_size: current_chunk += sentence + ". " else: if current_chunk: chunks.append(current_chunk.strip()) current_chunk = sentence + ". " if current_chunk: chunks.append(current_chunk.strip()) logger.info(f"텍스트를 {len(chunks)}개 청크로 분할했습니다.") return chunks def validate_ai_response(self, response_text, expected_tags=None): """AI 응답의 유효성을 검증합니다.""" if not self.config.get("processing", {}).get("validate_ai_response", True): return True, "검증 비활성화됨" if expected_tags is None: expected_tags = self.config.get("processing", {}).get("required_speaker_tags", ["[화자1]", "[화자2]"]) # 응답이 비어있는지 확인 if not response_text or not response_text.strip(): return False, "AI 응답이 비어 있습니다." # 필요한 태그가 포함되어 있는지 확인 found_tags = [] for tag in expected_tags: if tag in response_text: found_tags.append(tag) if not found_tags: return False, f"화자 태그({', '.join(expected_tags)})가 응답에 포함되지 않았습니다." if len(found_tags) < 2: return False, f"최소 2개의 화자 태그가 필요하지만 {len(found_tags)}개만 발견되었습니다." return True, f"검증 성공: {', '.join(found_tags)} 태그 발견" def get_prompt(self, prompt_type, **kwargs): """설정에서 프롬프트를 가져와 포맷팅합니다.""" prompts = self.config.get("prompts", {}) if prompt_type == "speaker_separation": template = prompts.get("speaker_separation", "당신은 2명의 화자가 나누는 대화를 분석하는 전문가입니다. 주어진 텍스트를 화자별로 분리해주세요.\n\n분석할 텍스트:\n{text}") elif prompt_type == "spell_correction": template = prompts.get("spell_correction", "한국어 맞춤법을 교정해주세요. [화자1], [화자2] 태그는 유지하세요.\n\n교정할 텍스트:\n{text}") else: raise ValueError(f"알 수 없는 프롬프트 타입: {prompt_type}") return template.format(**kwargs) def process_text(self, input_text, text_name=None, progress_callback=None, speaker1_name=None, speaker2_name=None): """ 텍스트를 처리하여 화자 분리 및 맞춤법 교정을 수행합니다. Args: input_text (str): 처리할 텍스트 text_name (str): 텍스트 이름 (선택사항) progress_callback (function): 진행 상황을 알려주는 콜백 함수 speaker1_name (str): 화자1의 사용자 정의 이름 speaker2_name (str): 화자2의 사용자 정의 이름 Returns: dict: 처리 결과 딕셔너리 """ if not self.models_loaded: self.load_models() try: text_name = text_name or f"text_{datetime.now().strftime('%Y%m%d_%H%M%S')}" logger.info(f"텍스트 처리 시작: {text_name}") # 입력 텍스트 검증 if not input_text or not input_text.strip(): raise ValueError("처리할 텍스트가 비어 있습니다.") full_text = input_text.strip() # 청킹 여부 결정 enable_chunking = self.config.get("processing", {}).get("enable_chunking", True) chunk_size = self.config.get("processing", {}).get("chunk_size", 20000) if enable_chunking and len(full_text) > chunk_size: return self.process_text_with_chunking(full_text, text_name, progress_callback, speaker1_name, speaker2_name) else: return self.process_text_single(full_text, text_name, progress_callback, speaker1_name, speaker2_name) except Exception as e: logger.error(f"텍스트 {text_name} 처리 중 오류: {e}") return { "text_name": text_name or "unknown", "success": False, "error": str(e) } def process_text_single(self, full_text, text_name, progress_callback, speaker1_name, speaker2_name): """단일 텍스트를 처리합니다.""" # 1단계: Gemini로 화자 분리 if progress_callback: progress_callback("AI 화자 분리 중...", 1, 3) logger.info(f"AI 화자 분리 시작: {text_name}") speaker_separated_text = self.separate_speakers_with_gemini(full_text) # AI 응답 검증 is_valid, validation_msg = self.validate_ai_response(speaker_separated_text) if not is_valid: raise ValueError(f"화자 분리 실패: {validation_msg}") logger.info(f"화자 분리 검증 완료: {validation_msg}") # 2단계: 맞춤법 교정 if progress_callback: progress_callback("맞춤법 교정 중...", 2, 3) logger.info(f"맞춤법 교정 시작: {text_name}") corrected_text = self.correct_spelling_with_gemini(speaker_separated_text) # 3단계: 결과 파싱 및 사용자 정의 이름 적용 if progress_callback: progress_callback("결과 정리 중...", 3, 3) # 교정된 텍스트에서 화자별 대화 파싱 corrected_conversations = self.parse_separated_text(corrected_text) original_conversations = self.parse_separated_text(speaker_separated_text) # 사용자 정의 화자 이름 적용 if speaker1_name or speaker2_name: corrected_conversations, corrected_text = self.apply_custom_speaker_names( corrected_conversations, corrected_text, speaker1_name, speaker2_name) original_conversations, speaker_separated_text = self.apply_custom_speaker_names( original_conversations, speaker_separated_text, speaker1_name, speaker2_name) # 결과 딕셔너리 생성 processing_result = { "text_name": text_name, "processed_time": datetime.now().isoformat(), "original_text": full_text, "separated_text": speaker_separated_text, "corrected_text": corrected_text, "conversations_by_speaker_original": original_conversations, "conversations_by_speaker_corrected": corrected_conversations, "speaker1_name": speaker1_name or "화자1", "speaker2_name": speaker2_name or "화자2", "success": True } logger.info(f"텍스트 처리 완료: {text_name}") return processing_result def process_text_with_chunking(self, full_text, text_name, progress_callback, speaker1_name, speaker2_name): """청킹을 사용하여 대용량 텍스트를 처리합니다.""" logger.info(f"대용량 텍스트 청킹 처리 시작: {text_name}") chunks = self.split_text_into_chunks(full_text) total_steps = len(chunks) * 2 # 화자 분리 + 맞춤법 교정 current_step = 0 separated_chunks = [] corrected_chunks = [] # 각 청크 처리 for i, chunk in enumerate(chunks): # 화자 분리 current_step += 1 if progress_callback: progress_callback(f"청크 {i+1}/{len(chunks)} 화자 분리 중...", current_step, total_steps) separated_chunk = self.separate_speakers_with_gemini(chunk) # AI 응답 검증 is_valid, validation_msg = self.validate_ai_response(separated_chunk) if not is_valid: logger.warning(f"청크 {i+1} 화자 분리 검증 실패: {validation_msg}") # 검증 실패한 청크는 원본을 사용하되 기본 태그 추가 separated_chunk = f"[화자1] {chunk}" separated_chunks.append(separated_chunk) # 맞춤법 교정 current_step += 1 if progress_callback: progress_callback(f"청크 {i+1}/{len(chunks)} 맞춤법 교정 중...", current_step, total_steps) corrected_chunk = self.correct_spelling_with_gemini(separated_chunk) corrected_chunks.append(corrected_chunk) # 청크들을 다시 합치기 speaker_separated_text = "\n\n".join(separated_chunks) corrected_text = "\n\n".join(corrected_chunks) # 결과 파싱 및 사용자 정의 이름 적용 corrected_conversations = self.parse_separated_text(corrected_text) original_conversations = self.parse_separated_text(speaker_separated_text) if speaker1_name or speaker2_name: corrected_conversations, corrected_text = self.apply_custom_speaker_names( corrected_conversations, corrected_text, speaker1_name, speaker2_name) original_conversations, speaker_separated_text = self.apply_custom_speaker_names( original_conversations, speaker_separated_text, speaker1_name, speaker2_name) processing_result = { "text_name": text_name, "processed_time": datetime.now().isoformat(), "original_text": full_text, "separated_text": speaker_separated_text, "corrected_text": corrected_text, "conversations_by_speaker_original": original_conversations, "conversations_by_speaker_corrected": corrected_conversations, "speaker1_name": speaker1_name or "화자1", "speaker2_name": speaker2_name or "화자2", "chunks_processed": len(chunks), "success": True } logger.info(f"청킹 처리 완료: {text_name} ({len(chunks)}개 청크)") return processing_result def apply_custom_speaker_names(self, conversations, text, speaker1_name, speaker2_name): """사용자 정의 화자 이름을 적용합니다.""" updated_conversations = {} updated_text = text # 대화 딕셔너리 업데이트 if speaker1_name: updated_conversations[speaker1_name] = conversations.get("화자1", []) updated_text = updated_text.replace("[화자1]", f"[{speaker1_name}]") else: updated_conversations["화자1"] = conversations.get("화자1", []) if speaker2_name: updated_conversations[speaker2_name] = conversations.get("화자2", []) updated_text = updated_text.replace("[화자2]", f"[{speaker2_name}]") else: updated_conversations["화자2"] = conversations.get("화자2", []) return updated_conversations, updated_text def separate_speakers_with_gemini(self, text): """Gemini API를 사용하여 텍스트를 화자별로 분리합니다.""" try: prompt = self.get_prompt("speaker_separation", text=text) response = self.gemini_model.generate_content(prompt) separated_text = response.text.strip() logger.info("Gemini를 통한 화자 분리가 완료되었습니다.") return separated_text except Exception as e: logger.error(f"Gemini 화자 분리 중 오류: {e}") return f"[오류] 화자 분리 실패: {str(e)}" def correct_spelling_with_gemini(self, separated_text): """Gemini API를 사용하여 화자별 분리된 텍스트의 맞춤법을 교정합니다.""" try: prompt = self.get_prompt("spell_correction", text=separated_text) response = self.gemini_model.generate_content(prompt) corrected_text = response.text.strip() logger.info("Gemini를 통한 맞춤법 교정이 완료되었습니다.") return corrected_text except Exception as e: logger.error(f"Gemini 맞춤법 교정 중 오류: {e}") return separated_text # 오류 발생 시 원본 반환 def parse_separated_text(self, separated_text): """화자별로 분리된 텍스트를 파싱하여 구조화합니다.""" conversations = { "화자1": [], "화자2": [] } # 정규표현식으로 화자별 발언 추출 pattern = r'\[화자([12])\]\s*(.+?)(?=\[화자[12]\]|$)' matches = re.findall(pattern, separated_text, re.DOTALL) for speaker_num, content in matches: speaker = f"화자{speaker_num}" content = content.strip() if content: conversations[speaker].append(content) return conversations def create_download_zip(self, result, output_dir="output"): """처리 결과를 ZIP 파일로 생성합니다.""" try: if not self.config.get("output", {}).get("create_download_zip", True): return None base_name = result["text_name"] timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") zip_path = os.path.join(output_dir, f"{base_name}_complete_{timestamp}.zip") with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf: # 전체 대화 저장 all_content = self._generate_complete_text_content(result) zipf.writestr(f"{base_name}_전체대화_{timestamp}.txt", all_content) # 화자별 개별 파일 for speaker, utterances in result['conversations_by_speaker_corrected'].items(): if utterances: speaker_content = self._generate_speaker_content(result, speaker, utterances) zipf.writestr(f"{base_name}_{speaker}_교정본_{timestamp}.txt", speaker_content) # JSON 데이터 json_content = json.dumps(result, ensure_ascii=False, indent=2) zipf.writestr(f"{base_name}_data_{timestamp}.json", json_content) logger.info(f"ZIP 파일 생성 완료: {zip_path}") return zip_path except Exception as e: logger.error(f"ZIP 파일 생성 중 오류: {e}") return None def _generate_complete_text_content(self, result): """전체 대화 텍스트 내용을 생성합니다.""" content = [] content.append(f"파일명: {result['text_name']}") content.append(f"처리 시간: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") content.append(f"화자1: {result.get('speaker1_name', '화자1')}") content.append(f"화자2: {result.get('speaker2_name', '화자2')}") content.append("="*50) content.append("원본 텍스트:") content.append(result['original_text']) content.append("="*50) content.append("화자별 분리 결과 (원본):") content.append(result['separated_text']) content.append("="*50) content.append("화자별 분리 결과 (맞춤법 교정):") content.append(result['corrected_text']) return "\n".join(content) def _generate_speaker_content(self, result, speaker, utterances): """화자별 개별 파일 내용을 생성합니다.""" content = [] content.append(f"파일명: {result['text_name']}") content.append(f"화자: {speaker}") content.append(f"처리 시간: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") content.append(f"발언 수: {len(utterances)}") content.append("="*50) for idx, utterance in enumerate(utterances, 1): content.append(f"{idx}. {utterance}") content.append("") return "\n".join(content) def save_results_to_files(self, result, output_dir="output"): """처리 결과를 파일로 저장합니다.""" if not result.get("success", False): logger.error(f"결과 저장 실패: {result.get('error', 'Unknown error')}") return False try: # output 폴더 생성 if not os.path.exists(output_dir): os.makedirs(output_dir) base_name = result["text_name"] timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") saved_files = [] output_config = self.config.get("output", {}) # 1. 전체 대화 저장 if output_config.get("save_original", True) or output_config.get("save_separated", True) or output_config.get("save_corrected", True): all_txt_path = f"{output_dir}/{base_name}_전체대화_{timestamp}.txt" with open(all_txt_path, 'w', encoding='utf-8') as f: f.write(self._generate_complete_text_content(result)) saved_files.append(all_txt_path) # 2. 화자별 개별 파일 저장 if output_config.get("save_individual_speakers", True): for speaker, utterances in result['conversations_by_speaker_corrected'].items(): if utterances: speaker_txt_path = f"{output_dir}/{base_name}_{speaker}_교정본_{timestamp}.txt" with open(speaker_txt_path, 'w', encoding='utf-8') as f: f.write(self._generate_speaker_content(result, speaker, utterances)) saved_files.append(speaker_txt_path) # 3. JSON 형태로도 저장 if output_config.get("save_json", True): json_path = f"{output_dir}/{base_name}_data_{timestamp}.json" with open(json_path, 'w', encoding='utf-8') as f: json.dump(result, f, ensure_ascii=False, indent=2) saved_files.append(json_path) # 4. ZIP 파일 생성 zip_path = self.create_download_zip(result, output_dir) if zip_path: saved_files.append(zip_path) logger.info(f"결과 파일 저장 완료: {len(saved_files)}개 파일") result["saved_files"] = saved_files return True except Exception as e: logger.error(f"결과 파일 저장 중 오류: {e}") return False