import gradio as gr
import os
from typing import List, Dict, Any, Optional, Tuple
import hashlib
from datetime import datetime
import numpy as np

# PDF processing library
try:
    import fitz  # PyMuPDF
    PDF_AVAILABLE = True
except ImportError:
    PDF_AVAILABLE = False
    print("⚠️ PyMuPDF not installed. Install with: pip install pymupdf")

try:
    from sentence_transformers import SentenceTransformer
    ST_AVAILABLE = True
except ImportError:
    ST_AVAILABLE = False
    print("⚠️ Sentence Transformers not installed. Install with: pip install sentence-transformers")

# Soft and bright custom CSS
custom_css = """
.gradio-container {
    background: linear-gradient(135deg, #f5f7fa 0%, #c3cfe2 100%);
    min-height: 100vh;
    font-family: 'Inter', -apple-system, BlinkMacSystemFont, sans-serif;
}
.main-container {
    background: rgba(255, 255, 255, 0.98);
    border-radius: 16px;
    padding: 24px;
    box-shadow: 0 4px 6px -1px rgba(0, 0, 0, 0.1), 0 2px 4px -1px rgba(0, 0, 0, 0.06);
    border: 1px solid rgba(0, 0, 0, 0.05);
    margin: 12px;
}
/* Status messages styling */
.pdf-status {
    padding: 12px 16px;
    border-radius: 12px;
    margin: 12px 0;
    font-size: 0.95rem;
    font-weight: 500;
}
.pdf-success {
    background: linear-gradient(135deg, #d4edda 0%, #c3e6cb 100%);
    border: 1px solid #b1dfbb;
    color: #155724;
}
.pdf-error {
    background: linear-gradient(135deg, #f8d7da 0%, #f5c6cb 100%);
    border: 1px solid #f1aeb5;
    color: #721c24;
}
.pdf-info {
    background: linear-gradient(135deg, #d1ecf1 0%, #bee5eb 100%);
    border: 1px solid #9ec5d8;
    color: #0c5460;
}
.rag-context {
    background: linear-gradient(135deg, #fef3c7 0%, #fde68a 100%);
    border-left: 4px solid #f59e0b;
    padding: 12px;
    margin: 12px 0;
    border-radius: 8px;
    font-size: 0.9rem;
}
"""


class SimpleTextSplitter:
    """Sentence-based text splitter."""

    def __init__(self, chunk_size=800, chunk_overlap=100):
        self.chunk_size = chunk_size
        # Note: chunk_overlap is accepted for API compatibility, but this
        # simple sentence-based splitter does not currently apply it.
        self.chunk_overlap = chunk_overlap

    def split_text(self, text: str) -> List[str]:
        """Split text into chunks of roughly chunk_size characters."""
        chunks = []
        sentences = text.split('. ')
        current_chunk = ""
        for sentence in sentences:
            if len(current_chunk) + len(sentence) < self.chunk_size:
                current_chunk += sentence + ". "
            else:
                if current_chunk:
                    chunks.append(current_chunk.strip())
                current_chunk = sentence + ". "
        if current_chunk:
            chunks.append(current_chunk.strip())
        return chunks
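
# Quick illustration (values are examples, not part of the app): the splitter
# packs whole sentences greedily until the next sentence would push the chunk
# past chunk_size, so chunks end on sentence boundaries.
#
#   splitter = SimpleTextSplitter(chunk_size=80)
#   splitter.split_text("One long document. Split into sentences. Packed greedily.")
#   # -> a list of strings, each staying near or under 80 characters
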
class PDFRAGSystem:
    """PDF-based RAG system."""

    def __init__(self):
        self.documents = {}
        self.document_chunks = {}
        self.embeddings_store = {}
        self.text_splitter = SimpleTextSplitter(chunk_size=800, chunk_overlap=100)

        # Initialize the embedding model
        self.embedder = None
        if ST_AVAILABLE:
            try:
                self.embedder = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')
                print("✅ Embedding model loaded")
            except Exception as e:
                print(f"⚠️ Failed to load embedding model: {e}")

    def extract_text_from_pdf(self, pdf_path: str) -> Dict[str, Any]:
        """Extract text from a PDF."""
        if not PDF_AVAILABLE:
            return {
                "metadata": {
                    "title": "PDF Reader Not Available",
                    "file_name": os.path.basename(pdf_path),
                    "pages": 0
                },
                "full_text": "Please run 'pip install pymupdf' to enable PDF processing."
            }

        try:
            doc = fitz.open(pdf_path)
            text_content = []
            metadata = {
                # PyMuPDF may return an empty title, so fall back to the file name
                "title": doc.metadata.get("title") or os.path.basename(pdf_path),
                "pages": len(doc),
                "file_name": os.path.basename(pdf_path)
            }
            for page_num, page in enumerate(doc):
                text = page.get_text()
                if text.strip():
                    text_content.append(text)
            doc.close()
            return {
                "metadata": metadata,
                "full_text": "\n\n".join(text_content)
            }
        except Exception as e:
            raise Exception(f"PDF processing error: {str(e)}")

    def process_and_store_pdf(self, pdf_path: str, doc_id: str) -> Dict[str, Any]:
        """Process a PDF and store its chunks and embeddings."""
        try:
            # Extract the PDF text
            pdf_data = self.extract_text_from_pdf(pdf_path)

            # Split the text into chunks
            chunks = self.text_splitter.split_text(pdf_data["full_text"])

            # Store the chunks
            self.document_chunks[doc_id] = chunks

            # Create embeddings
            if self.embedder:
                embeddings = self.embedder.encode(chunks)
                self.embeddings_store[doc_id] = embeddings

            # Store document info
            self.documents[doc_id] = {
                "metadata": pdf_data["metadata"],
                "chunk_count": len(chunks),
                "upload_time": datetime.now().isoformat()
            }

            return {
                "success": True,
                "doc_id": doc_id,
                "chunks": len(chunks),
                "pages": pdf_data["metadata"]["pages"],
                "title": pdf_data["metadata"]["title"]
            }
        except Exception as e:
            return {"success": False, "error": str(e)}

    def search_relevant_chunks(self, query: str, doc_ids: List[str], top_k: int = 3) -> List[Dict]:
        """Search for chunks relevant to the query."""
        all_relevant_chunks = []

        if self.embedder and self.embeddings_store:
            # Embedding-based search
            query_embedding = self.embedder.encode([query])[0]
            for doc_id in doc_ids:
                if doc_id in self.embeddings_store and doc_id in self.document_chunks:
                    doc_embeddings = self.embeddings_store[doc_id]
                    chunks = self.document_chunks[doc_id]

                    # Compute cosine similarity against each chunk
                    similarities = []
                    for emb in doc_embeddings:
                        sim = np.dot(query_embedding, emb) / (
                            np.linalg.norm(query_embedding) * np.linalg.norm(emb)
                        )
                        similarities.append(sim)

                    # Select the top chunks above a minimum similarity
                    top_indices = np.argsort(similarities)[-top_k:][::-1]
                    for idx in top_indices:
                        if similarities[idx] > 0.2:
                            all_relevant_chunks.append({
                                "content": chunks[idx],
                                "doc_name": self.documents[doc_id]["metadata"]["file_name"],
                                "similarity": similarities[idx]
                            })
        else:
            # Keyword-based fallback search
            query_keywords = set(query.lower().split())
            for doc_id in doc_ids:
                if doc_id in self.document_chunks:
                    chunks = self.document_chunks[doc_id]
                    for chunk in chunks[:5]:  # check only the first 5 chunks
                        chunk_lower = chunk.lower()
                        score = sum(1 for keyword in query_keywords if keyword in chunk_lower)
                        if score > 0:
                            all_relevant_chunks.append({
                                "content": chunk[:500],
                                "doc_name": self.documents[doc_id]["metadata"]["file_name"],
                                "similarity": score / len(query_keywords) if query_keywords else 0
                            })

        # Sort by similarity and return the top results
        all_relevant_chunks.sort(key=lambda x: x.get('similarity', 0), reverse=True)
        return all_relevant_chunks[:top_k]

    def create_rag_prompt(self, query: str, doc_ids: List[str], top_k: int = 3) -> str:
        """Build a RAG prompt from the query and the relevant chunks."""
        relevant_chunks = self.search_relevant_chunks(query, doc_ids, top_k)

        if not relevant_chunks:
            return query

        # Assemble the prompt
        prompt_parts = []
        prompt_parts.append("Please answer with reference to the following document excerpts:\n")
        prompt_parts.append("=" * 40)
        for i, chunk in enumerate(relevant_chunks, 1):
            prompt_parts.append(f"\n[Reference {i} - {chunk['doc_name']}]")
            content = chunk['content'][:300] if len(chunk['content']) > 300 else chunk['content']
            prompt_parts.append(content)
        prompt_parts.append("\n" + "=" * 40)
        prompt_parts.append(f"\nQuestion: {query}")

        return "\n".join(prompt_parts)


# Create the RAG system instance
rag_system = PDFRAGSystem()

# Module-level settings for the current model and RAG configuration.
# Note: these gr.State objects are created outside gr.Blocks, so they act as
# process-wide globals shared by all sessions, not per-session state.
current_model = gr.State("openai/gpt-oss-120b")
rag_enabled_state = gr.State(False)
selected_docs_state = gr.State([])
top_k_state = gr.State(3)
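
# Illustrative usage of the RAG system (hypothetical file name and doc id;
# not executed by the app):
#
#   result = rag_system.process_and_store_pdf("paper.pdf", "doc_ab12cd34")
#   if result["success"]:
#       prompt = rag_system.create_rag_prompt(
#           "What is the main finding?", ["doc_ab12cd34"], top_k=3
#       )
#       # `prompt` now contains up to 3 excerpts followed by the question
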

def upload_pdf(file):
    """Handle a PDF file upload."""
    if file is None:
        return (
            gr.update(value="<div class='pdf-status pdf-info'>📁 Please select a file</div>"),
            gr.update(choices=[]),
            gr.update(value=False)
        )

    try:
        # Use a hash of the file contents as the document ID
        with open(file.name, 'rb') as f:
            file_hash = hashlib.md5(f.read()).hexdigest()[:8]
        doc_id = f"doc_{file_hash}"

        # Process and store the PDF
        result = rag_system.process_and_store_pdf(file.name, doc_id)

        if result["success"]:
            status_html = f"""<div class='pdf-status pdf-success'>
                ✅ PDF upload complete!<br>
                📄 {result['title']}<br>
                📑 {result['pages']} pages | 🔍 {result['chunks']} chunks
            </div>"""

            # Refresh the document list, selecting all documents by default
            doc_choices = [
                f"{doc_id}: {rag_system.documents[doc_id]['metadata']['file_name']}"
                for doc_id in rag_system.documents.keys()
            ]

            return (
                status_html,
                gr.update(choices=doc_choices, value=doc_choices),
                gr.update(value=True)
            )
        else:
            return (
                f"<div class='pdf-status pdf-error'>❌ Error: {result['error']}</div>",
                gr.update(),
                gr.update(value=False)
            )
    except Exception as e:
        return (
            f"<div class='pdf-status pdf-error'>❌ Error: {str(e)}</div>",
            gr.update(),
            gr.update(value=False)
        )
def clear_documents():
    """Remove all stored documents."""
    rag_system.documents = {}
    rag_system.document_chunks = {}
    rag_system.embeddings_store = {}
    return (
        gr.update(value="<div class='pdf-status pdf-info'>🗑️ All documents have been deleted</div>"),
        gr.update(choices=[], value=[]),
        gr.update(value=False)
    )


def switch_model(model_choice):
    """Toggle the model containers to match the selected model."""
    # Show the container that matches the selection and hide the other
    show_120b = model_choice == "openai/gpt-oss-120b"
    return gr.update(visible=show_120b), gr.update(visible=not show_120b), model_choice


def create_rag_wrapper(original_fn, model_name):
    """Wrap a chat function so messages are enriched with RAG context."""
    def wrapped_fn(message, history=None):
        # Read the current RAG settings
        if rag_enabled_state.value and selected_docs_state.value:
            doc_ids = [doc.split(":")[0] for doc in selected_docs_state.value]
            enhanced_message = rag_system.create_rag_prompt(message, doc_ids, top_k_state.value)

            # Log that RAG was applied
            print(f"🔍 RAG applied: {len(message)} chars → {len(enhanced_message)} chars")

            # Pass the enriched message to the original function
            if history is not None:
                return original_fn(enhanced_message, history)
            else:
                return original_fn(enhanced_message)
        else:
            # RAG disabled: pass the original message through unchanged
            if history is not None:
                return original_fn(message, history)
            else:
                return original_fn(message)

    return wrapped_fn
# Main interface with soft theme
with gr.Blocks(fill_height=True, theme=gr.themes.Soft(), css=custom_css) as demo:
    with gr.Row():
        # Sidebar
        with gr.Column(scale=1):
            with gr.Group(elem_classes="main-container"):
                gr.Markdown("# 🚀 Inference Provider + RAG")
                gr.Markdown(
                    "OpenAI GPT-OSS models served via the Fireworks AI provider. "
                    "Upload PDF documents for context-aware responses."
                )

                # Model selection
                model_dropdown = gr.Dropdown(
                    choices=["openai/gpt-oss-120b", "openai/gpt-oss-20b"],
                    value="openai/gpt-oss-120b",
                    label="📊 Select Model",
                    info="Choose between different model sizes"
                )

                # Login button
                login_button = gr.LoginButton("Sign in with Hugging Face", size="lg")

                # Reload button to apply model change
                reload_btn = gr.Button("🔄 Apply Model Change", variant="primary", size="lg")

                # RAG Settings
                with gr.Accordion("📚 PDF RAG Settings", open=True):
                    pdf_upload = gr.File(
                        label="Upload PDF",
                        file_types=[".pdf"],
                        type="filepath"
                    )

                    upload_status = gr.HTML(
                        value="<div class='pdf-status pdf-info'>📤 Upload a PDF to enable document-based answers</div>"
                    )

                    document_list = gr.CheckboxGroup(
                        choices=[],
                        label="📄 Uploaded Documents",
                        info="Select documents to use as context"
                    )

                    clear_btn = gr.Button("🗑️ Clear All Documents", size="sm", variant="secondary")

                    enable_rag = gr.Checkbox(
                        label="✨ Enable RAG",
                        value=False,
                        info="Use documents for context-aware responses"
                    )

                    top_k_chunks = gr.Slider(
                        minimum=1,
                        maximum=5,
                        value=3,
                        step=1,
                        label="Context Chunks",
                        info="Number of document chunks to use"
                    )

                # Additional options
                with gr.Accordion("⚙️ Advanced Options", open=False):
                    gr.Markdown("*These options will be available after model implementation*")
                    temperature = gr.Slider(
                        minimum=0,
                        maximum=2,
                        value=0.7,
                        step=0.1,
                        label="Temperature"
                    )
                    max_tokens = gr.Slider(
                        minimum=1,
                        maximum=4096,
                        value=512,
                        step=1,
                        label="Max Tokens"
                    )

        # Main chat area
        with gr.Column(scale=3):
            with gr.Group(elem_classes="main-container"):
                gr.Markdown("## 💬 Chat Interface")

                # RAG status
                rag_status = gr.HTML(
                    value="<div class='pdf-status pdf-info'>🔍 RAG: Disabled</div>"
                )

                # RAG context preview
                context_preview = gr.HTML(value="", visible=False)

                # Container for model interfaces
                with gr.Column(visible=True) as model_120b_container:
                    gr.Markdown("### Model: openai/gpt-oss-120b")
                    # Load the original model interface
                    original_interface_120b = gr.load(
                        "models/openai/gpt-oss-120b",
                        accept_token=login_button,
                        provider="fireworks-ai"
                    )

                with gr.Column(visible=False) as model_20b_container:
                    gr.Markdown("### Model: openai/gpt-oss-20b")
                    # Load the original model interface
                    original_interface_20b = gr.load(
                        "models/openai/gpt-oss-20b",
                        accept_token=login_button,
                        provider="fireworks-ai"
                    )

    # Event Handlers

    # PDF upload
    pdf_upload.upload(
        fn=upload_pdf,
        inputs=[pdf_upload],
        outputs=[upload_status, document_list, enable_rag]
    )

    # Clear documents
    clear_btn.click(
        fn=clear_documents,
        outputs=[upload_status, document_list, enable_rag]
    )

    # Update RAG state when settings change
    def update_rag_state(enabled, docs, k):
        rag_enabled_state.value = enabled
        selected_docs_state.value = docs if docs else []
        top_k_state.value = k

        status = "✅ Enabled" if enabled and docs else "⭕ Disabled"
        status_html = f"<div class='pdf-status pdf-info'>🔍 RAG: {status}</div>"

        # Show a context preview if RAG is enabled
        if enabled and docs:
            preview = f"<div class='rag-context'>📚 Using {len(docs)} document(s) with {k} chunks per query</div>"
            return gr.update(value=status_html), gr.update(value=preview, visible=True)
        else:
            return gr.update(value=status_html), gr.update(value="", visible=False)

    # Connect RAG state updates
    enable_rag.change(
        fn=update_rag_state,
        inputs=[enable_rag, document_list, top_k_chunks],
        outputs=[rag_status, context_preview]
    )

    document_list.change(
        fn=update_rag_state,
        inputs=[enable_rag, document_list, top_k_chunks],
        outputs=[rag_status, context_preview]
    )

    top_k_chunks.change(
        fn=update_rag_state,
        inputs=[enable_rag, document_list, top_k_chunks],
        outputs=[rag_status, context_preview]
    )

    # Handle model switching
    reload_btn.click(
        fn=switch_model,
        inputs=[model_dropdown],
        outputs=[model_120b_container, model_20b_container, current_model]
    ).then(
        fn=lambda: gr.Info("Model switched successfully!"),
        inputs=[],
        outputs=[]
    )

    # Update visibility based on dropdown selection
    def update_visibility(model_choice):
        if model_choice == "openai/gpt-oss-120b":
            return gr.update(visible=True), gr.update(visible=False)
        else:
            return gr.update(visible=False), gr.update(visible=True)

    model_dropdown.change(
        fn=update_visibility,
        inputs=[model_dropdown],
        outputs=[model_120b_container, model_20b_container]
    )

# Startup notice. (Assigning a lambda to demo.load would clobber the
# Blocks.load method without registering any event, so a plain print is
# used instead.)
print("📚 RAG System Ready!")

demo.launch()