"""
Document processing utility module.
"""
import os
import re
import csv
import io
import logging
from typing import List, Dict, Any, Optional
logger = logging.getLogger("DocProcessor")
if not logger.hasHandlers():
handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(logging.INFO)
class DocumentProcessor:
"""๋ฌธ์„œ ์ฒ˜๋ฆฌ ์œ ํ‹ธ๋ฆฌํ‹ฐ ํด๋ž˜์Šค"""
@staticmethod
def split_text(
text: str,
chunk_size: int = 512,
chunk_overlap: int = 50,
separator: str = "\n"
) -> List[str]:
"""
        Split text into smaller chunks.

        Args:
            text: Text to split
            chunk_size: Maximum number of characters per chunk
            chunk_overlap: Number of characters that overlap between chunks
            separator: Separator to use when splitting

        Returns:
            List of text chunks
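
        Example (minimal usage sketch; `raw_text` is a placeholder, not defined here):
            chunks = DocumentProcessor.split_text(raw_text, chunk_size=512, chunk_overlap=50)
            # Each chunk stays close to chunk_size characters (a single part longer than
            # chunk_size may exceed it), and roughly chunk_overlap characters are repeated
            # from the end of the previous chunk.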
"""
if not text or chunk_size <= 0:
return []
        # Split on the separator
parts = text.split(separator)
chunks = []
current_chunk = []
current_size = 0
for part in parts:
part_size = len(part)
            if current_chunk and current_size + part_size + len(separator) * len(current_chunk) > chunk_size:
                # Flush the current chunk once adding this part would push it past the size limit
chunks.append(separator.join(current_chunk))
                # Keep a tail of the current chunk for overlap
overlap_tokens = []
overlap_size = 0
for token in reversed(current_chunk):
if overlap_size + len(token) <= chunk_overlap:
overlap_tokens.insert(0, token)
                        overlap_size += len(token) + len(separator)  # include the separator length
else:
break
current_chunk = overlap_tokens
                current_size = overlap_size - len(current_chunk) * len(separator)  # exclude separator lengths
current_chunk.append(part)
current_size += part_size
        # Append the final chunk
if current_chunk:
chunks.append(separator.join(current_chunk))
return chunks
@staticmethod
def clean_text(text: str, remove_urls: bool = True, remove_extra_whitespace: bool = True) -> str:
"""
        Clean text.

        Args:
            text: Text to clean
            remove_urls: Whether to remove URLs
            remove_extra_whitespace: Whether to collapse extra whitespace

        Returns:
            Cleaned text
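
        Example (illustrative; the input string is made up):
            cleaned = DocumentProcessor.clean_text("<p>See https://example.com  now</p>")
            # -> "See now" (URL stripped, HTML tags dropped, whitespace collapsed)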
"""
if not text:
return ""
        # Remove URLs
if remove_urls:
text = re.sub(r'https?://\S+|www\.\S+', '', text)
        # Remove HTML tags
        text = re.sub(r'<.*?>', '', text)
        # Collapse extra whitespace
if remove_extra_whitespace:
text = re.sub(r'\s+', ' ', text).strip()
return text
@staticmethod
def text_to_documents(
text: str,
metadata: Optional[Dict[str, Any]] = None,
chunk_size: int = 512,
chunk_overlap: int = 50
) -> List[Dict[str, Any]]:
"""
        Convert text into a list of document objects.

        Args:
            text: Text to convert
            metadata: Metadata to attach to each document
            chunk_size: Maximum number of characters per chunk
            chunk_overlap: Number of characters that overlap between chunks

        Returns:
            List of document objects
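
        Example (minimal sketch; the metadata values are assumptions):
            docs = DocumentProcessor.text_to_documents(raw_text, metadata={"source": "notes.txt"})
            # Each item looks like {"text": ..., "index": i, "chunk_count": N, "source": "notes.txt"}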
"""
if not text:
return []
        # Clean the text
        clean = DocumentProcessor.clean_text(text)
        # Split the text into chunks
chunks = DocumentProcessor.split_text(
clean,
chunk_size=chunk_size,
chunk_overlap=chunk_overlap
)
        # Build document objects
documents = []
for i, chunk in enumerate(chunks):
doc = {
"text": chunk,
"index": i,
"chunk_count": len(chunks)
}
            # Attach metadata
if metadata:
doc.update(metadata)
documents.append(doc)
return documents
@staticmethod
def load_documents_from_directory(
directory: str,
extensions: List[str] = [".txt", ".md", ".csv"],
recursive: bool = True,
chunk_size: int = 512,
chunk_overlap: int = 50
) -> List[Dict[str, Any]]:
"""
        Load and process documents from a directory.

        Args:
            directory: Path of the directory to load from
            extensions: List of file extensions to process
            recursive: Whether to search subdirectories
            chunk_size: Maximum number of characters per chunk
            chunk_overlap: Number of characters that overlap between chunks

        Returns:
            List of document objects
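
        Example (illustrative; "./data" is a hypothetical path):
            docs = DocumentProcessor.load_documents_from_directory("./data", extensions=[".md", ".csv"])
            # Text files are chunked; CSV files are converted row by row (see csv_to_documents).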
"""
if not os.path.isdir(directory):
            logger.error(f"Directory not found: {directory}")
return []
documents = []
for root, dirs, files in os.walk(directory):
if not recursive and root != directory:
continue
for file in files:
_, ext = os.path.splitext(file)
if ext.lower() not in extensions:
continue
file_path = os.path.join(root, file)
rel_path = os.path.relpath(file_path, directory)
try:
                    logger.info(f"Loading file: {rel_path}")
                    # Try UTF-8 first
try:
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
except UnicodeDecodeError:
                        # If UTF-8 fails, fall back to CP949 (the default Korean Windows encoding)
                        logger.info(f"UTF-8 decoding failed, retrying with CP949: {rel_path}")
with open(file_path, 'r', encoding='cp949') as f:
content = f.read()
                    # Build metadata
metadata = {
"source": rel_path,
"filename": file,
"filetype": ext.lower()[1:],
"filepath": file_path
}
                    # CSV files get special row-level handling
                    if ext.lower() == '.csv':
                        logger.info(f"CSV file detected, splitting by row: {rel_path}")
file_docs = DocumentProcessor.csv_to_documents(content, metadata)
else:
                        # Process as a plain text document
file_docs = DocumentProcessor.text_to_documents(
content,
metadata=metadata,
chunk_size=chunk_size,
chunk_overlap=chunk_overlap
)
documents.extend(file_docs)
                    logger.info(f"Extracted {len(file_docs)} chunks: {rel_path}")
except Exception as e:
                    logger.error(f"Error while processing file '{rel_path}': {e}")
continue
        logger.info(f"Loaded a total of {len(documents)} document chunks.")
return documents
@staticmethod
def prepare_rag_context(results: List[Dict[str, Any]], field: str = "text") -> List[str]:
"""
        Extract context passages for RAG from search results.

        Args:
            results: List of search results
            field: Name of the field that holds the text content

        Returns:
            List of context strings
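
        Example (minimal sketch; the result dicts are made up):
            hits = [{"text": "Seoul is the capital.", "score": 0.91}, {"title": "no text field"}]
            DocumentProcessor.prepare_rag_context(hits)
            # -> ["Seoul is the capital."]  (entries without the field are skipped)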
"""
context = []
for result in results:
if field in result:
context.append(result[field])
return context
@staticmethod
def csv_to_documents(content: str, metadata: Dict[str, Any]) -> List[Dict[str, Any]]:
"""
        Split CSV file content by row and turn each row into a separate document.

        Args:
            content: Raw CSV file content
            metadata: Base metadata copied into every document

        Returns:
            List of document objects (one per row)
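
        Example (minimal sketch; the file name is hypothetical):
            docs = DocumentProcessor.csv_to_documents(csv_content, {"source": "faq.csv"})
            # Standard comma-separated content yields one document per data row, with the
            # first row treated as the header; non-comma content falls back to the
            # whitespace-delimited 'IT ...' row format handled below.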
"""
documents = []
try:
            # Try standard CSV parsing first (comma delimiter by default)
try:
csv_reader = csv.reader(io.StringIO(content))
rows = list(csv_reader)
if len(rows) > 0 and len(rows[0]) > 1:
                    # Looks properly comma-delimited
                    logger.info(f"Processing CSV file as comma-delimited: {metadata.get('source', 'unknown')}")
has_valid_format = True
else:
                    # Not properly comma-delimited
has_valid_format = False
except Exception:
has_valid_format = False
            # If not comma-delimited, fall back to whitespace-delimited parsing
if not has_valid_format:
                logger.warning(f"CSV file is not in standard comma-delimited format; falling back to whitespace delimiters: {metadata.get('source', 'unknown')}")
lines = content.strip().split('\n')
for i, line in enumerate(lines):
                    # Only process lines starting with 'IT' (treated as data rows)
if not line.strip().startswith('IT'):
continue
                    # Split on whitespace into at most 5 columns
parts = line.split(maxsplit=4)
                    # Check that the row has the minimum number of fields
if len(parts) < 5:
                        logger.warning(f"Row {i+1} has too few fields: {line[:50]}...")
continue
                    # Extract each field
                    doc_id = parts[0].strip()  # IT number
                    query_type = parts[1].strip()  # query type
                    question = parts[2].strip()  # question
                    answer = parts[3].strip()  # answer
                    reference = parts[4].strip() if len(parts) > 4 else ""  # reference (optional)
                    # Build the document text with each field on its own labeled line
                    text = f"ID: {doc_id}\n"
                    text += f"Query type: {query_type}\n"
                    text += f"Question: {question}\n"
                    text += f"Answer: {answer}\n"
                    if reference:
                        text += f"Reference/Context: {reference}"
                    # Build the document object
doc_metadata = metadata.copy()
doc_metadata.update({
"row": i,
"query_type": query_type,
"question": question,
"answer": answer,
"reference": reference
})
document = {
"text": text,
"id": doc_id, # IT ๋ฒˆํ˜ธ๋ฅผ ID๋กœ ์‚ฌ์šฉ
**doc_metadata
}
documents.append(document)
                    logger.debug(f"Processed IT document: {doc_id} - {question[:30]}...")
                logger.info(f"Converted {len(documents)} rows to documents from whitespace-delimited CSV file '{metadata.get('source', 'unknown')}'.")
return documents
            # Standard CSV handling (comma-delimited)
if not rows:
                logger.warning(f"CSV file contains no data: {metadata.get('source', 'unknown')}")
return []
            # Use the first row as the header
            headers = rows[0]
            logger.debug(f"CSV headers: {headers}")
            # Convert each row into a separate document
            for i, row in enumerate(rows[1:], 1):  # skip the header row; start counting at 1
                # Pad rows shorter than the header with empty values
while len(row) < len(headers):
row.append("")
                # Convert the row data into a dict keyed by header
row_data = {headers[j]: value for j, value in enumerate(row) if j < len(headers)}
                # Use the first column as the ID when it is non-empty
                row_id = row[0] if row and row[0] else f"row_{i}"
                # Build the document text from every non-empty field
text_parts = []
for j, header in enumerate(headers):
if j < len(row) and row[j]:
text_parts.append(f"{header}: {row[j]}")
text = "\n".join(text_parts)
                # Build the document object
doc_metadata = metadata.copy()
doc_metadata.update({
"row": i,
"row_id": row_id,
"total_rows": len(rows) - 1, # ํ—ค๋” ์ œ์™ธ
"csv_data": row_data # ์›๋ณธ ํ–‰ ๋ฐ์ดํ„ฐ๋„ ์ €์žฅ
})
document = {
"text": text,
"id": row_id,
**doc_metadata
}
documents.append(document)
            logger.info(f"Converted {len(documents)} rows to documents from CSV file '{metadata.get('source', 'unknown')}'.")
except Exception as e:
            logger.error(f"Error while processing CSV file: {e}")
return documents
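

# Minimal self-test sketch (illustrative only): the sample string and the "./docs"
# directory below are assumptions, not part of the original module.
if __name__ == "__main__":
    sample = "RAG systems retrieve supporting passages. See https://example.com for details."
    cleaned = DocumentProcessor.clean_text(sample)
    docs = DocumentProcessor.text_to_documents(cleaned, metadata={"source": "inline-sample"})
    logger.info(f"Built {len(docs)} chunk(s) from the inline sample.")
    # Directory loading fails gracefully (with an error log) if "./docs" does not exist.
    dir_docs = DocumentProcessor.load_documents_from_directory("./docs")
    logger.info(f"Loaded {len(dir_docs)} chunk(s) from ./docs.")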