import gradio as gr import os from typing import List, Dict, Any, Optional, Tuple import hashlib from datetime import datetime import numpy as np from gradio_client import Client # PDF 처리 라이브러리 try: import fitz # PyMuPDF PDF_AVAILABLE = True except ImportError: PDF_AVAILABLE = False print("⚠️ PyMuPDF not installed. Install with: pip install pymupdf") try: from sentence_transformers import SentenceTransformer ST_AVAILABLE = True except ImportError: ST_AVAILABLE = False print("⚠️ Sentence Transformers not installed. Install with: pip install sentence-transformers") # Soft and bright custom CSS custom_css = """ .gradio-container { background: linear-gradient(135deg, #f5f7fa 0%, #c3cfe2 100%); min-height: 100vh; font-family: 'Inter', -apple-system, BlinkMacSystemFont, sans-serif; } .main-container { background: rgba(255, 255, 255, 0.98); border-radius: 16px; padding: 24px; box-shadow: 0 4px 6px -1px rgba(0, 0, 0, 0.1), 0 2px 4px -1px rgba(0, 0, 0, 0.06); border: 1px solid rgba(0, 0, 0, 0.05); margin: 12px; } .main-container:hover { box-shadow: 0 10px 15px -3px rgba(0, 0, 0, 0.1), 0 4px 6px -2px rgba(0, 0, 0, 0.05); transition: all 0.3s ease; } /* Status messages styling */ .pdf-status { padding: 12px 16px; border-radius: 12px; margin: 12px 0; font-size: 0.95rem; font-weight: 500; backdrop-filter: blur(10px); } .pdf-success { background: linear-gradient(135deg, #d4edda 0%, #c3e6cb 100%); border: 1px solid #b1dfbb; color: #155724; } .pdf-error { background: linear-gradient(135deg, #f8d7da 0%, #f5c6cb 100%); border: 1px solid #f1aeb5; color: #721c24; } .pdf-info { background: linear-gradient(135deg, #d1ecf1 0%, #bee5eb 100%); border: 1px solid #9ec5d8; color: #0c5460; } .pdf-warning { background: linear-gradient(135deg, #fff3cd 0%, #ffeeba 100%); border: 1px solid #ffeaa7; color: #856404; } /* RAG context display */ .rag-context { background: linear-gradient(135deg, #fef3c7 0%, #fde68a 100%); border-left: 4px solid #f59e0b; padding: 16px; margin: 16px 0; border-radius: 8px; font-size: 0.9rem; } /* Chat message styling */ .message { padding: 12px 16px; margin: 8px 4px; border-radius: 12px; max-width: 80%; } .user-message { background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); color: white; margin-left: auto; } .bot-message { background: #f3f4f6; color: #1f2937; } """ class SimpleTextSplitter: """텍스트 분할기""" def __init__(self, chunk_size=800, chunk_overlap=100): self.chunk_size = chunk_size self.chunk_overlap = chunk_overlap def split_text(self, text: str) -> List[str]: """텍스트를 청크로 분할""" chunks = [] sentences = text.split('. ') current_chunk = "" for sentence in sentences: if len(current_chunk) + len(sentence) < self.chunk_size: current_chunk += sentence + ". " else: if current_chunk: chunks.append(current_chunk.strip()) current_chunk = sentence + ". " if current_chunk: chunks.append(current_chunk.strip()) return chunks class PDFRAGSystem: """PDF 기반 RAG 시스템""" def __init__(self): self.documents = {} self.document_chunks = {} self.embeddings_store = {} self.text_splitter = SimpleTextSplitter(chunk_size=800, chunk_overlap=100) # 임베딩 모델 초기화 self.embedder = None if ST_AVAILABLE: try: self.embedder = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2') print("✅ 임베딩 모델 로드 성공") except Exception as e: print(f"⚠️ 임베딩 모델 로드 실패: {e}") def extract_text_from_pdf(self, pdf_path: str) -> Dict[str, Any]: """PDF에서 텍스트 추출""" if not PDF_AVAILABLE: return { "metadata": { "title": "PDF Reader Not Available", "file_name": os.path.basename(pdf_path), "pages": 0 }, "full_text": "PDF 처리를 위해 'pip install pymupdf'를 실행해주세요." } try: doc = fitz.open(pdf_path) text_content = [] metadata = { "title": doc.metadata.get("title", os.path.basename(pdf_path)), "pages": len(doc), "file_name": os.path.basename(pdf_path) } for page_num, page in enumerate(doc): text = page.get_text() if text.strip(): text_content.append(text) doc.close() return { "metadata": metadata, "full_text": "\n\n".join(text_content) } except Exception as e: raise Exception(f"PDF 처리 오류: {str(e)}") def process_and_store_pdf(self, pdf_path: str, doc_id: str) -> Dict[str, Any]: """PDF 처리 및 저장""" try: # PDF 텍스트 추출 pdf_data = self.extract_text_from_pdf(pdf_path) # 텍스트를 청크로 분할 chunks = self.text_splitter.split_text(pdf_data["full_text"]) # 청크 저장 self.document_chunks[doc_id] = chunks # 임베딩 생성 if self.embedder: embeddings = self.embedder.encode(chunks) self.embeddings_store[doc_id] = embeddings # 문서 정보 저장 self.documents[doc_id] = { "metadata": pdf_data["metadata"], "chunk_count": len(chunks), "upload_time": datetime.now().isoformat() } return { "success": True, "doc_id": doc_id, "chunks": len(chunks), "pages": pdf_data["metadata"]["pages"], "title": pdf_data["metadata"]["title"] } except Exception as e: return {"success": False, "error": str(e)} def search_relevant_chunks(self, query: str, doc_ids: List[str], top_k: int = 3) -> List[Dict]: """관련 청크 검색""" all_relevant_chunks = [] if self.embedder and self.embeddings_store: # 임베딩 기반 검색 query_embedding = self.embedder.encode([query])[0] for doc_id in doc_ids: if doc_id in self.embeddings_store and doc_id in self.document_chunks: doc_embeddings = self.embeddings_store[doc_id] chunks = self.document_chunks[doc_id] # 코사인 유사도 계산 similarities = [] for emb in doc_embeddings: sim = np.dot(query_embedding, emb) / (np.linalg.norm(query_embedding) * np.linalg.norm(emb)) similarities.append(sim) # 상위 청크 선택 top_indices = np.argsort(similarities)[-top_k:][::-1] for idx in top_indices: if similarities[idx] > 0.2: all_relevant_chunks.append({ "content": chunks[idx], "doc_name": self.documents[doc_id]["metadata"]["file_name"], "similarity": similarities[idx] }) else: # 키워드 기반 검색 query_keywords = set(query.lower().split()) for doc_id in doc_ids: if doc_id in self.document_chunks: chunks = self.document_chunks[doc_id] for chunk in chunks[:top_k]: chunk_lower = chunk.lower() score = sum(1 for keyword in query_keywords if keyword in chunk_lower) if score > 0: all_relevant_chunks.append({ "content": chunk[:500], "doc_name": self.documents[doc_id]["metadata"]["file_name"], "similarity": score / len(query_keywords) if query_keywords else 0 }) # 정렬 및 반환 all_relevant_chunks.sort(key=lambda x: x.get('similarity', 0), reverse=True) return all_relevant_chunks[:top_k] def create_rag_prompt(self, query: str, doc_ids: List[str], top_k: int = 3) -> str: """RAG 프롬프트 생성""" relevant_chunks = self.search_relevant_chunks(query, doc_ids, top_k) if not relevant_chunks: return query # 프롬프트 구성 prompt_parts = [] prompt_parts.append("아래 참고 문서를 바탕으로 질문에 답변해주세요.\n") prompt_parts.append("=" * 50) for i, chunk in enumerate(relevant_chunks, 1): prompt_parts.append(f"\n[참고문서 {i} - {chunk['doc_name']}]") content = chunk['content'][:400] if len(chunk['content']) > 400 else chunk['content'] prompt_parts.append(content) prompt_parts.append("") prompt_parts.append("=" * 50) prompt_parts.append(f"\n질문: {query}") prompt_parts.append("\n위 참고문서의 내용을 바탕으로 정확하고 상세하게 답변해주세요:") return "\n".join(prompt_parts) # RAG 시스템 인스턴스 생성 rag_system = PDFRAGSystem() # State variables current_model = gr.State("openai/gpt-oss-120b") conversation_history = gr.State([]) def upload_pdf(file): """PDF 파일 업로드 처리""" if file is None: return ( gr.update(value="
📁 파일을 선택해주세요
"), gr.update(choices=[]), gr.update(value=False) ) try: # 파일 해시를 ID로 사용 with open(file.name, 'rb') as f: file_hash = hashlib.md5(f.read()).hexdigest()[:8] doc_id = f"doc_{file_hash}" # PDF 처리 및 저장 result = rag_system.process_and_store_pdf(file.name, doc_id) if result["success"]: status_html = f"""
✅ PDF 업로드 완료
📄 파일: {result['title']}
📑 페이지: {result['pages']}페이지
🔍 청크: {result['chunks']}개 생성
""" # 문서 목록 업데이트 doc_choices = [f"{doc_id}: {rag_system.documents[doc_id]['metadata']['file_name']}" for doc_id in rag_system.documents.keys()] return ( status_html, gr.update(choices=doc_choices, value=doc_choices), gr.update(value=True) ) else: status_html = f"""
❌ 업로드 실패: {result['error']}
""" return status_html, gr.update(), gr.update(value=False) except Exception as e: return ( f"
❌ 오류: {str(e)}
", gr.update(), gr.update(value=False) ) def clear_documents(): """문서 초기화""" rag_system.documents = {} rag_system.document_chunks = {} rag_system.embeddings_store = {} return ( gr.update(value="
🗑️ 모든 문서가 삭제되었습니다
"), gr.update(choices=[], value=[]), gr.update(value=False) ) def switch_model(model_choice): """Function to switch between models""" if model_choice == "openai/gpt-oss-120b": return gr.update(visible=True), gr.update(visible=False), model_choice else: return gr.update(visible=False), gr.update(visible=True), model_choice def chat_with_rag(message, history, model_name, enable_rag, selected_docs, top_k, temperature, max_tokens): """RAG를 적용한 채팅 함수""" if not message: return history # RAG 적용 if enable_rag and selected_docs: doc_ids = [doc.split(":")[0] for doc in selected_docs] enhanced_message = rag_system.create_rag_prompt(message, doc_ids, top_k) # 디버그: RAG 적용 확인 print(f"RAG 적용됨 - 원본: {len(message)}자, 강화: {len(enhanced_message)}자") else: enhanced_message = message try: # 여기서 실제 모델 API를 호출해야 합니다 # 임시로 모의 응답 생성 if enable_rag and selected_docs: response = f"""📚 [RAG 기반 답변] 문서를 참고하여 답변드립니다: {enhanced_message[:500]}... [참고: 실제 모델 API 연결 필요] """ else: response = f"""💬 [일반 답변] 질문: {message} [참고: 실제 모델 API 연결 필요] """ # 대화 기록에 추가 history.append([message, response]) except Exception as e: response = f"❌ 오류 발생: {str(e)}" history.append([message, response]) return history # Main interface with soft theme with gr.Blocks(fill_height=True, theme=gr.themes.Soft(), css=custom_css) as demo: with gr.Row(): # Sidebar with gr.Column(scale=1): with gr.Group(elem_classes="main-container"): gr.Markdown("# 🤖 AI Chat + RAG") gr.Markdown( "OpenAI GPT-OSS 모델과 PDF 문서 기반 답변 시스템입니다." ) # Login button login_button = gr.LoginButton("🔐 Hugging Face 로그인", size="lg") # Model selection model_dropdown = gr.Dropdown( choices=["openai/gpt-oss-120b", "openai/gpt-oss-20b"], value="openai/gpt-oss-120b", label="📊 모델 선택", info="원하는 모델 크기를 선택하세요" ) # Reload button to apply model change reload_btn = gr.Button("🔄 모델 변경 적용", variant="primary", size="lg") # RAG Settings with gr.Accordion("📚 PDF RAG 설정", open=True): pdf_upload = gr.File( label="📤 PDF 업로드", file_types=[".pdf"], type="filepath" ) upload_status = gr.HTML( value="
📁 PDF를 업로드하여 문서 기반 답변을 받으세요
" ) document_list = gr.CheckboxGroup( choices=[], label="📄 업로드된 문서", info="참고할 문서를 선택하세요" ) with gr.Row(): clear_btn = gr.Button("🗑️ 모든 문서 삭제", size="sm", variant="secondary") enable_rag = gr.Checkbox( label="✨ RAG 활성화", value=False, info="선택한 문서를 참고하여 답변 생성" ) top_k_chunks = gr.Slider( minimum=1, maximum=5, value=3, step=1, label="참조 청크 수", info="답변시 참고할 문서 조각 개수" ) # Additional options with gr.Accordion("🎛️ 모델 옵션", open=False): temperature = gr.Slider( minimum=0, maximum=2, value=0.7, step=0.1, label="Temperature", info="낮을수록 일관성 있고, 높을수록 창의적입니다" ) max_tokens = gr.Slider( minimum=1, maximum=4096, value=512, step=1, label="Max Tokens", info="생성할 최대 토큰 수" ) # Main chat area with gr.Column(scale=3): with gr.Group(elem_classes="main-container"): gr.Markdown("## 💬 Chat Interface") # RAG 상태 표시 rag_status = gr.HTML( value="
🔍 RAG: 비활성화
" ) # 통합된 채팅 인터페이스 (모델별로 하나씩) with gr.Column(visible=True) as model_120b_container: gr.Markdown("### 🚀 Model: openai/gpt-oss-120b") chatbot_120b = gr.Chatbot( height=400, show_label=False, elem_classes="chatbot" ) with gr.Row(): msg_120b = gr.Textbox( placeholder="메시지를 입력하세요... (Enter로 전송)", show_label=False, scale=4, container=False ) send_btn_120b = gr.Button("📤 전송", variant="primary", scale=1) with gr.Row(): clear_btn_120b = gr.Button("🗑️ 대화 초기화", variant="secondary", size="sm") # 예제 질문들 gr.Examples( examples=[ "문서의 주요 내용을 요약해주세요", "이 문서에서 가장 중요한 포인트는 무엇인가요?", "문서에 언급된 날짜와 일정을 알려주세요" ], inputs=msg_120b ) with gr.Column(visible=False) as model_20b_container: gr.Markdown("### 🚀 Model: openai/gpt-oss-20b") chatbot_20b = gr.Chatbot( height=400, show_label=False, elem_classes="chatbot" ) with gr.Row(): msg_20b = gr.Textbox( placeholder="메시지를 입력하세요... (Enter로 전송)", show_label=False, scale=4, container=False ) send_btn_20b = gr.Button("📤 전송", variant="primary", scale=1) with gr.Row(): clear_btn_20b = gr.Button("🗑️ 대화 초기화", variant="secondary", size="sm") # 예제 질문들 gr.Examples( examples=[ "문서의 주요 내용을 요약해주세요", "이 문서에서 가장 중요한 포인트는 무엇인가요?", "문서에 언급된 날짜와 일정을 알려주세요" ], inputs=msg_20b ) # Event Handlers # PDF 업로드 pdf_upload.upload( fn=upload_pdf, inputs=[pdf_upload], outputs=[upload_status, document_list, enable_rag] ) # 문서 삭제 clear_btn.click( fn=clear_documents, outputs=[upload_status, document_list, enable_rag] ) # RAG 상태 업데이트 enable_rag.change( fn=lambda x: gr.update( value=f"
🔍 RAG: {'✅ 활성화' if x else '⭕ 비활성화'}
" ), inputs=[enable_rag], outputs=[rag_status] ) # 모델 전환 reload_btn.click( fn=switch_model, inputs=[model_dropdown], outputs=[model_120b_container, model_20b_container, current_model] ).then( fn=lambda: gr.Info("✅ 모델이 성공적으로 전환되었습니다!"), inputs=[], outputs=[] ) # 120b 모델 채팅 이벤트 msg_120b.submit( fn=lambda msg, hist: chat_with_rag( msg, hist, "openai/gpt-oss-120b", enable_rag.value, document_list.value, top_k_chunks.value, temperature.value, max_tokens.value ), inputs=[msg_120b, chatbot_120b], outputs=[chatbot_120b] ).then( fn=lambda: "", outputs=[msg_120b] ) send_btn_120b.click( fn=lambda msg, hist: chat_with_rag( msg, hist, "openai/gpt-oss-120b", enable_rag.value, document_list.value, top_k_chunks.value, temperature.value, max_tokens.value ), inputs=[msg_120b, chatbot_120b], outputs=[chatbot_120b] ).then( fn=lambda: "", outputs=[msg_120b] ) clear_btn_120b.click( fn=lambda: [], outputs=[chatbot_120b] ) # 20b 모델 채팅 이벤트 msg_20b.submit( fn=lambda msg, hist: chat_with_rag( msg, hist, "openai/gpt-oss-20b", enable_rag.value, document_list.value, top_k_chunks.value, temperature.value, max_tokens.value ), inputs=[msg_20b, chatbot_20b], outputs=[chatbot_20b] ).then( fn=lambda: "", outputs=[msg_20b] ) send_btn_20b.click( fn=lambda msg, hist: chat_with_rag( msg, hist, "openai/gpt-oss-20b", enable_rag.value, document_list.value, top_k_chunks.value, temperature.value, max_tokens.value ), inputs=[msg_20b, chatbot_20b], outputs=[chatbot_20b] ).then( fn=lambda: "", outputs=[msg_20b] ) clear_btn_20b.click( fn=lambda: [], outputs=[chatbot_20b] ) # 실제 모델 API 연결을 위한 함수 (구현 필요) def connect_to_model_api(model_name, message, temperature, max_tokens): """ 실제 모델 API에 연결하는 함수 TODO: 여기에 실제 API 호출 코드를 구현해야 합니다 예시: - OpenAI API - Hugging Face Inference API - Custom model endpoint """ # client = Client(f"models/{model_name}") # response = client.predict(message, temperature=temperature, max_tokens=max_tokens) # return response pass demo.launch()