import gradio as gr
import os
from typing import List, Dict, Any, Optional
import hashlib
import json
from datetime import datetime
# PDF 처리 라이브러리
import pymupdf # PyMuPDF
import chromadb
from chromadb.utils import embedding_functions
from langchain.text_splitter import RecursiveCharacterTextSplitter
from sentence_transformers import SentenceTransformer
import numpy as np
# Custom CSS (기존 CSS + 추가 스타일)
custom_css = """
.gradio-container {
background: linear-gradient(135deg, #667eea 0%, #764ba2 25%, #f093fb 50%, #4facfe 75%, #00f2fe 100%);
background-size: 400% 400%;
animation: gradient-animation 15s ease infinite;
min-height: 100vh;
}
@keyframes gradient-animation {
0% { background-position: 0% 50%; }
50% { background-position: 100% 50%; }
100% { background-position: 0% 50%; }
}
.dark .gradio-container {
background: linear-gradient(135deg, #1a1a2e 0%, #16213e 25%, #0f3460 50%, #533483 75%, #e94560 100%);
background-size: 400% 400%;
animation: gradient-animation 15s ease infinite;
}
.main-container {
background-color: rgba(255, 255, 255, 0.95);
backdrop-filter: blur(10px);
border-radius: 20px;
padding: 20px;
box-shadow: 0 8px 32px 0 rgba(31, 38, 135, 0.37);
border: 1px solid rgba(255, 255, 255, 0.18);
margin: 10px;
}
.dark .main-container {
background-color: rgba(30, 30, 30, 0.95);
border: 1px solid rgba(255, 255, 255, 0.1);
}
.pdf-status {
padding: 10px;
border-radius: 10px;
margin: 10px 0;
font-size: 0.9em;
}
.pdf-success {
background-color: rgba(52, 211, 153, 0.2);
border: 1px solid rgba(52, 211, 153, 0.5);
color: #10b981;
}
.pdf-error {
background-color: rgba(248, 113, 113, 0.2);
border: 1px solid rgba(248, 113, 113, 0.5);
color: #ef4444;
}
.pdf-processing {
background-color: rgba(251, 191, 36, 0.2);
border: 1px solid rgba(251, 191, 36, 0.5);
color: #f59e0b;
}
.document-card {
padding: 12px;
margin: 8px 0;
border-radius: 8px;
background: rgba(255, 255, 255, 0.1);
border: 1px solid rgba(255, 255, 255, 0.2);
cursor: pointer;
transition: all 0.3s ease;
}
.document-card:hover {
background: rgba(255, 255, 255, 0.2);
transform: translateX(5px);
}
"""
class PDFRAGSystem:
"""PDF 기반 RAG 시스템 클래스"""
def __init__(self):
self.documents = {}
self.embedder = None
self.vector_store = None
self.text_splitter = RecursiveCharacterTextSplitter(
chunk_size=1000,
chunk_overlap=200,
length_function=len,
separators=["\n\n", "\n", ".", "!", "?", ",", " ", ""]
)
self.initialize_vector_store()
def initialize_vector_store(self):
"""벡터 저장소 초기화"""
try:
# Sentence Transformer 모델 로드
self.embedder = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')
# ChromaDB 클라이언트 초기화
self.chroma_client = chromadb.Client()
self.collection = self.chroma_client.create_collection(
name="pdf_documents",
metadata={"hnsw:space": "cosine"}
)
except Exception as e:
print(f"Vector store initialization error: {e}")
def extract_text_from_pdf(self, pdf_path: str) -> Dict[str, Any]:
"""PDF에서 텍스트 추출"""
try:
doc = pymupdf.open(pdf_path)
text_content = []
metadata = {
"title": doc.metadata.get("title", "Untitled"),
"author": doc.metadata.get("author", "Unknown"),
"pages": len(doc),
"creation_date": doc.metadata.get("creationDate", ""),
"file_name": os.path.basename(pdf_path)
}
for page_num, page in enumerate(doc):
text = page.get_text()
if text.strip():
text_content.append({
"page": page_num + 1,
"content": text
})
doc.close()
return {
"metadata": metadata,
"pages": text_content,
"full_text": "\n\n".join([p["content"] for p in text_content])
}
except Exception as e:
raise Exception(f"PDF 처리 오류: {str(e)}")
def process_and_index_pdf(self, pdf_path: str, doc_id: str) -> Dict[str, Any]:
"""PDF 처리 및 벡터 인덱싱"""
try:
# PDF 텍스트 추출
pdf_data = self.extract_text_from_pdf(pdf_path)
# 텍스트를 청크로 분할
chunks = self.text_splitter.split_text(pdf_data["full_text"])
# 각 청크에 대한 임베딩 생성
embeddings = self.embedder.encode(chunks)
# ChromaDB에 저장
ids = [f"{doc_id}_{i}" for i in range(len(chunks))]
metadatas = [
{
"doc_id": doc_id,
"chunk_index": i,
"source": pdf_data["metadata"]["file_name"],
"page_count": pdf_data["metadata"]["pages"]
}
for i in range(len(chunks))
]
self.collection.add(
ids=ids,
embeddings=embeddings.tolist(),
documents=chunks,
metadatas=metadatas
)
# 문서 정보 저장
self.documents[doc_id] = {
"metadata": pdf_data["metadata"],
"chunk_count": len(chunks),
"upload_time": datetime.now().isoformat()
}
return {
"success": True,
"doc_id": doc_id,
"chunks": len(chunks),
"pages": pdf_data["metadata"]["pages"],
"title": pdf_data["metadata"]["title"]
}
except Exception as e:
return {
"success": False,
"error": str(e)
}
def search_relevant_chunks(self, query: str, top_k: int = 5) -> List[Dict]:
"""쿼리와 관련된 청크 검색"""
try:
# 쿼리 임베딩 생성
query_embedding = self.embedder.encode([query])
# 유사한 문서 검색
results = self.collection.query(
query_embeddings=query_embedding.tolist(),
n_results=top_k
)
if results and results['documents']:
chunks = []
for i in range(len(results['documents'][0])):
chunks.append({
"content": results['documents'][0][i],
"metadata": results['metadatas'][0][i],
"distance": results['distances'][0][i] if 'distances' in results else None
})
return chunks
return []
except Exception as e:
print(f"Search error: {e}")
return []
def generate_rag_prompt(self, query: str, context_chunks: List[Dict]) -> str:
"""RAG 프롬프트 생성"""
context = "\n\n---\n\n".join([
f"[출처: {chunk['metadata']['source']}, 청크 {chunk['metadata']['chunk_index']+1}]\n{chunk['content']}"
for chunk in context_chunks
])
prompt = f"""다음 문서 내용을 참고하여 질문에 답변해주세요.
답변은 제공된 문서 내용을 바탕으로 작성하되, 필요시 추가 설명을 포함할 수 있습니다.
문서에서 관련 정보를 찾을 수 없는 경우, 그 사실을 명시해주세요.
📚 참고 문서:
{context}
❓ 질문: {query}
💡 답변:"""
return prompt
# RAG 시스템 인스턴스 생성
rag_system = PDFRAGSystem()
# State variables
current_model = gr.State("openai/gpt-oss-120b")
uploaded_documents = gr.State({})
rag_enabled = gr.State(False)
def upload_pdf(file):
"""PDF 파일 업로드 처리"""
if file is None:
return gr.update(value="파일을 선택해주세요"), gr.update(choices=[]), gr.update(value=False)
try:
# 파일 해시를 ID로 사용
with open(file.name, 'rb') as f:
file_hash = hashlib.md5(f.read()).hexdigest()[:8]
doc_id = f"doc_{file_hash}"
# PDF 처리 및 인덱싱
result = rag_system.process_and_index_pdf(file.name, doc_id)
if result["success"]:
status_html = f"""
✅ PDF 업로드 성공!
📄 제목: {result.get('title', 'Unknown')}
📑 페이지: {result['pages']}페이지
🔍 생성된 청크: {result['chunks']}개
🆔 문서 ID: {doc_id}
"""
# 문서 목록 업데이트
doc_list = list(rag_system.documents.keys())
doc_choices = [f"{doc_id}: {rag_system.documents[doc_id]['metadata']['file_name']}"
for doc_id in doc_list]
return status_html, gr.update(choices=doc_choices, value=doc_choices), gr.update(value=True)
else:
status_html = f"""
❌ PDF 업로드 실패
오류: {result['error']}
"""
return status_html, gr.update(choices=[]), gr.update(value=False)
except Exception as e:
status_html = f"""
❌ 오류 발생: {str(e)}
"""
return status_html, gr.update(choices=[]), gr.update(value=False)
def clear_documents():
"""업로드된 문서 초기화"""
try:
# ChromaDB 컬렉션 재생성
rag_system.chroma_client.delete_collection("pdf_documents")
rag_system.collection = rag_system.chroma_client.create_collection(
name="pdf_documents",
metadata={"hnsw:space": "cosine"}
)
rag_system.documents = {}
return gr.update(value="✅ 모든 문서가 삭제되었습니다
"), gr.update(choices=[], value=[]), gr.update(value=False)
except Exception as e:
return gr.update(value=f"❌ 삭제 실패: {str(e)}
"), gr.update(), gr.update()
def process_with_rag(message: str, enable_rag: bool, selected_docs: List[str], top_k: int = 5):
"""RAG를 활용한 메시지 처리"""
if not enable_rag or not selected_docs:
return message # RAG 비활성화시 원본 메시지 반환
try:
# 관련 청크 검색
relevant_chunks = rag_system.search_relevant_chunks(message, top_k=top_k)
if relevant_chunks:
# 선택된 문서의 청크만 필터링
selected_doc_ids = [doc.split(":")[0] for doc in selected_docs]
filtered_chunks = [
chunk for chunk in relevant_chunks
if chunk['metadata']['doc_id'] in selected_doc_ids
]
if filtered_chunks:
# RAG 프롬프트 생성
rag_prompt = rag_system.generate_rag_prompt(message, filtered_chunks[:top_k])
return rag_prompt
return message
except Exception as e:
print(f"RAG processing error: {e}")
return message
def switch_model(model_choice):
"""모델 전환 함수"""
return gr.update(visible=False), gr.update(visible=True), model_choice
# Gradio 인터페이스
with gr.Blocks(fill_height=True, theme="Nymbo/Nymbo_Theme", css=custom_css) as demo:
with gr.Row():
# 사이드바
with gr.Column(scale=1):
with gr.Group(elem_classes="main-container"):
gr.Markdown("# 🚀 AI Chat with RAG")
gr.Markdown(
"PDF 문서를 업로드하여 AI가 문서 내용을 참고해 답변하도록 할 수 있습니다."
)
# 모델 선택
model_dropdown = gr.Dropdown(
choices=["openai/gpt-oss-120b", "openai/gpt-oss-20b"],
value="openai/gpt-oss-120b",
label="📊 모델 선택"
)
login_button = gr.LoginButton("Sign in with Hugging Face", size="lg")
reload_btn = gr.Button("🔄 모델 변경 적용", variant="primary", size="lg")
# RAG 설정
with gr.Accordion("📚 PDF RAG 설정", open=True):
pdf_upload = gr.File(
label="PDF 업로드",
file_types=[".pdf"],
type="filepath"
)
upload_status = gr.HTML(
value="PDF를 업로드하여 RAG를 활성화하세요
"
)
document_list = gr.CheckboxGroup(
choices=[],
label="📄 업로드된 문서",
info="질문에 참고할 문서를 선택하세요"
)
with gr.Row():
clear_btn = gr.Button("🗑️ 모든 문서 삭제", size="sm")
refresh_btn = gr.Button("🔄 목록 새로고침", size="sm")
enable_rag = gr.Checkbox(
label="RAG 활성화",
value=False,
info="문서 기반 답변 생성 활성화"
)
with gr.Accordion("⚙️ RAG 고급 설정", open=False):
top_k_chunks = gr.Slider(
minimum=1,
maximum=10,
value=5,
step=1,
label="참조할 청크 수",
info="답변 생성시 참고할 문서 청크의 개수"
)
chunk_size = gr.Slider(
minimum=500,
maximum=2000,
value=1000,
step=100,
label="청크 크기",
info="문서를 분할하는 청크의 크기 (문자 수)"
)
# 고급 옵션
with gr.Accordion("⚙️ 모델 설정", open=False):
temperature = gr.Slider(
minimum=0,
maximum=2,
value=0.7,
step=0.1,
label="Temperature"
)
max_tokens = gr.Slider(
minimum=1,
maximum=4096,
value=512,
step=1,
label="Max Tokens"
)
# 메인 채팅 영역
with gr.Column(scale=3):
with gr.Group(elem_classes="main-container"):
gr.Markdown("## 💬 Chat Interface")
# RAG 상태 표시
with gr.Row():
rag_status = gr.HTML(
value="🔍 RAG: 비활성화
"
)
# 모델 인터페이스 컨테이너
with gr.Column(visible=True) as model_120b_container:
gr.Markdown("### Model: openai/gpt-oss-120b")
# 실제 모델 로드는 gr.load()로 처리
chatbot_120b = gr.Chatbot(height=400)
msg_box_120b = gr.Textbox(
label="메시지 입력",
placeholder="PDF 내용에 대해 질문해보세요...",
lines=2
)
with gr.Row():
send_btn_120b = gr.Button("📤 전송", variant="primary")
clear_btn_120b = gr.Button("🗑️ 대화 초기화")
with gr.Column(visible=False) as model_20b_container:
gr.Markdown("### Model: openai/gpt-oss-20b")
chatbot_20b = gr.Chatbot(height=400)
msg_box_20b = gr.Textbox(
label="메시지 입력",
placeholder="PDF 내용에 대해 질문해보세요...",
lines=2
)
with gr.Row():
send_btn_20b = gr.Button("📤 전송", variant="primary")
clear_btn_20b = gr.Button("🗑️ 대화 초기화")
# 이벤트 핸들러
# PDF 업로드 처리
pdf_upload.upload(
fn=upload_pdf,
inputs=[pdf_upload],
outputs=[upload_status, document_list, enable_rag]
)
# 문서 초기화
clear_btn.click(
fn=clear_documents,
outputs=[upload_status, document_list, enable_rag]
)
# RAG 상태 업데이트
enable_rag.change(
fn=lambda x: gr.update(
value=f"🔍 RAG: {'활성화' if x else '비활성화'}
"
),
inputs=[enable_rag],
outputs=[rag_status]
)
# 모델 전환
reload_btn.click(
fn=switch_model,
inputs=[model_dropdown],
outputs=[model_120b_container, model_20b_container, current_model]
).then(
fn=lambda: gr.Info("모델이 성공적으로 전환되었습니다!"),
inputs=[],
outputs=[]
)
# 채팅 기능 (RAG 통합)
def chat_with_rag(message, history, enable_rag, selected_docs, top_k):
"""RAG를 활용한 채팅"""
# RAG 처리
processed_message = process_with_rag(message, enable_rag, selected_docs, top_k)
# 여기에 실제 모델 API 호출 코드가 들어가야 함
# 현재는 예시 응답
if enable_rag and selected_docs:
response = f"[RAG 활성화] 선택된 {len(selected_docs)}개 문서를 참고하여 답변합니다:\n\n{processed_message[:200]}..."
else:
response = f"[일반 모드] {message}에 대한 답변입니다."
history.append((message, response))
return "", history
# 120b 모델 채팅
msg_box_120b.submit(
fn=chat_with_rag,
inputs=[msg_box_120b, chatbot_120b, enable_rag, document_list, top_k_chunks],
outputs=[msg_box_120b, chatbot_120b]
)
send_btn_120b.click(
fn=chat_with_rag,
inputs=[msg_box_120b, chatbot_120b, enable_rag, document_list, top_k_chunks],
outputs=[msg_box_120b, chatbot_120b]
)
clear_btn_120b.click(
lambda: ([], ""),
outputs=[chatbot_120b, msg_box_120b]
)
# 20b 모델 채팅
msg_box_20b.submit(
fn=chat_with_rag,
inputs=[msg_box_20b, chatbot_20b, enable_rag, document_list, top_k_chunks],
outputs=[msg_box_20b, chatbot_20b]
)
send_btn_20b.click(
fn=chat_with_rag,
inputs=[msg_box_20b, chatbot_20b, enable_rag, document_list, top_k_chunks],
outputs=[msg_box_20b, chatbot_20b]
)
clear_btn_20b.click(
lambda: ([], ""),
outputs=[chatbot_20b, msg_box_20b]
)
if __name__ == "__main__":
demo.launch()