import gradio as gr import os from typing import List, Dict, Any, Optional import hashlib import json from datetime import datetime # PDF 처리 라이브러리 import pymupdf # PyMuPDF import chromadb from chromadb.utils import embedding_functions from langchain.text_splitter import RecursiveCharacterTextSplitter from sentence_transformers import SentenceTransformer import numpy as np # Custom CSS (기존 CSS + 추가 스타일) custom_css = """ .gradio-container { background: linear-gradient(135deg, #667eea 0%, #764ba2 25%, #f093fb 50%, #4facfe 75%, #00f2fe 100%); background-size: 400% 400%; animation: gradient-animation 15s ease infinite; min-height: 100vh; } @keyframes gradient-animation { 0% { background-position: 0% 50%; } 50% { background-position: 100% 50%; } 100% { background-position: 0% 50%; } } .dark .gradio-container { background: linear-gradient(135deg, #1a1a2e 0%, #16213e 25%, #0f3460 50%, #533483 75%, #e94560 100%); background-size: 400% 400%; animation: gradient-animation 15s ease infinite; } .main-container { background-color: rgba(255, 255, 255, 0.95); backdrop-filter: blur(10px); border-radius: 20px; padding: 20px; box-shadow: 0 8px 32px 0 rgba(31, 38, 135, 0.37); border: 1px solid rgba(255, 255, 255, 0.18); margin: 10px; } .dark .main-container { background-color: rgba(30, 30, 30, 0.95); border: 1px solid rgba(255, 255, 255, 0.1); } .pdf-status { padding: 10px; border-radius: 10px; margin: 10px 0; font-size: 0.9em; } .pdf-success { background-color: rgba(52, 211, 153, 0.2); border: 1px solid rgba(52, 211, 153, 0.5); color: #10b981; } .pdf-error { background-color: rgba(248, 113, 113, 0.2); border: 1px solid rgba(248, 113, 113, 0.5); color: #ef4444; } .pdf-processing { background-color: rgba(251, 191, 36, 0.2); border: 1px solid rgba(251, 191, 36, 0.5); color: #f59e0b; } .document-card { padding: 12px; margin: 8px 0; border-radius: 8px; background: rgba(255, 255, 255, 0.1); border: 1px solid rgba(255, 255, 255, 0.2); cursor: pointer; transition: all 0.3s ease; } .document-card:hover { background: rgba(255, 255, 255, 0.2); transform: translateX(5px); } """ class PDFRAGSystem: """PDF 기반 RAG 시스템 클래스""" def __init__(self): self.documents = {} self.embedder = None self.vector_store = None self.text_splitter = RecursiveCharacterTextSplitter( chunk_size=1000, chunk_overlap=200, length_function=len, separators=["\n\n", "\n", ".", "!", "?", ",", " ", ""] ) self.initialize_vector_store() def initialize_vector_store(self): """벡터 저장소 초기화""" try: # Sentence Transformer 모델 로드 self.embedder = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2') # ChromaDB 클라이언트 초기화 self.chroma_client = chromadb.Client() self.collection = self.chroma_client.create_collection( name="pdf_documents", metadata={"hnsw:space": "cosine"} ) except Exception as e: print(f"Vector store initialization error: {e}") def extract_text_from_pdf(self, pdf_path: str) -> Dict[str, Any]: """PDF에서 텍스트 추출""" try: doc = pymupdf.open(pdf_path) text_content = [] metadata = { "title": doc.metadata.get("title", "Untitled"), "author": doc.metadata.get("author", "Unknown"), "pages": len(doc), "creation_date": doc.metadata.get("creationDate", ""), "file_name": os.path.basename(pdf_path) } for page_num, page in enumerate(doc): text = page.get_text() if text.strip(): text_content.append({ "page": page_num + 1, "content": text }) doc.close() return { "metadata": metadata, "pages": text_content, "full_text": "\n\n".join([p["content"] for p in text_content]) } except Exception as e: raise Exception(f"PDF 처리 오류: {str(e)}") def process_and_index_pdf(self, pdf_path: str, doc_id: str) -> Dict[str, Any]: """PDF 처리 및 벡터 인덱싱""" try: # PDF 텍스트 추출 pdf_data = self.extract_text_from_pdf(pdf_path) # 텍스트를 청크로 분할 chunks = self.text_splitter.split_text(pdf_data["full_text"]) # 각 청크에 대한 임베딩 생성 embeddings = self.embedder.encode(chunks) # ChromaDB에 저장 ids = [f"{doc_id}_{i}" for i in range(len(chunks))] metadatas = [ { "doc_id": doc_id, "chunk_index": i, "source": pdf_data["metadata"]["file_name"], "page_count": pdf_data["metadata"]["pages"] } for i in range(len(chunks)) ] self.collection.add( ids=ids, embeddings=embeddings.tolist(), documents=chunks, metadatas=metadatas ) # 문서 정보 저장 self.documents[doc_id] = { "metadata": pdf_data["metadata"], "chunk_count": len(chunks), "upload_time": datetime.now().isoformat() } return { "success": True, "doc_id": doc_id, "chunks": len(chunks), "pages": pdf_data["metadata"]["pages"], "title": pdf_data["metadata"]["title"] } except Exception as e: return { "success": False, "error": str(e) } def search_relevant_chunks(self, query: str, top_k: int = 5) -> List[Dict]: """쿼리와 관련된 청크 검색""" try: # 쿼리 임베딩 생성 query_embedding = self.embedder.encode([query]) # 유사한 문서 검색 results = self.collection.query( query_embeddings=query_embedding.tolist(), n_results=top_k ) if results and results['documents']: chunks = [] for i in range(len(results['documents'][0])): chunks.append({ "content": results['documents'][0][i], "metadata": results['metadatas'][0][i], "distance": results['distances'][0][i] if 'distances' in results else None }) return chunks return [] except Exception as e: print(f"Search error: {e}") return [] def generate_rag_prompt(self, query: str, context_chunks: List[Dict]) -> str: """RAG 프롬프트 생성""" context = "\n\n---\n\n".join([ f"[출처: {chunk['metadata']['source']}, 청크 {chunk['metadata']['chunk_index']+1}]\n{chunk['content']}" for chunk in context_chunks ]) prompt = f"""다음 문서 내용을 참고하여 질문에 답변해주세요. 답변은 제공된 문서 내용을 바탕으로 작성하되, 필요시 추가 설명을 포함할 수 있습니다. 문서에서 관련 정보를 찾을 수 없는 경우, 그 사실을 명시해주세요. 📚 참고 문서: {context} ❓ 질문: {query} 💡 답변:""" return prompt # RAG 시스템 인스턴스 생성 rag_system = PDFRAGSystem() # State variables current_model = gr.State("openai/gpt-oss-120b") uploaded_documents = gr.State({}) rag_enabled = gr.State(False) def upload_pdf(file): """PDF 파일 업로드 처리""" if file is None: return gr.update(value="파일을 선택해주세요"), gr.update(choices=[]), gr.update(value=False) try: # 파일 해시를 ID로 사용 with open(file.name, 'rb') as f: file_hash = hashlib.md5(f.read()).hexdigest()[:8] doc_id = f"doc_{file_hash}" # PDF 처리 및 인덱싱 result = rag_system.process_and_index_pdf(file.name, doc_id) if result["success"]: status_html = f"""
✅ PDF 업로드 성공!
📄 제목: {result.get('title', 'Unknown')}
📑 페이지: {result['pages']}페이지
🔍 생성된 청크: {result['chunks']}개
🆔 문서 ID: {doc_id}
""" # 문서 목록 업데이트 doc_list = list(rag_system.documents.keys()) doc_choices = [f"{doc_id}: {rag_system.documents[doc_id]['metadata']['file_name']}" for doc_id in doc_list] return status_html, gr.update(choices=doc_choices, value=doc_choices), gr.update(value=True) else: status_html = f"""
❌ PDF 업로드 실패
오류: {result['error']}
""" return status_html, gr.update(choices=[]), gr.update(value=False) except Exception as e: status_html = f"""
❌ 오류 발생: {str(e)}
""" return status_html, gr.update(choices=[]), gr.update(value=False) def clear_documents(): """업로드된 문서 초기화""" try: # ChromaDB 컬렉션 재생성 rag_system.chroma_client.delete_collection("pdf_documents") rag_system.collection = rag_system.chroma_client.create_collection( name="pdf_documents", metadata={"hnsw:space": "cosine"} ) rag_system.documents = {} return gr.update(value="
✅ 모든 문서가 삭제되었습니다
"), gr.update(choices=[], value=[]), gr.update(value=False) except Exception as e: return gr.update(value=f"
❌ 삭제 실패: {str(e)}
"), gr.update(), gr.update() def process_with_rag(message: str, enable_rag: bool, selected_docs: List[str], top_k: int = 5): """RAG를 활용한 메시지 처리""" if not enable_rag or not selected_docs: return message # RAG 비활성화시 원본 메시지 반환 try: # 관련 청크 검색 relevant_chunks = rag_system.search_relevant_chunks(message, top_k=top_k) if relevant_chunks: # 선택된 문서의 청크만 필터링 selected_doc_ids = [doc.split(":")[0] for doc in selected_docs] filtered_chunks = [ chunk for chunk in relevant_chunks if chunk['metadata']['doc_id'] in selected_doc_ids ] if filtered_chunks: # RAG 프롬프트 생성 rag_prompt = rag_system.generate_rag_prompt(message, filtered_chunks[:top_k]) return rag_prompt return message except Exception as e: print(f"RAG processing error: {e}") return message def switch_model(model_choice): """모델 전환 함수""" return gr.update(visible=False), gr.update(visible=True), model_choice # Gradio 인터페이스 with gr.Blocks(fill_height=True, theme="Nymbo/Nymbo_Theme", css=custom_css) as demo: with gr.Row(): # 사이드바 with gr.Column(scale=1): with gr.Group(elem_classes="main-container"): gr.Markdown("# 🚀 AI Chat with RAG") gr.Markdown( "PDF 문서를 업로드하여 AI가 문서 내용을 참고해 답변하도록 할 수 있습니다." ) # 모델 선택 model_dropdown = gr.Dropdown( choices=["openai/gpt-oss-120b", "openai/gpt-oss-20b"], value="openai/gpt-oss-120b", label="📊 모델 선택" ) login_button = gr.LoginButton("Sign in with Hugging Face", size="lg") reload_btn = gr.Button("🔄 모델 변경 적용", variant="primary", size="lg") # RAG 설정 with gr.Accordion("📚 PDF RAG 설정", open=True): pdf_upload = gr.File( label="PDF 업로드", file_types=[".pdf"], type="filepath" ) upload_status = gr.HTML( value="
PDF를 업로드하여 RAG를 활성화하세요
" ) document_list = gr.CheckboxGroup( choices=[], label="📄 업로드된 문서", info="질문에 참고할 문서를 선택하세요" ) with gr.Row(): clear_btn = gr.Button("🗑️ 모든 문서 삭제", size="sm") refresh_btn = gr.Button("🔄 목록 새로고침", size="sm") enable_rag = gr.Checkbox( label="RAG 활성화", value=False, info="문서 기반 답변 생성 활성화" ) with gr.Accordion("⚙️ RAG 고급 설정", open=False): top_k_chunks = gr.Slider( minimum=1, maximum=10, value=5, step=1, label="참조할 청크 수", info="답변 생성시 참고할 문서 청크의 개수" ) chunk_size = gr.Slider( minimum=500, maximum=2000, value=1000, step=100, label="청크 크기", info="문서를 분할하는 청크의 크기 (문자 수)" ) # 고급 옵션 with gr.Accordion("⚙️ 모델 설정", open=False): temperature = gr.Slider( minimum=0, maximum=2, value=0.7, step=0.1, label="Temperature" ) max_tokens = gr.Slider( minimum=1, maximum=4096, value=512, step=1, label="Max Tokens" ) # 메인 채팅 영역 with gr.Column(scale=3): with gr.Group(elem_classes="main-container"): gr.Markdown("## 💬 Chat Interface") # RAG 상태 표시 with gr.Row(): rag_status = gr.HTML( value="
🔍 RAG: 비활성화
" ) # 모델 인터페이스 컨테이너 with gr.Column(visible=True) as model_120b_container: gr.Markdown("### Model: openai/gpt-oss-120b") # 실제 모델 로드는 gr.load()로 처리 chatbot_120b = gr.Chatbot(height=400) msg_box_120b = gr.Textbox( label="메시지 입력", placeholder="PDF 내용에 대해 질문해보세요...", lines=2 ) with gr.Row(): send_btn_120b = gr.Button("📤 전송", variant="primary") clear_btn_120b = gr.Button("🗑️ 대화 초기화") with gr.Column(visible=False) as model_20b_container: gr.Markdown("### Model: openai/gpt-oss-20b") chatbot_20b = gr.Chatbot(height=400) msg_box_20b = gr.Textbox( label="메시지 입력", placeholder="PDF 내용에 대해 질문해보세요...", lines=2 ) with gr.Row(): send_btn_20b = gr.Button("📤 전송", variant="primary") clear_btn_20b = gr.Button("🗑️ 대화 초기화") # 이벤트 핸들러 # PDF 업로드 처리 pdf_upload.upload( fn=upload_pdf, inputs=[pdf_upload], outputs=[upload_status, document_list, enable_rag] ) # 문서 초기화 clear_btn.click( fn=clear_documents, outputs=[upload_status, document_list, enable_rag] ) # RAG 상태 업데이트 enable_rag.change( fn=lambda x: gr.update( value=f"
🔍 RAG: {'활성화' if x else '비활성화'}
" ), inputs=[enable_rag], outputs=[rag_status] ) # 모델 전환 reload_btn.click( fn=switch_model, inputs=[model_dropdown], outputs=[model_120b_container, model_20b_container, current_model] ).then( fn=lambda: gr.Info("모델이 성공적으로 전환되었습니다!"), inputs=[], outputs=[] ) # 채팅 기능 (RAG 통합) def chat_with_rag(message, history, enable_rag, selected_docs, top_k): """RAG를 활용한 채팅""" # RAG 처리 processed_message = process_with_rag(message, enable_rag, selected_docs, top_k) # 여기에 실제 모델 API 호출 코드가 들어가야 함 # 현재는 예시 응답 if enable_rag and selected_docs: response = f"[RAG 활성화] 선택된 {len(selected_docs)}개 문서를 참고하여 답변합니다:\n\n{processed_message[:200]}..." else: response = f"[일반 모드] {message}에 대한 답변입니다." history.append((message, response)) return "", history # 120b 모델 채팅 msg_box_120b.submit( fn=chat_with_rag, inputs=[msg_box_120b, chatbot_120b, enable_rag, document_list, top_k_chunks], outputs=[msg_box_120b, chatbot_120b] ) send_btn_120b.click( fn=chat_with_rag, inputs=[msg_box_120b, chatbot_120b, enable_rag, document_list, top_k_chunks], outputs=[msg_box_120b, chatbot_120b] ) clear_btn_120b.click( lambda: ([], ""), outputs=[chatbot_120b, msg_box_120b] ) # 20b 모델 채팅 msg_box_20b.submit( fn=chat_with_rag, inputs=[msg_box_20b, chatbot_20b, enable_rag, document_list, top_k_chunks], outputs=[msg_box_20b, chatbot_20b] ) send_btn_20b.click( fn=chat_with_rag, inputs=[msg_box_20b, chatbot_20b, enable_rag, document_list, top_k_chunks], outputs=[msg_box_20b, chatbot_20b] ) clear_btn_20b.click( lambda: ([], ""), outputs=[chatbot_20b, msg_box_20b] ) if __name__ == "__main__": demo.launch()