import gradio as gr import os from typing import List, Dict, Any, Optional, Tuple import hashlib from datetime import datetime import numpy as np # PDF 처리 라이브러리 try: import fitz # PyMuPDF PDF_AVAILABLE = True except ImportError: PDF_AVAILABLE = False print("⚠️ PyMuPDF not installed. Install with: pip install pymupdf") try: from sentence_transformers import SentenceTransformer ST_AVAILABLE = True except ImportError: ST_AVAILABLE = False print("⚠️ Sentence Transformers not installed. Install with: pip install sentence-transformers") # Soft and bright custom CSS custom_css = """ .gradio-container { background: linear-gradient(135deg, #f5f7fa 0%, #c3cfe2 100%); min-height: 100vh; font-family: 'Inter', -apple-system, BlinkMacSystemFont, sans-serif; } .main-container { background: rgba(255, 255, 255, 0.98); border-radius: 16px; padding: 24px; box-shadow: 0 4px 6px -1px rgba(0, 0, 0, 0.1), 0 2px 4px -1px rgba(0, 0, 0, 0.06); border: 1px solid rgba(0, 0, 0, 0.05); margin: 12px; } /* Status messages styling */ .pdf-status { padding: 12px 16px; border-radius: 12px; margin: 12px 0; font-size: 0.95rem; font-weight: 500; } .pdf-success { background: linear-gradient(135deg, #d4edda 0%, #c3e6cb 100%); border: 1px solid #b1dfbb; color: #155724; } .pdf-error { background: linear-gradient(135deg, #f8d7da 0%, #f5c6cb 100%); border: 1px solid #f1aeb5; color: #721c24; } .pdf-info { background: linear-gradient(135deg, #d1ecf1 0%, #bee5eb 100%); border: 1px solid #9ec5d8; color: #0c5460; } .rag-context { background: linear-gradient(135deg, #fef3c7 0%, #fde68a 100%); border-left: 4px solid #f59e0b; padding: 12px; margin: 12px 0; border-radius: 8px; font-size: 0.9rem; } """ class SimpleTextSplitter: """텍스트 분할기""" def __init__(self, chunk_size=800, chunk_overlap=100): self.chunk_size = chunk_size self.chunk_overlap = chunk_overlap def split_text(self, text: str) -> List[str]: """텍스트를 청크로 분할""" chunks = [] sentences = text.split('. ') current_chunk = "" for sentence in sentences: if len(current_chunk) + len(sentence) < self.chunk_size: current_chunk += sentence + ". " else: if current_chunk: chunks.append(current_chunk.strip()) current_chunk = sentence + ". " if current_chunk: chunks.append(current_chunk.strip()) return chunks class PDFRAGSystem: """PDF 기반 RAG 시스템""" def __init__(self): self.documents = {} self.document_chunks = {} self.embeddings_store = {} self.text_splitter = SimpleTextSplitter(chunk_size=800, chunk_overlap=100) # 임베딩 모델 초기화 self.embedder = None if ST_AVAILABLE: try: self.embedder = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2') print("✅ 임베딩 모델 로드 성공") except Exception as e: print(f"⚠️ 임베딩 모델 로드 실패: {e}") def extract_text_from_pdf(self, pdf_path: str) -> Dict[str, Any]: """PDF에서 텍스트 추출""" if not PDF_AVAILABLE: return { "metadata": { "title": "PDF Reader Not Available", "file_name": os.path.basename(pdf_path), "pages": 0 }, "full_text": "PDF 처리를 위해 'pip install pymupdf'를 실행해주세요." } try: doc = fitz.open(pdf_path) text_content = [] metadata = { "title": doc.metadata.get("title", os.path.basename(pdf_path)), "pages": len(doc), "file_name": os.path.basename(pdf_path) } for page_num, page in enumerate(doc): text = page.get_text() if text.strip(): text_content.append(text) doc.close() return { "metadata": metadata, "full_text": "\n\n".join(text_content) } except Exception as e: raise Exception(f"PDF 처리 오류: {str(e)}") def process_and_store_pdf(self, pdf_path: str, doc_id: str) -> Dict[str, Any]: """PDF 처리 및 저장""" try: # PDF 텍스트 추출 pdf_data = self.extract_text_from_pdf(pdf_path) # 텍스트를 청크로 분할 chunks = self.text_splitter.split_text(pdf_data["full_text"]) # 청크 저장 self.document_chunks[doc_id] = chunks # 임베딩 생성 if self.embedder: embeddings = self.embedder.encode(chunks) self.embeddings_store[doc_id] = embeddings # 문서 정보 저장 self.documents[doc_id] = { "metadata": pdf_data["metadata"], "chunk_count": len(chunks), "upload_time": datetime.now().isoformat() } return { "success": True, "doc_id": doc_id, "chunks": len(chunks), "pages": pdf_data["metadata"]["pages"], "title": pdf_data["metadata"]["title"] } except Exception as e: return {"success": False, "error": str(e)} def search_relevant_chunks(self, query: str, doc_ids: List[str], top_k: int = 3) -> List[Dict]: """관련 청크 검색""" all_relevant_chunks = [] if self.embedder and self.embeddings_store: # 임베딩 기반 검색 query_embedding = self.embedder.encode([query])[0] for doc_id in doc_ids: if doc_id in self.embeddings_store and doc_id in self.document_chunks: doc_embeddings = self.embeddings_store[doc_id] chunks = self.document_chunks[doc_id] # 코사인 유사도 계산 similarities = [] for emb in doc_embeddings: sim = np.dot(query_embedding, emb) / (np.linalg.norm(query_embedding) * np.linalg.norm(emb)) similarities.append(sim) # 상위 청크 선택 top_indices = np.argsort(similarities)[-top_k:][::-1] for idx in top_indices: if similarities[idx] > 0.2: all_relevant_chunks.append({ "content": chunks[idx], "doc_name": self.documents[doc_id]["metadata"]["file_name"], "similarity": similarities[idx] }) else: # 키워드 기반 검색 query_keywords = set(query.lower().split()) for doc_id in doc_ids: if doc_id in self.document_chunks: chunks = self.document_chunks[doc_id] for i, chunk in enumerate(chunks[:5]): # 처음 5개만 chunk_lower = chunk.lower() score = sum(1 for keyword in query_keywords if keyword in chunk_lower) if score > 0: all_relevant_chunks.append({ "content": chunk[:500], "doc_name": self.documents[doc_id]["metadata"]["file_name"], "similarity": score / len(query_keywords) if query_keywords else 0 }) # 정렬 및 반환 all_relevant_chunks.sort(key=lambda x: x.get('similarity', 0), reverse=True) return all_relevant_chunks[:top_k] def create_rag_prompt(self, query: str, doc_ids: List[str], top_k: int = 3) -> str: """RAG 프롬프트 생성""" relevant_chunks = self.search_relevant_chunks(query, doc_ids, top_k) if not relevant_chunks: return query # 프롬프트 구성 prompt_parts = [] prompt_parts.append("다음 문서 내용을 참고하여 답변해주세요:\n") prompt_parts.append("=" * 40) for i, chunk in enumerate(relevant_chunks, 1): prompt_parts.append(f"\n[참고 {i} - {chunk['doc_name']}]") content = chunk['content'][:300] if len(chunk['content']) > 300 else chunk['content'] prompt_parts.append(content) prompt_parts.append("\n" + "=" * 40) prompt_parts.append(f"\n질문: {query}") return "\n".join(prompt_parts) # RAG 시스템 인스턴스 생성 rag_system = PDFRAGSystem() # State variable to track current model and RAG settings current_model = gr.State("openai/gpt-oss-120b") rag_enabled_state = gr.State(False) selected_docs_state = gr.State([]) top_k_state = gr.State(3) def upload_pdf(file): """PDF 파일 업로드 처리""" if file is None: return ( gr.update(value="