import gradio as gr
import os
from typing import List, Dict, Any, Optional, Tuple
import hashlib
from datetime import datetime
import numpy as np
from gradio_client import Client
# PDF 처리 라이브러리
try:
import fitz # PyMuPDF
PDF_AVAILABLE = True
except ImportError:
PDF_AVAILABLE = False
print("⚠️ PyMuPDF not installed. Install with: pip install pymupdf")
try:
from sentence_transformers import SentenceTransformer
ST_AVAILABLE = True
except ImportError:
ST_AVAILABLE = False
print("⚠️ Sentence Transformers not installed. Install with: pip install sentence-transformers")
# Soft and bright custom CSS
custom_css = """
.gradio-container {
background: linear-gradient(135deg, #f5f7fa 0%, #c3cfe2 100%);
min-height: 100vh;
font-family: 'Inter', -apple-system, BlinkMacSystemFont, sans-serif;
}
.main-container {
background: rgba(255, 255, 255, 0.98);
border-radius: 16px;
padding: 24px;
box-shadow: 0 4px 6px -1px rgba(0, 0, 0, 0.1), 0 2px 4px -1px rgba(0, 0, 0, 0.06);
border: 1px solid rgba(0, 0, 0, 0.05);
margin: 12px;
}
.main-container:hover {
box-shadow: 0 10px 15px -3px rgba(0, 0, 0, 0.1), 0 4px 6px -2px rgba(0, 0, 0, 0.05);
transition: all 0.3s ease;
}
/* Status messages styling */
.pdf-status {
padding: 12px 16px;
border-radius: 12px;
margin: 12px 0;
font-size: 0.95rem;
font-weight: 500;
backdrop-filter: blur(10px);
}
.pdf-success {
background: linear-gradient(135deg, #d4edda 0%, #c3e6cb 100%);
border: 1px solid #b1dfbb;
color: #155724;
}
.pdf-error {
background: linear-gradient(135deg, #f8d7da 0%, #f5c6cb 100%);
border: 1px solid #f1aeb5;
color: #721c24;
}
.pdf-info {
background: linear-gradient(135deg, #d1ecf1 0%, #bee5eb 100%);
border: 1px solid #9ec5d8;
color: #0c5460;
}
.pdf-warning {
background: linear-gradient(135deg, #fff3cd 0%, #ffeeba 100%);
border: 1px solid #ffeaa7;
color: #856404;
}
/* RAG context display */
.rag-context {
background: linear-gradient(135deg, #fef3c7 0%, #fde68a 100%);
border-left: 4px solid #f59e0b;
padding: 16px;
margin: 16px 0;
border-radius: 8px;
font-size: 0.9rem;
}
/* Chat message styling */
.message {
padding: 12px 16px;
margin: 8px 4px;
border-radius: 12px;
max-width: 80%;
}
.user-message {
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
color: white;
margin-left: auto;
}
.bot-message {
background: #f3f4f6;
color: #1f2937;
}
"""
class SimpleTextSplitter:
"""텍스트 분할기"""
def __init__(self, chunk_size=800, chunk_overlap=100):
self.chunk_size = chunk_size
self.chunk_overlap = chunk_overlap
def split_text(self, text: str) -> List[str]:
"""텍스트를 청크로 분할"""
chunks = []
sentences = text.split('. ')
current_chunk = ""
for sentence in sentences:
if len(current_chunk) + len(sentence) < self.chunk_size:
current_chunk += sentence + ". "
else:
if current_chunk:
chunks.append(current_chunk.strip())
current_chunk = sentence + ". "
if current_chunk:
chunks.append(current_chunk.strip())
return chunks
class PDFRAGSystem:
"""PDF 기반 RAG 시스템"""
def __init__(self):
self.documents = {}
self.document_chunks = {}
self.embeddings_store = {}
self.text_splitter = SimpleTextSplitter(chunk_size=800, chunk_overlap=100)
# 임베딩 모델 초기화
self.embedder = None
if ST_AVAILABLE:
try:
self.embedder = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')
print("✅ 임베딩 모델 로드 성공")
except Exception as e:
print(f"⚠️ 임베딩 모델 로드 실패: {e}")
def extract_text_from_pdf(self, pdf_path: str) -> Dict[str, Any]:
"""PDF에서 텍스트 추출"""
if not PDF_AVAILABLE:
return {
"metadata": {
"title": "PDF Reader Not Available",
"file_name": os.path.basename(pdf_path),
"pages": 0
},
"full_text": "PDF 처리를 위해 'pip install pymupdf'를 실행해주세요."
}
try:
doc = fitz.open(pdf_path)
text_content = []
metadata = {
"title": doc.metadata.get("title", os.path.basename(pdf_path)),
"pages": len(doc),
"file_name": os.path.basename(pdf_path)
}
for page_num, page in enumerate(doc):
text = page.get_text()
if text.strip():
text_content.append(text)
doc.close()
return {
"metadata": metadata,
"full_text": "\n\n".join(text_content)
}
except Exception as e:
raise Exception(f"PDF 처리 오류: {str(e)}")
def process_and_store_pdf(self, pdf_path: str, doc_id: str) -> Dict[str, Any]:
"""PDF 처리 및 저장"""
try:
# PDF 텍스트 추출
pdf_data = self.extract_text_from_pdf(pdf_path)
# 텍스트를 청크로 분할
chunks = self.text_splitter.split_text(pdf_data["full_text"])
# 청크 저장
self.document_chunks[doc_id] = chunks
# 임베딩 생성
if self.embedder:
embeddings = self.embedder.encode(chunks)
self.embeddings_store[doc_id] = embeddings
# 문서 정보 저장
self.documents[doc_id] = {
"metadata": pdf_data["metadata"],
"chunk_count": len(chunks),
"upload_time": datetime.now().isoformat()
}
return {
"success": True,
"doc_id": doc_id,
"chunks": len(chunks),
"pages": pdf_data["metadata"]["pages"],
"title": pdf_data["metadata"]["title"]
}
except Exception as e:
return {"success": False, "error": str(e)}
def search_relevant_chunks(self, query: str, doc_ids: List[str], top_k: int = 3) -> List[Dict]:
"""관련 청크 검색"""
all_relevant_chunks = []
if self.embedder and self.embeddings_store:
# 임베딩 기반 검색
query_embedding = self.embedder.encode([query])[0]
for doc_id in doc_ids:
if doc_id in self.embeddings_store and doc_id in self.document_chunks:
doc_embeddings = self.embeddings_store[doc_id]
chunks = self.document_chunks[doc_id]
# 코사인 유사도 계산
similarities = []
for emb in doc_embeddings:
sim = np.dot(query_embedding, emb) / (np.linalg.norm(query_embedding) * np.linalg.norm(emb))
similarities.append(sim)
# 상위 청크 선택
top_indices = np.argsort(similarities)[-top_k:][::-1]
for idx in top_indices:
if similarities[idx] > 0.2:
all_relevant_chunks.append({
"content": chunks[idx],
"doc_name": self.documents[doc_id]["metadata"]["file_name"],
"similarity": similarities[idx]
})
else:
# 키워드 기반 검색
query_keywords = set(query.lower().split())
for doc_id in doc_ids:
if doc_id in self.document_chunks:
chunks = self.document_chunks[doc_id]
for chunk in chunks[:top_k]:
chunk_lower = chunk.lower()
score = sum(1 for keyword in query_keywords if keyword in chunk_lower)
if score > 0:
all_relevant_chunks.append({
"content": chunk[:500],
"doc_name": self.documents[doc_id]["metadata"]["file_name"],
"similarity": score / len(query_keywords) if query_keywords else 0
})
# 정렬 및 반환
all_relevant_chunks.sort(key=lambda x: x.get('similarity', 0), reverse=True)
return all_relevant_chunks[:top_k]
def create_rag_prompt(self, query: str, doc_ids: List[str], top_k: int = 3) -> str:
"""RAG 프롬프트 생성"""
relevant_chunks = self.search_relevant_chunks(query, doc_ids, top_k)
if not relevant_chunks:
return query
# 프롬프트 구성
prompt_parts = []
prompt_parts.append("아래 참고 문서를 바탕으로 질문에 답변해주세요.\n")
prompt_parts.append("=" * 50)
for i, chunk in enumerate(relevant_chunks, 1):
prompt_parts.append(f"\n[참고문서 {i} - {chunk['doc_name']}]")
content = chunk['content'][:400] if len(chunk['content']) > 400 else chunk['content']
prompt_parts.append(content)
prompt_parts.append("")
prompt_parts.append("=" * 50)
prompt_parts.append(f"\n질문: {query}")
prompt_parts.append("\n위 참고문서의 내용을 바탕으로 정확하고 상세하게 답변해주세요:")
return "\n".join(prompt_parts)
# RAG 시스템 인스턴스 생성
rag_system = PDFRAGSystem()
# State variables
current_model = gr.State("openai/gpt-oss-120b")
conversation_history = gr.State([])
def upload_pdf(file):
"""PDF 파일 업로드 처리"""
if file is None:
return (
gr.update(value="
📁 파일을 선택해주세요
"),
gr.update(choices=[]),
gr.update(value=False)
)
try:
# 파일 해시를 ID로 사용
with open(file.name, 'rb') as f:
file_hash = hashlib.md5(f.read()).hexdigest()[:8]
doc_id = f"doc_{file_hash}"
# PDF 처리 및 저장
result = rag_system.process_and_store_pdf(file.name, doc_id)
if result["success"]:
status_html = f"""
✅ PDF 업로드 완료
📄 파일: {result['title']}
📑 페이지: {result['pages']}페이지
🔍 청크: {result['chunks']}개 생성
"""
# 문서 목록 업데이트
doc_choices = [f"{doc_id}: {rag_system.documents[doc_id]['metadata']['file_name']}"
for doc_id in rag_system.documents.keys()]
return (
status_html,
gr.update(choices=doc_choices, value=doc_choices),
gr.update(value=True)
)
else:
status_html = f"""
❌ 업로드 실패: {result['error']}
"""
return status_html, gr.update(), gr.update(value=False)
except Exception as e:
return (
f"❌ 오류: {str(e)}
",
gr.update(),
gr.update(value=False)
)
def clear_documents():
"""문서 초기화"""
rag_system.documents = {}
rag_system.document_chunks = {}
rag_system.embeddings_store = {}
return (
gr.update(value="🗑️ 모든 문서가 삭제되었습니다
"),
gr.update(choices=[], value=[]),
gr.update(value=False)
)
def switch_model(model_choice):
"""Function to switch between models"""
if model_choice == "openai/gpt-oss-120b":
return gr.update(visible=True), gr.update(visible=False), model_choice
else:
return gr.update(visible=False), gr.update(visible=True), model_choice
def chat_with_rag(message, history, model_name, enable_rag, selected_docs, top_k, temperature, max_tokens):
"""RAG를 적용한 채팅 함수"""
if not message:
return history
# RAG 적용
if enable_rag and selected_docs:
doc_ids = [doc.split(":")[0] for doc in selected_docs]
enhanced_message = rag_system.create_rag_prompt(message, doc_ids, top_k)
# 디버그: RAG 적용 확인
print(f"RAG 적용됨 - 원본: {len(message)}자, 강화: {len(enhanced_message)}자")
else:
enhanced_message = message
try:
# 여기서 실제 모델 API를 호출해야 합니다
# 임시로 모의 응답 생성
if enable_rag and selected_docs:
response = f"""📚 [RAG 기반 답변]
문서를 참고하여 답변드립니다:
{enhanced_message[:500]}...
[참고: 실제 모델 API 연결 필요]
"""
else:
response = f"""💬 [일반 답변]
질문: {message}
[참고: 실제 모델 API 연결 필요]
"""
# 대화 기록에 추가
history.append([message, response])
except Exception as e:
response = f"❌ 오류 발생: {str(e)}"
history.append([message, response])
return history
# Main interface with soft theme
with gr.Blocks(fill_height=True, theme=gr.themes.Soft(), css=custom_css) as demo:
with gr.Row():
# Sidebar
with gr.Column(scale=1):
with gr.Group(elem_classes="main-container"):
gr.Markdown("# 🤖 AI Chat + RAG")
gr.Markdown(
"OpenAI GPT-OSS 모델과 PDF 문서 기반 답변 시스템입니다."
)
# Login button
login_button = gr.LoginButton("🔐 Hugging Face 로그인", size="lg")
# Model selection
model_dropdown = gr.Dropdown(
choices=["openai/gpt-oss-120b", "openai/gpt-oss-20b"],
value="openai/gpt-oss-120b",
label="📊 모델 선택",
info="원하는 모델 크기를 선택하세요"
)
# Reload button to apply model change
reload_btn = gr.Button("🔄 모델 변경 적용", variant="primary", size="lg")
# RAG Settings
with gr.Accordion("📚 PDF RAG 설정", open=True):
pdf_upload = gr.File(
label="📤 PDF 업로드",
file_types=[".pdf"],
type="filepath"
)
upload_status = gr.HTML(
value="📁 PDF를 업로드하여 문서 기반 답변을 받으세요
"
)
document_list = gr.CheckboxGroup(
choices=[],
label="📄 업로드된 문서",
info="참고할 문서를 선택하세요"
)
with gr.Row():
clear_btn = gr.Button("🗑️ 모든 문서 삭제", size="sm", variant="secondary")
enable_rag = gr.Checkbox(
label="✨ RAG 활성화",
value=False,
info="선택한 문서를 참고하여 답변 생성"
)
top_k_chunks = gr.Slider(
minimum=1,
maximum=5,
value=3,
step=1,
label="참조 청크 수",
info="답변시 참고할 문서 조각 개수"
)
# Additional options
with gr.Accordion("🎛️ 모델 옵션", open=False):
temperature = gr.Slider(
minimum=0,
maximum=2,
value=0.7,
step=0.1,
label="Temperature",
info="낮을수록 일관성 있고, 높을수록 창의적입니다"
)
max_tokens = gr.Slider(
minimum=1,
maximum=4096,
value=512,
step=1,
label="Max Tokens",
info="생성할 최대 토큰 수"
)
# Main chat area
with gr.Column(scale=3):
with gr.Group(elem_classes="main-container"):
gr.Markdown("## 💬 Chat Interface")
# RAG 상태 표시
rag_status = gr.HTML(
value="🔍 RAG: 비활성화
"
)
# 통합된 채팅 인터페이스 (모델별로 하나씩)
with gr.Column(visible=True) as model_120b_container:
gr.Markdown("### 🚀 Model: openai/gpt-oss-120b")
chatbot_120b = gr.Chatbot(
height=400,
show_label=False,
elem_classes="chatbot"
)
with gr.Row():
msg_120b = gr.Textbox(
placeholder="메시지를 입력하세요... (Enter로 전송)",
show_label=False,
scale=4,
container=False
)
send_btn_120b = gr.Button("📤 전송", variant="primary", scale=1)
with gr.Row():
clear_btn_120b = gr.Button("🗑️ 대화 초기화", variant="secondary", size="sm")
# 예제 질문들
gr.Examples(
examples=[
"문서의 주요 내용을 요약해주세요",
"이 문서에서 가장 중요한 포인트는 무엇인가요?",
"문서에 언급된 날짜와 일정을 알려주세요"
],
inputs=msg_120b
)
with gr.Column(visible=False) as model_20b_container:
gr.Markdown("### 🚀 Model: openai/gpt-oss-20b")
chatbot_20b = gr.Chatbot(
height=400,
show_label=False,
elem_classes="chatbot"
)
with gr.Row():
msg_20b = gr.Textbox(
placeholder="메시지를 입력하세요... (Enter로 전송)",
show_label=False,
scale=4,
container=False
)
send_btn_20b = gr.Button("📤 전송", variant="primary", scale=1)
with gr.Row():
clear_btn_20b = gr.Button("🗑️ 대화 초기화", variant="secondary", size="sm")
# 예제 질문들
gr.Examples(
examples=[
"문서의 주요 내용을 요약해주세요",
"이 문서에서 가장 중요한 포인트는 무엇인가요?",
"문서에 언급된 날짜와 일정을 알려주세요"
],
inputs=msg_20b
)
# Event Handlers
# PDF 업로드
pdf_upload.upload(
fn=upload_pdf,
inputs=[pdf_upload],
outputs=[upload_status, document_list, enable_rag]
)
# 문서 삭제
clear_btn.click(
fn=clear_documents,
outputs=[upload_status, document_list, enable_rag]
)
# RAG 상태 업데이트
enable_rag.change(
fn=lambda x: gr.update(
value=f"🔍 RAG: {'✅ 활성화' if x else '⭕ 비활성화'}
"
),
inputs=[enable_rag],
outputs=[rag_status]
)
# 모델 전환
reload_btn.click(
fn=switch_model,
inputs=[model_dropdown],
outputs=[model_120b_container, model_20b_container, current_model]
).then(
fn=lambda: gr.Info("✅ 모델이 성공적으로 전환되었습니다!"),
inputs=[],
outputs=[]
)
# 120b 모델 채팅 이벤트
msg_120b.submit(
fn=lambda msg, hist: chat_with_rag(
msg, hist, "openai/gpt-oss-120b",
enable_rag.value, document_list.value, top_k_chunks.value,
temperature.value, max_tokens.value
),
inputs=[msg_120b, chatbot_120b],
outputs=[chatbot_120b]
).then(
fn=lambda: "",
outputs=[msg_120b]
)
send_btn_120b.click(
fn=lambda msg, hist: chat_with_rag(
msg, hist, "openai/gpt-oss-120b",
enable_rag.value, document_list.value, top_k_chunks.value,
temperature.value, max_tokens.value
),
inputs=[msg_120b, chatbot_120b],
outputs=[chatbot_120b]
).then(
fn=lambda: "",
outputs=[msg_120b]
)
clear_btn_120b.click(
fn=lambda: [],
outputs=[chatbot_120b]
)
# 20b 모델 채팅 이벤트
msg_20b.submit(
fn=lambda msg, hist: chat_with_rag(
msg, hist, "openai/gpt-oss-20b",
enable_rag.value, document_list.value, top_k_chunks.value,
temperature.value, max_tokens.value
),
inputs=[msg_20b, chatbot_20b],
outputs=[chatbot_20b]
).then(
fn=lambda: "",
outputs=[msg_20b]
)
send_btn_20b.click(
fn=lambda msg, hist: chat_with_rag(
msg, hist, "openai/gpt-oss-20b",
enable_rag.value, document_list.value, top_k_chunks.value,
temperature.value, max_tokens.value
),
inputs=[msg_20b, chatbot_20b],
outputs=[chatbot_20b]
).then(
fn=lambda: "",
outputs=[msg_20b]
)
clear_btn_20b.click(
fn=lambda: [],
outputs=[chatbot_20b]
)
# 실제 모델 API 연결을 위한 함수 (구현 필요)
def connect_to_model_api(model_name, message, temperature, max_tokens):
"""
실제 모델 API에 연결하는 함수
TODO: 여기에 실제 API 호출 코드를 구현해야 합니다
예시:
- OpenAI API
- Hugging Face Inference API
- Custom model endpoint
"""
# client = Client(f"models/{model_name}")
# response = client.predict(message, temperature=temperature, max_tokens=max_tokens)
# return response
pass
demo.launch()