""" 문서 처리 유틸리티 모듈 """ import os import re import csv import io import logging from typing import List, Dict, Any, Optional, Tuple, Union import numpy as np logger = logging.getLogger("DocProcessor") if not logger.hasHandlers(): handler = logging.StreamHandler() formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') handler.setFormatter(formatter) logger.addHandler(handler) logger.setLevel(logging.INFO) class DocumentProcessor: """문서 처리 유틸리티 클래스""" @staticmethod def split_text( text: str, chunk_size: int = 512, chunk_overlap: int = 50, separator: str = "\n" ) -> List[str]: """ 텍스트를 더 작은 청크로 분할 Args: text: 분할할 텍스트 chunk_size: 각 청크의 최대 문자 수 chunk_overlap: 청크 간 중첩되는 문자 수 separator: 분할 시 사용할 구분자 Returns: 분할된 텍스트 청크 목록 """ if not text or chunk_size <= 0: return [] # 구분자로 분할 parts = text.split(separator) chunks = [] current_chunk = [] current_size = 0 for part in parts: part_size = len(part) if current_size + part_size + len(current_chunk) > chunk_size and current_chunk: # 현재 청크가 최대 크기를 초과하면 저장 chunks.append(separator.join(current_chunk)) # 중첩을 위해 일부 청크 유지 overlap_tokens = [] overlap_size = 0 for token in reversed(current_chunk): if overlap_size + len(token) <= chunk_overlap: overlap_tokens.insert(0, token) overlap_size += len(token) + 1 # separator 길이 포함 else: break current_chunk = overlap_tokens current_size = overlap_size - len(current_chunk) # separator 길이 제외 current_chunk.append(part) current_size += part_size # 마지막 청크 추가 if current_chunk: chunks.append(separator.join(current_chunk)) return chunks @staticmethod def clean_text(text: str, remove_urls: bool = True, remove_extra_whitespace: bool = True) -> str: """ 텍스트 정제 Args: text: 정제할 텍스트 remove_urls: URL 제거 여부 remove_extra_whitespace: 여분의 공백 제거 여부 Returns: 정제된 텍스트 """ if not text: return "" # URL 제거 if remove_urls: text = re.sub(r'https?://\S+|www\.\S+', '', text) # 특수 문자 및 HTML 태그 정제 text = re.sub(r'<.*?>', '', text) # HTML 태그 제거 # 여분의 공백 제거 if remove_extra_whitespace: text = re.sub(r'\s+', ' ', text).strip() return text @staticmethod def text_to_documents( text: str, metadata: Optional[Dict[str, Any]] = None, chunk_size: int = 512, chunk_overlap: int = 50 ) -> List[Dict[str, Any]]: """ 텍스트를 문서 객체 목록으로 변환 Args: text: 변환할 텍스트 metadata: 문서에 추가할 메타데이터 chunk_size: 각 청크의 최대 문자 수 chunk_overlap: 청크 간 중첩되는 문자 수 Returns: 문서 객체 목록 """ if not text: return [] # 텍스트 정제 clean = DocumentProcessor.clean_text(text) # 텍스트 분할 chunks = DocumentProcessor.split_text( clean, chunk_size=chunk_size, chunk_overlap=chunk_overlap ) # 문서 객체 생성 documents = [] for i, chunk in enumerate(chunks): doc = { "text": chunk, "index": i, "chunk_count": len(chunks) } # 메타데이터 추가 if metadata: doc.update(metadata) documents.append(doc) return documents @staticmethod def load_documents_from_directory( directory: str, extensions: List[str] = [".txt", ".md", ".csv"], recursive: bool = True, chunk_size: int = 512, chunk_overlap: int = 50 ) -> List[Dict[str, Any]]: """ 디렉토리에서 문서 로드 및 처리 Args: directory: 로드할 디렉토리 경로 extensions: 처리할 파일 확장자 목록 recursive: 하위 디렉토리 검색 여부 chunk_size: 각 청크의 최대 문자 수 chunk_overlap: 청크 간 중첩되는 문자 수 Returns: 문서 객체 목록 """ if not os.path.isdir(directory): logger.error(f"디렉토리를 찾을 수 없습니다: {directory}") return [] documents = [] for root, dirs, files in os.walk(directory): if not recursive and root != directory: continue for file in files: _, ext = os.path.splitext(file) if ext.lower() not in extensions: continue file_path = os.path.join(root, file) rel_path = os.path.relpath(file_path, directory) try: logger.info(f"파일 로드 중: {rel_path}") # 먼저 UTF-8로 시도 try: with open(file_path, 'r', encoding='utf-8') as f: content = f.read() except UnicodeDecodeError: # UTF-8로 실패하면 CP949(한국어 Windows 기본 인코딩)로 시도 logger.info(f"UTF-8 디코딩 실패, CP949로 시도: {rel_path}") with open(file_path, 'r', encoding='cp949') as f: content = f.read() # 메타데이터 생성 metadata = { "source": rel_path, "filename": file, "filetype": ext.lower()[1:], "filepath": file_path } # CSV 파일은 특별 처리 if ext.lower() == '.csv': logger.info(f"CSV 파일 감지, 행 단위로 분할 처리: {rel_path}") file_docs = DocumentProcessor.csv_to_documents(content, metadata) else: # 일반 텍스트 문서 처리 file_docs = DocumentProcessor.text_to_documents( content, metadata=metadata, chunk_size=chunk_size, chunk_overlap=chunk_overlap ) documents.extend(file_docs) logger.info(f"{len(file_docs)}개 청크 추출: {rel_path}") except Exception as e: logger.error(f"파일 '{rel_path}' 처리 중 오류 발생: {e}") continue logger.info(f"총 {len(documents)}개 문서 청크를 로드했습니다.") return documents @staticmethod def prepare_rag_context(results: List[Dict[str, Any]], field: str = "text") -> List[str]: """ 검색 결과에서 RAG에 사용할 컨텍스트 추출 Args: results: 검색 결과 목록 field: 텍스트 내용이 있는 필드 이름 Returns: 컨텍스트 텍스트 목록 """ context = [] for result in results: if field in result: context.append(result[field]) return context @staticmethod def csv_to_documents(content: str, metadata: Dict[str, Any]) -> List[Dict[str, Any]]: """ CSV 파일 내용을 행 단위로 분리하여 각 행을 별도의 문서로 처리 Args: content: CSV 파일의 내용 metadata: 기본 메타데이터 Returns: 문서 객체 목록 (각 행이 별도의 문서) """ documents = [] try: # 일반 CSV 파싱 시도 (코마 구분자 기본) try: csv_reader = csv.reader(io.StringIO(content)) rows = list(csv_reader) if len(rows) > 0 and len(rows[0]) > 1: # 코마로 제대로 구분되었다고 판단 logger.info(f"CSV 파일 코마 구분자로 처리: {metadata.get('source', 'unknown')}") has_valid_format = True else: # 코마로 제대로 구분되지 않음 has_valid_format = False except Exception: has_valid_format = False # 코마 형식이 아닌 경우, 공백 구분자 처리 시도 if not has_valid_format: logger.warning(f"CSV 파일이 표준 코마 형식이 아닙니다. 공백 구분자로 처리하겠습니다: {metadata.get('source', 'unknown')}") lines = content.strip().split('\n') for i, line in enumerate(lines): # IT로 시작하는 줄만 처리 (데이터 행으로 간주) if not line.strip().startswith('IT'): continue # 공백으로 분리하되, 최소 5개 열로 보장 parts = line.split(maxsplit=4) # 유효한 행의 최소 길이 확인 if len(parts) < 5: logger.warning(f"행 {i+1} 부족한 데이터: {line[:50]}...") continue # 각 필드 추출 doc_id = parts[0].strip() # IT 번호 query_type = parts[1].strip() # 쿼리 유형 question = parts[2].strip() # 질문 answer = parts[3].strip() # 답변 reference = parts[4].strip() if len(parts) > 4 else "" # 참조 # 문서 텍스트 생성 - 각 필드를 구분하여 포함 text = f"ID: {doc_id}\n" text += f"쿼리 유형: {query_type}\n" text += f"질의 (Question): {question}\n" text += f"응답 (Answer): {answer}\n" if reference: text += f"참조 문서/맥락 (Reference/Context): {reference}" # 문서 객체 생성 doc_metadata = metadata.copy() doc_metadata.update({ "row": i, "query_type": query_type, "question": question, "answer": answer, "reference": reference }) document = { "text": text, "id": doc_id, # IT 번호를 ID로 사용 **doc_metadata } documents.append(document) logger.debug(f"IT 문서 처리: {doc_id} - {question[:30]}...") logger.info(f"공백 구분자 CSV 파일 '{metadata.get('source', 'unknown')}'에서 {len(documents)}개 행을 문서로 변환했습니다.") return documents # 표준 CSV 형식 처리 (코마 구분자 사용) if not rows: logger.warning(f"CSV 파일에 데이터가 없습니다: {metadata.get('source', 'unknown')}") return [] # 첫 번째 행을 헤더로 사용 headers = rows[0] logger.debug(f"CSV 헤더: {headers}") # 각 행을 별도의 문서로 변환 for i, row in enumerate(rows[1:], 1): # 헤더 제외, 1부터 시작 # 행이 헤더보다 짧으면 빈 값으로 채움 while len(row) < len(headers): row.append("") # 행 데이터를 사전형으로 변환 row_data = {headers[j]: value for j, value in enumerate(row) if j < len(headers)} # 첫 번째 열을 ID로 사용 (있는 경우) row_id = row[0] if row and len(row) > 0 else f"row_{i}" # 문서 텍스트 생성 - 모든 필드를 포함한 표현 text_parts = [] for j, header in enumerate(headers): if j < len(row) and row[j]: text_parts.append(f"{header}: {row[j]}") text = "\n".join(text_parts) # 문서 객체 생성 doc_metadata = metadata.copy() doc_metadata.update({ "row": i, "row_id": row_id, "total_rows": len(rows) - 1, # 헤더 제외 "csv_data": row_data # 원본 행 데이터도 저장 }) document = { "text": text, "id": row_id, **doc_metadata } documents.append(document) logger.info(f"CSV 파일 '{metadata.get('source', 'unknown')}'에서 {len(documents)}개 행을 문서로 변환했습니다.") except Exception as e: logger.error(f"CSV 파일 처리 중 오류 발생: {e}") return documents