gpt-oss-RAG / app.py
openfree's picture
Update app.py
97286e7 verified
raw
history blame
20.4 kB
import gradio as gr
import os
from typing import List, Dict, Any, Optional, Tuple
import hashlib
from datetime import datetime
import numpy as np
# PDF ์ฒ˜๋ฆฌ ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ
try:
import fitz # PyMuPDF
PDF_AVAILABLE = True
except ImportError:
PDF_AVAILABLE = False
print("โš ๏ธ PyMuPDF not installed. Install with: pip install pymupdf")
try:
from sentence_transformers import SentenceTransformer
ST_AVAILABLE = True
except ImportError:
ST_AVAILABLE = False
print("โš ๏ธ Sentence Transformers not installed. Install with: pip install sentence-transformers")
# Soft and bright custom CSS
custom_css = """
.gradio-container {
background: linear-gradient(135deg, #f5f7fa 0%, #c3cfe2 100%);
min-height: 100vh;
font-family: 'Inter', -apple-system, BlinkMacSystemFont, sans-serif;
}
.main-container {
background: rgba(255, 255, 255, 0.98);
border-radius: 16px;
padding: 24px;
box-shadow: 0 4px 6px -1px rgba(0, 0, 0, 0.1), 0 2px 4px -1px rgba(0, 0, 0, 0.06);
border: 1px solid rgba(0, 0, 0, 0.05);
margin: 12px;
}
/* Status messages styling */
.pdf-status {
padding: 12px 16px;
border-radius: 12px;
margin: 12px 0;
font-size: 0.95rem;
font-weight: 500;
}
.pdf-success {
background: linear-gradient(135deg, #d4edda 0%, #c3e6cb 100%);
border: 1px solid #b1dfbb;
color: #155724;
}
.pdf-error {
background: linear-gradient(135deg, #f8d7da 0%, #f5c6cb 100%);
border: 1px solid #f1aeb5;
color: #721c24;
}
.pdf-info {
background: linear-gradient(135deg, #d1ecf1 0%, #bee5eb 100%);
border: 1px solid #9ec5d8;
color: #0c5460;
}
.rag-context {
background: linear-gradient(135deg, #fef3c7 0%, #fde68a 100%);
border-left: 4px solid #f59e0b;
padding: 12px;
margin: 12px 0;
border-radius: 8px;
font-size: 0.9rem;
}
"""
class SimpleTextSplitter:
"""ํ…์ŠคํŠธ ๋ถ„ํ• ๊ธฐ"""
def __init__(self, chunk_size=800, chunk_overlap=100):
self.chunk_size = chunk_size
self.chunk_overlap = chunk_overlap
def split_text(self, text: str) -> List[str]:
"""ํ…์ŠคํŠธ๋ฅผ ์ฒญํฌ๋กœ ๋ถ„ํ• """
chunks = []
sentences = text.split('. ')
current_chunk = ""
for sentence in sentences:
if len(current_chunk) + len(sentence) < self.chunk_size:
current_chunk += sentence + ". "
else:
if current_chunk:
chunks.append(current_chunk.strip())
current_chunk = sentence + ". "
if current_chunk:
chunks.append(current_chunk.strip())
return chunks
class PDFRAGSystem:
"""PDF ๊ธฐ๋ฐ˜ RAG ์‹œ์Šคํ…œ"""
def __init__(self):
self.documents = {}
self.document_chunks = {}
self.embeddings_store = {}
self.text_splitter = SimpleTextSplitter(chunk_size=800, chunk_overlap=100)
# ์ž„๋ฒ ๋”ฉ ๋ชจ๋ธ ์ดˆ๊ธฐํ™”
self.embedder = None
if ST_AVAILABLE:
try:
self.embedder = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')
print("โœ… ์ž„๋ฒ ๋”ฉ ๋ชจ๋ธ ๋กœ๋“œ ์„ฑ๊ณต")
except Exception as e:
print(f"โš ๏ธ ์ž„๋ฒ ๋”ฉ ๋ชจ๋ธ ๋กœ๋“œ ์‹คํŒจ: {e}")
def extract_text_from_pdf(self, pdf_path: str) -> Dict[str, Any]:
"""PDF์—์„œ ํ…์ŠคํŠธ ์ถ”์ถœ"""
if not PDF_AVAILABLE:
return {
"metadata": {
"title": "PDF Reader Not Available",
"file_name": os.path.basename(pdf_path),
"pages": 0
},
"full_text": "PDF ์ฒ˜๋ฆฌ๋ฅผ ์œ„ํ•ด 'pip install pymupdf'๋ฅผ ์‹คํ–‰ํ•ด์ฃผ์„ธ์š”."
}
try:
doc = fitz.open(pdf_path)
text_content = []
metadata = {
"title": doc.metadata.get("title", os.path.basename(pdf_path)),
"pages": len(doc),
"file_name": os.path.basename(pdf_path)
}
for page_num, page in enumerate(doc):
text = page.get_text()
if text.strip():
text_content.append(text)
doc.close()
return {
"metadata": metadata,
"full_text": "\n\n".join(text_content)
}
except Exception as e:
raise Exception(f"PDF ์ฒ˜๋ฆฌ ์˜ค๋ฅ˜: {str(e)}")
def process_and_store_pdf(self, pdf_path: str, doc_id: str) -> Dict[str, Any]:
"""PDF ์ฒ˜๋ฆฌ ๋ฐ ์ €์žฅ"""
try:
# PDF ํ…์ŠคํŠธ ์ถ”์ถœ
pdf_data = self.extract_text_from_pdf(pdf_path)
# ํ…์ŠคํŠธ๋ฅผ ์ฒญํฌ๋กœ ๋ถ„ํ• 
chunks = self.text_splitter.split_text(pdf_data["full_text"])
# ์ฒญํฌ ์ €์žฅ
self.document_chunks[doc_id] = chunks
# ์ž„๋ฒ ๋”ฉ ์ƒ์„ฑ
if self.embedder:
embeddings = self.embedder.encode(chunks)
self.embeddings_store[doc_id] = embeddings
# ๋ฌธ์„œ ์ •๋ณด ์ €์žฅ
self.documents[doc_id] = {
"metadata": pdf_data["metadata"],
"chunk_count": len(chunks),
"upload_time": datetime.now().isoformat()
}
return {
"success": True,
"doc_id": doc_id,
"chunks": len(chunks),
"pages": pdf_data["metadata"]["pages"],
"title": pdf_data["metadata"]["title"]
}
except Exception as e:
return {"success": False, "error": str(e)}
def search_relevant_chunks(self, query: str, doc_ids: List[str], top_k: int = 3) -> List[Dict]:
"""๊ด€๋ จ ์ฒญํฌ ๊ฒ€์ƒ‰"""
all_relevant_chunks = []
if self.embedder and self.embeddings_store:
# ์ž„๋ฒ ๋”ฉ ๊ธฐ๋ฐ˜ ๊ฒ€์ƒ‰
query_embedding = self.embedder.encode([query])[0]
for doc_id in doc_ids:
if doc_id in self.embeddings_store and doc_id in self.document_chunks:
doc_embeddings = self.embeddings_store[doc_id]
chunks = self.document_chunks[doc_id]
# ์ฝ”์‚ฌ์ธ ์œ ์‚ฌ๋„ ๊ณ„์‚ฐ
similarities = []
for emb in doc_embeddings:
sim = np.dot(query_embedding, emb) / (np.linalg.norm(query_embedding) * np.linalg.norm(emb))
similarities.append(sim)
# ์ƒ์œ„ ์ฒญํฌ ์„ ํƒ
top_indices = np.argsort(similarities)[-top_k:][::-1]
for idx in top_indices:
if similarities[idx] > 0.2:
all_relevant_chunks.append({
"content": chunks[idx],
"doc_name": self.documents[doc_id]["metadata"]["file_name"],
"similarity": similarities[idx]
})
else:
# ํ‚ค์›Œ๋“œ ๊ธฐ๋ฐ˜ ๊ฒ€์ƒ‰
query_keywords = set(query.lower().split())
for doc_id in doc_ids:
if doc_id in self.document_chunks:
chunks = self.document_chunks[doc_id]
for i, chunk in enumerate(chunks[:5]): # ์ฒ˜์Œ 5๊ฐœ๋งŒ
chunk_lower = chunk.lower()
score = sum(1 for keyword in query_keywords if keyword in chunk_lower)
if score > 0:
all_relevant_chunks.append({
"content": chunk[:500],
"doc_name": self.documents[doc_id]["metadata"]["file_name"],
"similarity": score / len(query_keywords) if query_keywords else 0
})
# ์ •๋ ฌ ๋ฐ ๋ฐ˜ํ™˜
all_relevant_chunks.sort(key=lambda x: x.get('similarity', 0), reverse=True)
return all_relevant_chunks[:top_k]
def create_rag_prompt(self, query: str, doc_ids: List[str], top_k: int = 3) -> str:
"""RAG ํ”„๋กฌํ”„ํŠธ ์ƒ์„ฑ"""
relevant_chunks = self.search_relevant_chunks(query, doc_ids, top_k)
if not relevant_chunks:
return query
# ํ”„๋กฌํ”„ํŠธ ๊ตฌ์„ฑ
prompt_parts = []
prompt_parts.append("๋‹ค์Œ ๋ฌธ์„œ ๋‚ด์šฉ์„ ์ฐธ๊ณ ํ•˜์—ฌ ๋‹ต๋ณ€ํ•ด์ฃผ์„ธ์š”:\n")
prompt_parts.append("=" * 40)
for i, chunk in enumerate(relevant_chunks, 1):
prompt_parts.append(f"\n[์ฐธ๊ณ  {i} - {chunk['doc_name']}]")
content = chunk['content'][:300] if len(chunk['content']) > 300 else chunk['content']
prompt_parts.append(content)
prompt_parts.append("\n" + "=" * 40)
prompt_parts.append(f"\n์งˆ๋ฌธ: {query}")
return "\n".join(prompt_parts)
# RAG ์‹œ์Šคํ…œ ์ธ์Šคํ„ด์Šค ์ƒ์„ฑ
rag_system = PDFRAGSystem()
# State variable to track current model and RAG settings
current_model = gr.State("openai/gpt-oss-120b")
rag_enabled_state = gr.State(False)
selected_docs_state = gr.State([])
top_k_state = gr.State(3)
def upload_pdf(file):
"""PDF ํŒŒ์ผ ์—…๋กœ๋“œ ์ฒ˜๋ฆฌ"""
if file is None:
return (
gr.update(value="<div class='pdf-status pdf-info'>๐Ÿ“ ํŒŒ์ผ์„ ์„ ํƒํ•ด์ฃผ์„ธ์š”</div>"),
gr.update(choices=[]),
gr.update(value=False)
)
try:
# ํŒŒ์ผ ํ•ด์‹œ๋ฅผ ID๋กœ ์‚ฌ์šฉ
with open(file.name, 'rb') as f:
file_hash = hashlib.md5(f.read()).hexdigest()[:8]
doc_id = f"doc_{file_hash}"
# PDF ์ฒ˜๋ฆฌ ๋ฐ ์ €์žฅ
result = rag_system.process_and_store_pdf(file.name, doc_id)
if result["success"]:
status_html = f"""
<div class="pdf-status pdf-success">
โœ… PDF ์—…๋กœ๋“œ ์™„๋ฃŒ!<br>
๐Ÿ“„ {result['title']}<br>
๐Ÿ“‘ {result['pages']} ํŽ˜์ด์ง€ | ๐Ÿ” {result['chunks']} ์ฒญํฌ
</div>
"""
# ๋ฌธ์„œ ๋ชฉ๋ก ์—…๋ฐ์ดํŠธ
doc_choices = [f"{doc_id}: {rag_system.documents[doc_id]['metadata']['file_name']}"
for doc_id in rag_system.documents.keys()]
return (
status_html,
gr.update(choices=doc_choices, value=doc_choices),
gr.update(value=True)
)
else:
return (
f"<div class='pdf-status pdf-error'>โŒ ์˜ค๋ฅ˜: {result['error']}</div>",
gr.update(),
gr.update(value=False)
)
except Exception as e:
return (
f"<div class='pdf-status pdf-error'>โŒ ์˜ค๋ฅ˜: {str(e)}</div>",
gr.update(),
gr.update(value=False)
)
def clear_documents():
"""๋ฌธ์„œ ์ดˆ๊ธฐํ™”"""
rag_system.documents = {}
rag_system.document_chunks = {}
rag_system.embeddings_store = {}
return (
gr.update(value="<div class='pdf-status pdf-info'>๐Ÿ—‘๏ธ ๋ชจ๋“  ๋ฌธ์„œ๊ฐ€ ์‚ญ์ œ๋˜์—ˆ์Šต๋‹ˆ๋‹ค</div>"),
gr.update(choices=[], value=[]),
gr.update(value=False)
)
def switch_model(model_choice):
"""Function to switch between models"""
return gr.update(visible=False), gr.update(visible=True), model_choice
def create_rag_wrapper(original_fn, model_name):
"""์›๋ณธ ๋ชจ๋ธ ํ•จ์ˆ˜๋ฅผ RAG๋กœ ๊ฐ์‹ธ๋Š” ๋ž˜ํผ ์ƒ์„ฑ"""
def wrapped_fn(message, history=None):
# RAG ์„ค์ • ๊ฐ€์ ธ์˜ค๊ธฐ
if rag_enabled_state.value and selected_docs_state.value:
doc_ids = [doc.split(":")[0] for doc in selected_docs_state.value]
enhanced_message = rag_system.create_rag_prompt(message, doc_ids, top_k_state.value)
# RAG ์ ์šฉ ์•Œ๋ฆผ
print(f"๐Ÿ” RAG ์ ์šฉ: {len(message)}์ž โ†’ {len(enhanced_message)}์ž")
# ์›๋ณธ ๋ชจ๋ธ์— ๊ฐ•ํ™”๋œ ๋ฉ”์‹œ์ง€ ์ „๋‹ฌ
if history is not None:
return original_fn(enhanced_message, history)
else:
return original_fn(enhanced_message)
else:
# RAG ๋ฏธ์ ์šฉ์‹œ ์›๋ณธ ๋ฉ”์‹œ์ง€ ๊ทธ๋Œ€๋กœ ์ „๋‹ฌ
if history is not None:
return original_fn(message, history)
else:
return original_fn(message)
return wrapped_fn
# Main interface with soft theme
with gr.Blocks(fill_height=True, theme=gr.themes.Soft(), css=custom_css) as demo:
with gr.Row():
# Sidebar
with gr.Column(scale=1):
with gr.Group(elem_classes="main-container"):
gr.Markdown("# ๐Ÿš€ Inference Provider + RAG")
gr.Markdown(
"OpenAI GPT-OSS models served by Cerebras API. "
"Upload PDF documents for context-aware responses."
)
# Model selection
model_dropdown = gr.Dropdown(
choices=["openai/gpt-oss-120b", "openai/gpt-oss-20b"],
value="openai/gpt-oss-120b",
label="๐Ÿ“Š Select Model",
info="Choose between different model sizes"
)
# Login button
login_button = gr.LoginButton("Sign in with Hugging Face", size="lg")
# Reload button to apply model change
reload_btn = gr.Button("๐Ÿ”„ Apply Model Change", variant="primary", size="lg")
# RAG Settings
with gr.Accordion("๐Ÿ“š PDF RAG Settings", open=True):
pdf_upload = gr.File(
label="Upload PDF",
file_types=[".pdf"],
type="filepath"
)
upload_status = gr.HTML(
value="<div class='pdf-status pdf-info'>๐Ÿ“ค Upload a PDF to enable document-based answers</div>"
)
document_list = gr.CheckboxGroup(
choices=[],
label="๐Ÿ“„ Uploaded Documents",
info="Select documents to use as context"
)
clear_btn = gr.Button("๐Ÿ—‘๏ธ Clear All Documents", size="sm", variant="secondary")
enable_rag = gr.Checkbox(
label="โœจ Enable RAG",
value=False,
info="Use documents for context-aware responses"
)
top_k_chunks = gr.Slider(
minimum=1,
maximum=5,
value=3,
step=1,
label="Context Chunks",
info="Number of document chunks to use"
)
# Additional options
with gr.Accordion("โš™๏ธ Advanced Options", open=False):
gr.Markdown("*These options will be available after model implementation*")
temperature = gr.Slider(
minimum=0,
maximum=2,
value=0.7,
step=0.1,
label="Temperature"
)
max_tokens = gr.Slider(
minimum=1,
maximum=4096,
value=512,
step=1,
label="Max Tokens"
)
# Main chat area
with gr.Column(scale=3):
with gr.Group(elem_classes="main-container"):
gr.Markdown("## ๐Ÿ’ฌ Chat Interface")
# RAG status
rag_status = gr.HTML(
value="<div class='pdf-status pdf-info'>๐Ÿ” RAG: <strong>Disabled</strong></div>"
)
# RAG context preview
context_preview = gr.HTML(value="", visible=False)
# Container for model interfaces
with gr.Column(visible=True) as model_120b_container:
gr.Markdown("### Model: openai/gpt-oss-120b")
# Load the original model and wrap it with RAG
original_interface_120b = gr.load(
"models/openai/gpt-oss-120b",
accept_token=login_button,
provider="fireworks-ai"
)
# Note: The loaded interface will have its own chat components
# We'll intercept the messages through our wrapper function
with gr.Column(visible=False) as model_20b_container:
gr.Markdown("### Model: openai/gpt-oss-20b")
# Load the original model
original_interface_20b = gr.load(
"models/openai/gpt-oss-20b",
accept_token=login_button,
provider="fireworks-ai"
)
# Event Handlers
# PDF upload
pdf_upload.upload(
fn=upload_pdf,
inputs=[pdf_upload],
outputs=[upload_status, document_list, enable_rag]
)
# Clear documents
clear_btn.click(
fn=clear_documents,
outputs=[upload_status, document_list, enable_rag]
)
# Update RAG state when settings change
def update_rag_state(enabled, docs, k):
rag_enabled_state.value = enabled
selected_docs_state.value = docs if docs else []
top_k_state.value = k
status = "โœ… Enabled" if enabled and docs else "โญ• Disabled"
status_html = f"<div class='pdf-status pdf-info'>๐Ÿ” RAG: <strong>{status}</strong></div>"
# Show context preview if RAG is enabled
if enabled and docs:
preview = f"<div class='rag-context'>๐Ÿ“š Using {len(docs)} document(s) with {k} chunks per query</div>"
return gr.update(value=status_html), gr.update(value=preview, visible=True)
else:
return gr.update(value=status_html), gr.update(value="", visible=False)
# Connect RAG state updates
enable_rag.change(
fn=update_rag_state,
inputs=[enable_rag, document_list, top_k_chunks],
outputs=[rag_status, context_preview]
)
document_list.change(
fn=update_rag_state,
inputs=[enable_rag, document_list, top_k_chunks],
outputs=[rag_status, context_preview]
)
top_k_chunks.change(
fn=update_rag_state,
inputs=[enable_rag, document_list, top_k_chunks],
outputs=[rag_status, context_preview]
)
# Handle model switching
reload_btn.click(
fn=switch_model,
inputs=[model_dropdown],
outputs=[model_120b_container, model_20b_container, current_model]
).then(
fn=lambda: gr.Info("Model switched successfully!"),
inputs=[],
outputs=[]
)
# Update visibility based on dropdown selection
def update_visibility(model_choice):
if model_choice == "openai/gpt-oss-120b":
return gr.update(visible=True), gr.update(visible=False)
else:
return gr.update(visible=False), gr.update(visible=True)
model_dropdown.change(
fn=update_visibility,
inputs=[model_dropdown],
outputs=[model_120b_container, model_20b_container]
)
# Monkey-patch the loaded interfaces to add RAG support
# This is done after the interface is loaded
demo.load = lambda: print("๐Ÿ“š RAG System Ready!")
demo.launch()