import gradio as gr
import os
from typing import List, Dict, Any, Optional
import hashlib
from datetime import datetime
import numpy as np
# PDF processing library
try:
import fitz # PyMuPDF
PDF_AVAILABLE = True
except ImportError:
PDF_AVAILABLE = False
print("โš ๏ธ PyMuPDF not installed. Install with: pip install pymupdf")
try:
from sentence_transformers import SentenceTransformer
ST_AVAILABLE = True
except ImportError:
ST_AVAILABLE = False
print("โš ๏ธ Sentence Transformers not installed. Install with: pip install sentence-transformers")
# Custom CSS for gradient background and styling
custom_css = """
.gradio-container {
background: linear-gradient(135deg, #667eea 0%, #764ba2 25%, #f093fb 50%, #4facfe 75%, #00f2fe 100%);
background-size: 400% 400%;
animation: gradient-animation 15s ease infinite;
min-height: 100vh;
}
@keyframes gradient-animation {
0% { background-position: 0% 50%; }
50% { background-position: 100% 50%; }
100% { background-position: 0% 50%; }
}
.dark .gradio-container {
background: linear-gradient(135deg, #1a1a2e 0%, #16213e 25%, #0f3460 50%, #533483 75%, #e94560 100%);
background-size: 400% 400%;
animation: gradient-animation 15s ease infinite;
}
.main-container {
background-color: rgba(255, 255, 255, 0.95);
backdrop-filter: blur(10px);
border-radius: 20px;
padding: 20px;
box-shadow: 0 8px 32px 0 rgba(31, 38, 135, 0.37);
border: 1px solid rgba(255, 255, 255, 0.18);
margin: 10px;
}
.dark .main-container {
background-color: rgba(30, 30, 30, 0.95);
border: 1px solid rgba(255, 255, 255, 0.1);
}
.pdf-status {
padding: 10px;
border-radius: 10px;
margin: 10px 0;
font-size: 0.9em;
}
.pdf-success {
background-color: rgba(52, 211, 153, 0.2);
border: 1px solid rgba(52, 211, 153, 0.5);
color: #10b981;
}
.pdf-error {
background-color: rgba(248, 113, 113, 0.2);
border: 1px solid rgba(248, 113, 113, 0.5);
color: #ef4444;
}
.pdf-info {
background-color: rgba(59, 130, 246, 0.2);
border: 1px solid rgba(59, 130, 246, 0.5);
color: #3b82f6;
}
.rag-context {
background-color: rgba(251, 191, 36, 0.1);
border-left: 4px solid #f59e0b;
padding: 10px;
margin: 10px 0;
border-radius: 5px;
}
"""
class SimpleTextSplitter:
"""ํ…์ŠคํŠธ ๋ถ„ํ• ๊ธฐ"""
def __init__(self, chunk_size=800, chunk_overlap=100):
self.chunk_size = chunk_size
self.chunk_overlap = chunk_overlap
def split_text(self, text: str) -> List[str]:
"""ํ…์ŠคํŠธ๋ฅผ ์ฒญํฌ๋กœ ๋ถ„ํ• """
chunks = []
sentences = text.split('. ')
current_chunk = ""
for sentence in sentences:
if len(current_chunk) + len(sentence) < self.chunk_size:
current_chunk += sentence + ". "
else:
if current_chunk:
chunks.append(current_chunk.strip())
current_chunk = sentence + ". "
if current_chunk:
chunks.append(current_chunk.strip())
return chunks
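# A minimal overlap-aware variant, shown as an illustrative sketch: it re-uses
# the tail of the previous chunk so consecutive chunks share roughly
# `chunk_overlap` characters of context. This is one plausible reading of the
# unused `chunk_overlap` parameter above, not part of the original pipeline.
def split_text_with_overlap(text: str, chunk_size: int = 800, chunk_overlap: int = 100) -> List[str]:
    chunks = SimpleTextSplitter(chunk_size=chunk_size).split_text(text)
    overlapped = []
    for i, chunk in enumerate(chunks):
        if i > 0 and chunk_overlap > 0:
            # Prepend the tail of the previous (un-overlapped) chunk as shared context
            chunk = chunks[i - 1][-chunk_overlap:] + " " + chunk
        overlapped.append(chunk)
    return overlapped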
class PDFRAGSystem:
"""PDF ๊ธฐ๋ฐ˜ RAG ์‹œ์Šคํ…œ"""
def __init__(self):
self.documents = {}
self.document_chunks = {}
self.embeddings_store = {}
self.text_splitter = SimpleTextSplitter(chunk_size=800, chunk_overlap=100)
# Initialize the embedding model for semantic search
self.embedder = None
if ST_AVAILABLE:
try:
self.embedder = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')
print("โœ… ์ž„๋ฒ ๋”ฉ ๋ชจ๋ธ ๋กœ๋“œ ์„ฑ๊ณต")
except Exception as e:
print(f"โš ๏ธ ์ž„๋ฒ ๋”ฉ ๋ชจ๋ธ ๋กœ๋“œ ์‹คํŒจ: {e}")
def extract_text_from_pdf(self, pdf_path: str) -> Dict[str, Any]:
"""PDF์—์„œ ํ…์ŠคํŠธ ์ถ”์ถœ"""
if not PDF_AVAILABLE:
return {
"metadata": {
"title": "PDF Reader Not Available",
"file_name": os.path.basename(pdf_path),
"pages": 0
},
"full_text": "PDF ์ฒ˜๋ฆฌ๋ฅผ ์œ„ํ•ด 'pip install pymupdf'๋ฅผ ์‹คํ–‰ํ•ด์ฃผ์„ธ์š”."
}
try:
doc = fitz.open(pdf_path)
text_content = []
metadata = {
"title": doc.metadata.get("title", os.path.basename(pdf_path)),
"pages": len(doc),
"file_name": os.path.basename(pdf_path)
}
for page_num, page in enumerate(doc):
text = page.get_text()
if text.strip():
text_content.append(text)
doc.close()
return {
"metadata": metadata,
"full_text": "\n\n".join(text_content)
}
except Exception as e:
raise Exception(f"PDF ์ฒ˜๋ฆฌ ์˜ค๋ฅ˜: {str(e)}")
def process_and_store_pdf(self, pdf_path: str, doc_id: str) -> Dict[str, Any]:
"""PDF ์ฒ˜๋ฆฌ ๋ฐ ์ €์žฅ"""
try:
# PDF ํ…์ŠคํŠธ ์ถ”์ถœ
pdf_data = self.extract_text_from_pdf(pdf_path)
# ํ…์ŠคํŠธ๋ฅผ ์ฒญํฌ๋กœ ๋ถ„ํ• 
chunks = self.text_splitter.split_text(pdf_data["full_text"])
# ์ฒญํฌ ์ €์žฅ
self.document_chunks[doc_id] = chunks
# ์ž„๋ฒ ๋”ฉ ์ƒ์„ฑ
if self.embedder:
embeddings = self.embedder.encode(chunks)
self.embeddings_store[doc_id] = embeddings
# ๋ฌธ์„œ ์ •๋ณด ์ €์žฅ
self.documents[doc_id] = {
"metadata": pdf_data["metadata"],
"chunk_count": len(chunks),
"upload_time": datetime.now().isoformat()
}
return {
"success": True,
"doc_id": doc_id,
"chunks": len(chunks),
"pages": pdf_data["metadata"]["pages"],
"title": pdf_data["metadata"]["title"]
}
except Exception as e:
return {"success": False, "error": str(e)}
def search_relevant_chunks(self, query: str, doc_ids: List[str], top_k: int = 3) -> List[Dict]:
"""๊ด€๋ จ ์ฒญํฌ ๊ฒ€์ƒ‰"""
all_relevant_chunks = []
if self.embedder and self.embeddings_store:
# ์ž„๋ฒ ๋”ฉ ๊ธฐ๋ฐ˜ ๊ฒ€์ƒ‰
query_embedding = self.embedder.encode([query])[0]
for doc_id in doc_ids:
if doc_id in self.embeddings_store and doc_id in self.document_chunks:
doc_embeddings = self.embeddings_store[doc_id]
chunks = self.document_chunks[doc_id]
# Compute cosine similarity between the query and each chunk (a vectorized helper sketch appears after this class)
similarities = []
for emb in doc_embeddings:
sim = np.dot(query_embedding, emb) / (np.linalg.norm(query_embedding) * np.linalg.norm(emb))
similarities.append(sim)
# Select the top-k chunks above a minimal similarity threshold
top_indices = np.argsort(similarities)[-top_k:][::-1]
for idx in top_indices:
if similarities[idx] > 0.2:  # filter out weak matches
all_relevant_chunks.append({
"content": chunks[idx],
"doc_name": self.documents[doc_id]["metadata"]["file_name"],
"similarity": similarities[idx]
})
else:
# Keyword-overlap fallback when no embedder is available
query_keywords = set(query.lower().split())
for doc_id in doc_ids:
if doc_id in self.document_chunks:
chunks = self.document_chunks[doc_id]
for chunk in chunks:  # score every chunk; the final sort keeps the best top_k
chunk_lower = chunk.lower()
score = sum(1 for keyword in query_keywords if keyword in chunk_lower)
if score > 0:
all_relevant_chunks.append({
"content": chunk[:500], # ๊ธธ์ด ์ œํ•œ
"doc_name": self.documents[doc_id]["metadata"]["file_name"],
"similarity": score / len(query_keywords) if query_keywords else 0
})
# Sort by similarity and return the best top_k overall
all_relevant_chunks.sort(key=lambda x: x.get('similarity', 0), reverse=True)
return all_relevant_chunks[:top_k]
def create_rag_prompt(self, query: str, doc_ids: List[str], top_k: int = 3) -> str:
"""RAG ํ”„๋กฌํ”„ํŠธ ์ƒ์„ฑ"""
relevant_chunks = self.search_relevant_chunks(query, doc_ids, top_k)
if not relevant_chunks:
return query
# Assemble the prompt
prompt_parts = []
prompt_parts.append("Answer the question using the following document excerpts:\n")
prompt_parts.append("=" * 50)
for i, chunk in enumerate(relevant_chunks, 1):
prompt_parts.append(f"\n[์ฐธ๊ณ ๋ฌธ์„œ {i} - {chunk['doc_name']}]")
content = chunk['content'][:400] if len(chunk['content']) > 400 else chunk['content']
prompt_parts.append(content)
prompt_parts.append("")
prompt_parts.append("=" * 50)
prompt_parts.append(f"\n์งˆ๋ฌธ: {query}")
prompt_parts.append("\n์œ„ ์ฐธ๊ณ ๋ฌธ์„œ๋ฅผ ๋ฐ”ํƒ•์œผ๋กœ ์ž์„ธํ•˜๊ณ  ์ •ํ™•ํ•˜๊ฒŒ ๋‹ต๋ณ€ํ•ด์ฃผ์„ธ์š”:")
return "\n".join(prompt_parts)
# Create the global RAG system instance
rag_system = PDFRAGSystem()
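# End-to-end usage sketch (illustrative only; "example.pdf" is a hypothetical
# path and nothing below runs at import time):
def _rag_usage_example():
    result = rag_system.process_and_store_pdf("example.pdf", "doc_example")
    if result["success"]:
        prompt = rag_system.create_rag_prompt("What is this document about?", ["doc_example"], top_k=3)
        print(prompt)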
def upload_pdf(file):
"""PDF ํŒŒ์ผ ์—…๋กœ๋“œ ์ฒ˜๋ฆฌ"""
if file is None:
return (
gr.update(value="<div class='pdf-status pdf-error'>ํŒŒ์ผ์„ ์„ ํƒํ•ด์ฃผ์„ธ์š”</div>"),
gr.update(choices=[]),
gr.update(value=False)
)
try:
# Use a hash of the file contents as the document ID
with open(file.name, 'rb') as f:
file_hash = hashlib.md5(f.read()).hexdigest()[:8]
doc_id = f"doc_{file_hash}"
# Process and store the PDF
result = rag_system.process_and_store_pdf(file.name, doc_id)
if result["success"]:
status_html = f"""
<div class="pdf-status pdf-success">
✅ PDF uploaded successfully!<br>
📄 File: {result['title']}<br>
📑 Pages: {result['pages']}<br>
🔍 Chunks: {result['chunks']} created
</div>
"""
# Update the document list
doc_choices = [f"{doc_id}: {rag_system.documents[doc_id]['metadata']['file_name']}"
for doc_id in rag_system.documents.keys()]
return (
status_html,
gr.update(choices=doc_choices, value=doc_choices),
gr.update(value=True)
)
else:
status_html = f"""
<div class="pdf-status pdf-error">
โŒ ์—…๋กœ๋“œ ์‹คํŒจ: {result['error']}
</div>
"""
return status_html, gr.update(), gr.update(value=False)
except Exception as e:
return (
f"<div class='pdf-status pdf-error'>โŒ ์˜ค๋ฅ˜: {str(e)}</div>",
gr.update(),
gr.update(value=False)
)
def clear_documents():
"""๋ฌธ์„œ ์ดˆ๊ธฐํ™”"""
rag_system.documents = {}
rag_system.document_chunks = {}
rag_system.embeddings_store = {}
return (
gr.update(value="<div class='pdf-status pdf-success'>✅ All documents have been deleted</div>"),
gr.update(choices=[], value=[]),
gr.update(value=False)
)
def switch_model(model_choice):
"""Show the container for the selected model, hide the other, and record the choice."""
if model_choice == "openai/gpt-oss-120b":
return gr.update(visible=True), gr.update(visible=False), model_choice
return gr.update(visible=False), gr.update(visible=True), model_choice
def create_rag_context_display(query, selected_docs, top_k):
"""RAG ์ปจํ…์ŠคํŠธ ํ‘œ์‹œ์šฉ HTML ์ƒ์„ฑ"""
if not selected_docs:
return ""
doc_ids = [doc.split(":")[0] for doc in selected_docs]
chunks = rag_system.search_relevant_chunks(query, doc_ids, top_k)
if not chunks:
return ""
html = "<div class='rag-context'><strong>๐Ÿ“š ์ฐธ๊ณ  ๋ฌธ์„œ:</strong><br>"
for i, chunk in enumerate(chunks, 1):
html += f"<br>{i}. {chunk['doc_name']} (์œ ์‚ฌ๋„: {chunk['similarity']:.2f})<br>"
html += f"<small>{chunk['content'][:200]}...</small><br>"
html += "</div>"
return html
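# Illustrative sketch (not used by the UI below): one way a RAG-enhanced prompt
# could be sent directly to a hosted model, assuming huggingface_hub's
# InferenceClient and a valid HF_TOKEN in the environment.
def query_model_directly(prompt: str, model_id: str = "openai/gpt-oss-120b") -> str:
    from huggingface_hub import InferenceClient  # lazy import; optional dependency
    client = InferenceClient(model=model_id, token=os.environ.get("HF_TOKEN"))
    response = client.chat_completion(
        messages=[{"role": "user", "content": prompt}],
        max_tokens=512,
    )
    return response.choices[0].message.content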
# Main interface
with gr.Blocks(fill_height=True, theme="Nymbo/Nymbo_Theme", css=custom_css) as demo:
# State variable to track the current model (created inside the Blocks context so events can use it)
current_model = gr.State("openai/gpt-oss-120b")
# JavaScript stub for message passing; note that <script> tags injected via gr.HTML are generally not executed by browsers, so this is illustrative only
gr.HTML("""
<script>
function sendToModel(processedMsg) {
// This function would send the processed message to the model
console.log("Sending to model:", processedMsg);
}
</script>
""")
with gr.Row():
# Sidebar
with gr.Column(scale=1):
with gr.Group(elem_classes="main-container"):
gr.Markdown("# ๐Ÿš€ Inference Provider + RAG")
gr.Markdown(
"OpenAI GPT-OSS models with PDF RAG support. "
"Sign in with your Hugging Face account to use this API."
)
# Model selection
model_dropdown = gr.Dropdown(
choices=["openai/gpt-oss-120b", "openai/gpt-oss-20b"],
value="openai/gpt-oss-120b",
label="๐Ÿ“Š Select Model",
info="Choose between different model sizes"
)
# Login button
login_button = gr.LoginButton("Sign in with Hugging Face", size="lg")
# Reload button to apply model change
reload_btn = gr.Button("๐Ÿ”„ Apply Model Change", variant="primary", size="lg")
# RAG Settings
with gr.Accordion("๐Ÿ“š PDF RAG Settings", open=True):
pdf_upload = gr.File(
label="Upload PDF",
file_types=[".pdf"],
type="filepath"
)
upload_status = gr.HTML(
value="<div class='pdf-status pdf-info'>๐Ÿ“ค PDF๋ฅผ ์—…๋กœ๋“œํ•˜์—ฌ ๋ฌธ์„œ ๊ธฐ๋ฐ˜ ๋‹ต๋ณ€์„ ๋ฐ›์œผ์„ธ์š”</div>"
)
document_list = gr.CheckboxGroup(
choices=[],
label="๐Ÿ“„ ์—…๋กœ๋“œ๋œ ๋ฌธ์„œ",
info="์ฐธ๊ณ ํ•  ๋ฌธ์„œ๋ฅผ ์„ ํƒํ•˜์„ธ์š”"
)
clear_btn = gr.Button("๐Ÿ—‘๏ธ ๋ชจ๋“  ๋ฌธ์„œ ์‚ญ์ œ", size="sm")
enable_rag = gr.Checkbox(
label="RAG ํ™œ์„ฑํ™”",
value=False,
info="์„ ํƒํ•œ ๋ฌธ์„œ๋ฅผ ์ฐธ๊ณ ํ•˜์—ฌ ๋‹ต๋ณ€ ์ƒ์„ฑ"
)
top_k_chunks = gr.Slider(
minimum=1,
maximum=5,
value=3,
step=1,
label="์ฐธ์กฐ ์ฒญํฌ ์ˆ˜",
info="๋‹ต๋ณ€ ์ƒ์„ฑ์‹œ ์ฐธ๊ณ ํ•  ๋ฌธ์„œ ์กฐ๊ฐ ๊ฐœ์ˆ˜"
)
# Additional options
with gr.Accordion("โš™๏ธ Advanced Options", open=False):
gr.Markdown("*These options will be available after model implementation*")
temperature = gr.Slider(
minimum=0,
maximum=2,
value=0.7,
step=0.1,
label="Temperature"
)
max_tokens = gr.Slider(
minimum=1,
maximum=4096,
value=512,
step=1,
label="Max Tokens"
)
# Main chat area
with gr.Column(scale=3):
with gr.Group(elem_classes="main-container"):
gr.Markdown("## ๐Ÿ’ฌ Chat Interface")
# RAG status indicator
rag_status = gr.HTML(
value="<div class='pdf-status pdf-info'>🔍 RAG: <strong>Disabled</strong></div>"
)
# RAG context display area
rag_context_display = gr.HTML(value="", visible=False)
# Container for model interfaces
with gr.Column(visible=True) as model_120b_container:
gr.Markdown("### Model: openai/gpt-oss-120b")
# Custom input group that applies the RAG preprocessing step
with gr.Group():
# User input textbox
user_input = gr.Textbox(
label="Message",
placeholder="Ask about your documents or start a general conversation...",
lines=2
)
with gr.Row():
send_btn = gr.Button("๐Ÿ“ค ์ „์†ก", variant="primary")
clear_chat_btn = gr.Button("๐Ÿ—‘๏ธ ๋Œ€ํ™” ์ดˆ๊ธฐํ™”")
# ์›๋ณธ ๋ชจ๋ธ ๋กœ๋“œ
original_model = gr.load(
"models/openai/gpt-oss-120b",
accept_token=login_button,
provider="fireworks-ai"
)
with gr.Column(visible=False) as model_20b_container:
gr.Markdown("### Model: openai/gpt-oss-20b")
with gr.Group():
# User input textbox (for the 20b model)
user_input_20b = gr.Textbox(
label="Message",
placeholder="Ask about your documents or start a general conversation...",
lines=2
)
with gr.Row():
send_btn_20b = gr.Button("๐Ÿ“ค ์ „์†ก", variant="primary")
clear_chat_btn_20b = gr.Button("๐Ÿ—‘๏ธ ๋Œ€ํ™” ์ดˆ๊ธฐํ™”")
# ์›๋ณธ ๋ชจ๋ธ ๋กœ๋“œ
original_model_20b = gr.load(
"models/openai/gpt-oss-20b",
accept_token=login_button,
provider="fireworks-ai"
)
# Event Handlers
# PDF upload
pdf_upload.upload(
fn=upload_pdf,
inputs=[pdf_upload],
outputs=[upload_status, document_list, enable_rag]
)
# Delete all documents
clear_btn.click(
fn=clear_documents,
outputs=[upload_status, document_list, enable_rag]
)
# Update the RAG status indicator
enable_rag.change(
fn=lambda x: gr.update(
value=f"<div class='pdf-status pdf-info'>๐Ÿ” RAG: <strong>{'ํ™œ์„ฑํ™”' if x else '๋น„ํ™œ์„ฑํ™”'}</strong></div>"
),
inputs=[enable_rag],
outputs=[rag_status]
)
# Apply model switch
reload_btn.click(
fn=switch_model,
inputs=[model_dropdown],
outputs=[model_120b_container, model_20b_container, current_model]
).then(
fn=lambda: gr.Info("Model switched successfully!"),
inputs=[],
outputs=[]
)
# Update visibility based on dropdown selection
def update_visibility(model_choice):
if model_choice == "openai/gpt-oss-120b":
return gr.update(visible=True), gr.update(visible=False)
else:
return gr.update(visible=False), gr.update(visible=True)
model_dropdown.change(
fn=update_visibility,
inputs=[model_dropdown],
outputs=[model_120b_container, model_20b_container]
)
# Message handling (with RAG preprocessing)
def process_message(message, enable_rag, selected_docs, top_k):
"""Rewrite the message with retrieved context; the enhanced prompt replaces the textbox contents so it can be submitted to the loaded model."""
if enable_rag and selected_docs:
doc_ids = [doc.split(":")[0] for doc in selected_docs]
enhanced_message = rag_system.create_rag_prompt(message, doc_ids, top_k)
context_html = create_rag_context_display(message, selected_docs, top_k)
return enhanced_message, gr.update(value=context_html, visible=True)
else:
return message, gr.update(value="", visible=False)
# Events for the 120b model
send_btn.click(
fn=process_message,
inputs=[user_input, enable_rag, document_list, top_k_chunks],
outputs=[user_input, rag_context_display]
)
user_input.submit(
fn=process_message,
inputs=[user_input, enable_rag, document_list, top_k_chunks],
outputs=[user_input, rag_context_display]
)
# Events for the 20b model
send_btn_20b.click(
fn=process_message,
inputs=[user_input_20b, enable_rag, document_list, top_k_chunks],
outputs=[user_input_20b, rag_context_display]
)
user_input_20b.submit(
fn=process_message,
inputs=[user_input_20b, enable_rag, document_list, top_k_chunks],
outputs=[user_input_20b, rag_context_display]
)
demo.launch()