""" | |
๋ฌธ์ ์ฒ๋ฆฌ ์ ํธ๋ฆฌํฐ ๋ชจ๋ | |
""" | |
import os | |
import re | |
import csv | |
import io | |
import logging | |
from typing import List, Dict, Any, Optional, Tuple, Union | |
import numpy as np | |
logger = logging.getLogger("DocProcessor") | |
if not logger.hasHandlers(): | |
handler = logging.StreamHandler() | |
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') | |
handler.setFormatter(formatter) | |
logger.addHandler(handler) | |
logger.setLevel(logging.INFO) | |


class DocumentProcessor:
    """Document processing utility class."""

    @staticmethod
    def split_text(
        text: str,
        chunk_size: int = 512,
        chunk_overlap: int = 50,
        separator: str = "\n"
    ) -> List[str]:
        """
        Split text into smaller chunks.

        Args:
            text: Text to split
            chunk_size: Maximum number of characters per chunk
            chunk_overlap: Number of characters overlapping between chunks
            separator: Separator used for splitting

        Returns:
            List of text chunks
        """
        if not text or chunk_size <= 0:
            return []

        # Split on the separator
        parts = text.split(separator)
        chunks = []
        current_chunk = []
        current_size = 0

        for part in parts:
            part_size = len(part)
            if current_size + part_size + len(current_chunk) > chunk_size and current_chunk:
                # The current chunk would exceed the maximum size, so store it
                chunks.append(separator.join(current_chunk))
                # Keep the tail of the chunk to provide overlap with the next one
                overlap_tokens = []
                overlap_size = 0
                for token in reversed(current_chunk):
                    if overlap_size + len(token) <= chunk_overlap:
                        overlap_tokens.insert(0, token)
                        overlap_size += len(token) + 1  # include separator length
                    else:
                        break
                current_chunk = overlap_tokens
                current_size = overlap_size - len(current_chunk)  # exclude separator length
            current_chunk.append(part)
            current_size += part_size

        # Add the last chunk
        if current_chunk:
            chunks.append(separator.join(current_chunk))
        return chunks

    @staticmethod
    def clean_text(text: str, remove_urls: bool = True, remove_extra_whitespace: bool = True) -> str:
        """
        Clean text.

        Args:
            text: Text to clean
            remove_urls: Whether to remove URLs
            remove_extra_whitespace: Whether to collapse extra whitespace

        Returns:
            Cleaned text
        """
        if not text:
            return ""

        # Remove URLs
        if remove_urls:
            text = re.sub(r'https?://\S+|www\.\S+', '', text)

        # Strip HTML tags
        text = re.sub(r'<.*?>', '', text)

        # Collapse extra whitespace
        if remove_extra_whitespace:
            text = re.sub(r'\s+', ' ', text).strip()
        return text

    @staticmethod
    def text_to_documents(
        text: str,
        metadata: Optional[Dict[str, Any]] = None,
        chunk_size: int = 512,
        chunk_overlap: int = 50
    ) -> List[Dict[str, Any]]:
        """
        Convert text into a list of document objects.

        Args:
            text: Text to convert
            metadata: Metadata to attach to each document
            chunk_size: Maximum number of characters per chunk
            chunk_overlap: Number of characters overlapping between chunks

        Returns:
            List of document objects
        """
        if not text:
            return []

        # Clean the text
        clean = DocumentProcessor.clean_text(text)

        # Split the text into chunks
        chunks = DocumentProcessor.split_text(
            clean,
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap
        )

        # Build document objects
        documents = []
        for i, chunk in enumerate(chunks):
            doc = {
                "text": chunk,
                "index": i,
                "chunk_count": len(chunks)
            }
            # Attach metadata
            if metadata:
                doc.update(metadata)
            documents.append(doc)
        return documents

    @staticmethod
    def load_documents_from_directory(
        directory: str,
        extensions: List[str] = [".txt", ".md", ".csv"],
        recursive: bool = True,
        chunk_size: int = 512,
        chunk_overlap: int = 50
    ) -> List[Dict[str, Any]]:
        """
        Load and process documents from a directory.

        Args:
            directory: Path of the directory to load
            extensions: List of file extensions to process
            recursive: Whether to search subdirectories
            chunk_size: Maximum number of characters per chunk
            chunk_overlap: Number of characters overlapping between chunks

        Returns:
            List of document objects
        """
        if not os.path.isdir(directory):
            logger.error(f"Directory not found: {directory}")
            return []

        documents = []
        for root, dirs, files in os.walk(directory):
            if not recursive and root != directory:
                continue
            for file in files:
                _, ext = os.path.splitext(file)
                if ext.lower() not in extensions:
                    continue

                file_path = os.path.join(root, file)
                rel_path = os.path.relpath(file_path, directory)
                try:
                    logger.info(f"Loading file: {rel_path}")

                    # Try UTF-8 first
                    try:
                        with open(file_path, 'r', encoding='utf-8') as f:
                            content = f.read()
                    except UnicodeDecodeError:
                        # Fall back to CP949 (the default Korean Windows encoding)
                        logger.info(f"UTF-8 decoding failed, retrying with CP949: {rel_path}")
                        with open(file_path, 'r', encoding='cp949') as f:
                            content = f.read()

                    # Build metadata
                    metadata = {
                        "source": rel_path,
                        "filename": file,
                        "filetype": ext.lower()[1:],
                        "filepath": file_path
                    }

                    # CSV files get special row-level handling
                    if ext.lower() == '.csv':
                        logger.info(f"CSV file detected, splitting by row: {rel_path}")
                        file_docs = DocumentProcessor.csv_to_documents(content, metadata)
                    else:
                        # Plain-text document handling
                        file_docs = DocumentProcessor.text_to_documents(
                            content,
                            metadata=metadata,
                            chunk_size=chunk_size,
                            chunk_overlap=chunk_overlap
                        )

                    documents.extend(file_docs)
                    logger.info(f"Extracted {len(file_docs)} chunks: {rel_path}")
                except Exception as e:
                    logger.error(f"Error while processing file '{rel_path}': {e}")
                    continue

        logger.info(f"Loaded {len(documents)} document chunks in total.")
        return documents

    @staticmethod
    def prepare_rag_context(results: List[Dict[str, Any]], field: str = "text") -> List[str]:
        """
        Extract context passages for RAG from search results.

        Args:
            results: List of search results
            field: Name of the field that holds the text content

        Returns:
            List of context strings
        """
        context = []
        for result in results:
            if field in result:
                context.append(result[field])
        return context

    @staticmethod
    def csv_to_documents(content: str, metadata: Dict[str, Any]) -> List[Dict[str, Any]]:
        """
        Split CSV content by row so that each row becomes a separate document.

        Args:
            content: Contents of the CSV file
            metadata: Base metadata

        Returns:
            List of document objects (one per row)
        """
        documents = []
        try:
            # Try regular CSV parsing first (comma-separated by default)
            try:
                csv_reader = csv.reader(io.StringIO(content))
                rows = list(csv_reader)
                if len(rows) > 0 and len(rows[0]) > 1:
                    # The file appears to be properly comma-separated
                    logger.info(f"Processing CSV file with comma separator: {metadata.get('source', 'unknown')}")
                    has_valid_format = True
                else:
                    # Not properly comma-separated
                    has_valid_format = False
            except Exception:
                has_valid_format = False

            # If the file is not comma-separated, fall back to whitespace-separated parsing
            if not has_valid_format:
                logger.warning(f"CSV file is not in standard comma format; processing with whitespace separator: {metadata.get('source', 'unknown')}")
                lines = content.strip().split('\n')
                for i, line in enumerate(lines):
                    # Only lines starting with 'IT' are treated as data rows
                    if not line.strip().startswith('IT'):
                        continue

                    # Split on whitespace into at most 5 columns
                    parts = line.split(maxsplit=4)

                    # Check that the row has the minimum required number of fields
                    if len(parts) < 5:
                        logger.warning(f"Row {i+1} has insufficient data: {line[:50]}...")
                        continue

                    # Extract the individual fields
                    doc_id = parts[0].strip()      # IT number
                    query_type = parts[1].strip()  # query type
                    question = parts[2].strip()    # question
                    answer = parts[3].strip()      # answer
                    reference = parts[4].strip() if len(parts) > 4 else ""  # reference

                    # Build the document text with each field labelled
                    text = f"ID: {doc_id}\n"
                    text += f"Query type: {query_type}\n"
                    text += f"Question: {question}\n"
                    text += f"Answer: {answer}\n"
                    if reference:
                        text += f"Reference/Context: {reference}"

                    # Build the document object
                    doc_metadata = metadata.copy()
                    doc_metadata.update({
                        "row": i,
                        "query_type": query_type,
                        "question": question,
                        "answer": answer,
                        "reference": reference
                    })
                    document = {
                        "text": text,
                        "id": doc_id,  # use the IT number as the document ID
                        **doc_metadata
                    }
                    documents.append(document)
                    logger.debug(f"Processed IT document: {doc_id} - {question[:30]}...")

                logger.info(f"Converted whitespace-separated CSV file '{metadata.get('source', 'unknown')}' into {len(documents)} row documents.")
                return documents

            # Standard CSV handling (comma separator)
            if not rows:
                logger.warning(f"CSV file contains no data: {metadata.get('source', 'unknown')}")
                return []

            # Use the first row as the header
            headers = rows[0]
            logger.debug(f"CSV headers: {headers}")

            # Convert each row into a separate document
            for i, row in enumerate(rows[1:], 1):  # skip the header, start at 1
                # Pad short rows with empty values so they match the header length
                while len(row) < len(headers):
                    row.append("")

                # Convert the row into a dictionary keyed by header
                row_data = {headers[j]: value for j, value in enumerate(row) if j < len(headers)}

                # Use the first column as the ID when available
                row_id = row[0] if row and len(row) > 0 else f"row_{i}"

                # Build the document text including every non-empty field
                text_parts = []
                for j, header in enumerate(headers):
                    if j < len(row) and row[j]:
                        text_parts.append(f"{header}: {row[j]}")
                text = "\n".join(text_parts)

                # Build the document object
                doc_metadata = metadata.copy()
                doc_metadata.update({
                    "row": i,
                    "row_id": row_id,
                    "total_rows": len(rows) - 1,  # excluding the header
                    "csv_data": row_data          # keep the raw row data as well
                })
                document = {
                    "text": text,
                    "id": row_id,
                    **doc_metadata
                }
                documents.append(document)

            logger.info(f"Converted CSV file '{metadata.get('source', 'unknown')}' into {len(documents)} row documents.")
        except Exception as e:
            logger.error(f"Error while processing CSV file: {e}")
        return documents
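

# --- Illustrative usage sketch (not part of the original module) ---
# A minimal example of how DocumentProcessor might be wired together: split an
# in-memory string into chunks, wrap it in document objects, and extract the
# context passages a RAG prompt could consume. The sample text and the
# "inline-example" source label below are made-up placeholders.
if __name__ == "__main__":
    sample = (
        "Retrieval-augmented generation combines search with generation.\n"
        "Documents are cleaned, split into overlapping chunks, and indexed.\n"
        "At query time the top chunks are passed to the model as context."
    )

    # Split the raw text on newlines into chunks of at most ~80 characters
    chunks = DocumentProcessor.split_text(sample, chunk_size=80, chunk_overlap=20)
    for i, chunk in enumerate(chunks):
        print(i, repr(chunk))

    # Wrap the text in document objects with metadata attached. Note that
    # text_to_documents cleans the text first, which collapses newlines, so
    # real-world inputs may need a different separator to actually split.
    docs = DocumentProcessor.text_to_documents(sample, metadata={"source": "inline-example"})

    # Pretend these documents came back from a vector search and extract the
    # plain text passages used to build the RAG context.
    context = DocumentProcessor.prepare_rag_context(docs, field="text")
    print(f"{len(context)} context passage(s) prepared")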