Spaces:
Running
on
Zero
Running
on
Zero
import gradio as gr | |
import os | |
from typing import List, Dict, Any, Optional, Tuple | |
import hashlib | |
from datetime import datetime | |
import numpy as np | |
# PDF ์ฒ๋ฆฌ ๋ผ์ด๋ธ๋ฌ๋ฆฌ | |
try: | |
import fitz # PyMuPDF | |
PDF_AVAILABLE = True | |
except ImportError: | |
PDF_AVAILABLE = False | |
print("โ ๏ธ PyMuPDF not installed. Install with: pip install pymupdf") | |
try: | |
from sentence_transformers import SentenceTransformer | |
ST_AVAILABLE = True | |
except ImportError: | |
ST_AVAILABLE = False | |
print("โ ๏ธ Sentence Transformers not installed. Install with: pip install sentence-transformers") | |
# Soft and bright custom CSS | |
custom_css = """ | |
.gradio-container { | |
background: linear-gradient(135deg, #f5f7fa 0%, #c3cfe2 100%); | |
min-height: 100vh; | |
font-family: 'Inter', -apple-system, BlinkMacSystemFont, sans-serif; | |
} | |
.main-container { | |
background: rgba(255, 255, 255, 0.98); | |
border-radius: 16px; | |
padding: 24px; | |
box-shadow: 0 4px 6px -1px rgba(0, 0, 0, 0.1), 0 2px 4px -1px rgba(0, 0, 0, 0.06); | |
border: 1px solid rgba(0, 0, 0, 0.05); | |
margin: 12px; | |
} | |
/* Status messages styling */ | |
.pdf-status { | |
padding: 12px 16px; | |
border-radius: 12px; | |
margin: 12px 0; | |
font-size: 0.95rem; | |
font-weight: 500; | |
} | |
.pdf-success { | |
background: linear-gradient(135deg, #d4edda 0%, #c3e6cb 100%); | |
border: 1px solid #b1dfbb; | |
color: #155724; | |
} | |
.pdf-error { | |
background: linear-gradient(135deg, #f8d7da 0%, #f5c6cb 100%); | |
border: 1px solid #f1aeb5; | |
color: #721c24; | |
} | |
.pdf-info { | |
background: linear-gradient(135deg, #d1ecf1 0%, #bee5eb 100%); | |
border: 1px solid #9ec5d8; | |
color: #0c5460; | |
} | |
.rag-context { | |
background: linear-gradient(135deg, #fef3c7 0%, #fde68a 100%); | |
border-left: 4px solid #f59e0b; | |
padding: 12px; | |
margin: 12px 0; | |
border-radius: 8px; | |
font-size: 0.9rem; | |
} | |
""" | |
class SimpleTextSplitter: | |
"""ํ ์คํธ ๋ถํ ๊ธฐ""" | |
def __init__(self, chunk_size=800, chunk_overlap=100): | |
self.chunk_size = chunk_size | |
self.chunk_overlap = chunk_overlap | |
def split_text(self, text: str) -> List[str]: | |
"""ํ ์คํธ๋ฅผ ์ฒญํฌ๋ก ๋ถํ """ | |
chunks = [] | |
sentences = text.split('. ') | |
current_chunk = "" | |
for sentence in sentences: | |
if len(current_chunk) + len(sentence) < self.chunk_size: | |
current_chunk += sentence + ". " | |
else: | |
if current_chunk: | |
chunks.append(current_chunk.strip()) | |
current_chunk = sentence + ". " | |
if current_chunk: | |
chunks.append(current_chunk.strip()) | |
return chunks | |
class PDFRAGSystem: | |
"""PDF ๊ธฐ๋ฐ RAG ์์คํ """ | |
def __init__(self): | |
self.documents = {} | |
self.document_chunks = {} | |
self.embeddings_store = {} | |
self.text_splitter = SimpleTextSplitter(chunk_size=800, chunk_overlap=100) | |
# ์๋ฒ ๋ฉ ๋ชจ๋ธ ์ด๊ธฐํ | |
self.embedder = None | |
if ST_AVAILABLE: | |
try: | |
self.embedder = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2') | |
print("โ ์๋ฒ ๋ฉ ๋ชจ๋ธ ๋ก๋ ์ฑ๊ณต") | |
except Exception as e: | |
print(f"โ ๏ธ ์๋ฒ ๋ฉ ๋ชจ๋ธ ๋ก๋ ์คํจ: {e}") | |
def extract_text_from_pdf(self, pdf_path: str) -> Dict[str, Any]: | |
"""PDF์์ ํ ์คํธ ์ถ์ถ""" | |
if not PDF_AVAILABLE: | |
return { | |
"metadata": { | |
"title": "PDF Reader Not Available", | |
"file_name": os.path.basename(pdf_path), | |
"pages": 0 | |
}, | |
"full_text": "PDF ์ฒ๋ฆฌ๋ฅผ ์ํด 'pip install pymupdf'๋ฅผ ์คํํด์ฃผ์ธ์." | |
} | |
try: | |
doc = fitz.open(pdf_path) | |
text_content = [] | |
metadata = { | |
"title": doc.metadata.get("title", os.path.basename(pdf_path)), | |
"pages": len(doc), | |
"file_name": os.path.basename(pdf_path) | |
} | |
for page_num, page in enumerate(doc): | |
text = page.get_text() | |
if text.strip(): | |
text_content.append(text) | |
doc.close() | |
return { | |
"metadata": metadata, | |
"full_text": "\n\n".join(text_content) | |
} | |
except Exception as e: | |
raise Exception(f"PDF ์ฒ๋ฆฌ ์ค๋ฅ: {str(e)}") | |
def process_and_store_pdf(self, pdf_path: str, doc_id: str) -> Dict[str, Any]: | |
"""PDF ์ฒ๋ฆฌ ๋ฐ ์ ์ฅ""" | |
try: | |
# PDF ํ ์คํธ ์ถ์ถ | |
pdf_data = self.extract_text_from_pdf(pdf_path) | |
# ํ ์คํธ๋ฅผ ์ฒญํฌ๋ก ๋ถํ | |
chunks = self.text_splitter.split_text(pdf_data["full_text"]) | |
# ์ฒญํฌ ์ ์ฅ | |
self.document_chunks[doc_id] = chunks | |
# ์๋ฒ ๋ฉ ์์ฑ | |
if self.embedder: | |
embeddings = self.embedder.encode(chunks) | |
self.embeddings_store[doc_id] = embeddings | |
# ๋ฌธ์ ์ ๋ณด ์ ์ฅ | |
self.documents[doc_id] = { | |
"metadata": pdf_data["metadata"], | |
"chunk_count": len(chunks), | |
"upload_time": datetime.now().isoformat() | |
} | |
return { | |
"success": True, | |
"doc_id": doc_id, | |
"chunks": len(chunks), | |
"pages": pdf_data["metadata"]["pages"], | |
"title": pdf_data["metadata"]["title"] | |
} | |
except Exception as e: | |
return {"success": False, "error": str(e)} | |
def search_relevant_chunks(self, query: str, doc_ids: List[str], top_k: int = 3) -> List[Dict]: | |
"""๊ด๋ จ ์ฒญํฌ ๊ฒ์""" | |
all_relevant_chunks = [] | |
if self.embedder and self.embeddings_store: | |
# ์๋ฒ ๋ฉ ๊ธฐ๋ฐ ๊ฒ์ | |
query_embedding = self.embedder.encode([query])[0] | |
for doc_id in doc_ids: | |
if doc_id in self.embeddings_store and doc_id in self.document_chunks: | |
doc_embeddings = self.embeddings_store[doc_id] | |
chunks = self.document_chunks[doc_id] | |
# ์ฝ์ฌ์ธ ์ ์ฌ๋ ๊ณ์ฐ | |
similarities = [] | |
for emb in doc_embeddings: | |
sim = np.dot(query_embedding, emb) / (np.linalg.norm(query_embedding) * np.linalg.norm(emb)) | |
similarities.append(sim) | |
# ์์ ์ฒญํฌ ์ ํ | |
top_indices = np.argsort(similarities)[-top_k:][::-1] | |
for idx in top_indices: | |
if similarities[idx] > 0.2: | |
all_relevant_chunks.append({ | |
"content": chunks[idx], | |
"doc_name": self.documents[doc_id]["metadata"]["file_name"], | |
"similarity": similarities[idx] | |
}) | |
else: | |
# ํค์๋ ๊ธฐ๋ฐ ๊ฒ์ | |
query_keywords = set(query.lower().split()) | |
for doc_id in doc_ids: | |
if doc_id in self.document_chunks: | |
chunks = self.document_chunks[doc_id] | |
for i, chunk in enumerate(chunks[:5]): # ์ฒ์ 5๊ฐ๋ง | |
chunk_lower = chunk.lower() | |
score = sum(1 for keyword in query_keywords if keyword in chunk_lower) | |
if score > 0: | |
all_relevant_chunks.append({ | |
"content": chunk[:500], | |
"doc_name": self.documents[doc_id]["metadata"]["file_name"], | |
"similarity": score / len(query_keywords) if query_keywords else 0 | |
}) | |
# ์ ๋ ฌ ๋ฐ ๋ฐํ | |
all_relevant_chunks.sort(key=lambda x: x.get('similarity', 0), reverse=True) | |
return all_relevant_chunks[:top_k] | |
def create_rag_prompt(self, query: str, doc_ids: List[str], top_k: int = 3) -> str: | |
"""RAG ํ๋กฌํํธ ์์ฑ""" | |
relevant_chunks = self.search_relevant_chunks(query, doc_ids, top_k) | |
if not relevant_chunks: | |
return query | |
# ํ๋กฌํํธ ๊ตฌ์ฑ | |
prompt_parts = [] | |
prompt_parts.append("๋ค์ ๋ฌธ์ ๋ด์ฉ์ ์ฐธ๊ณ ํ์ฌ ๋ต๋ณํด์ฃผ์ธ์:\n") | |
prompt_parts.append("=" * 40) | |
for i, chunk in enumerate(relevant_chunks, 1): | |
prompt_parts.append(f"\n[์ฐธ๊ณ {i} - {chunk['doc_name']}]") | |
content = chunk['content'][:300] if len(chunk['content']) > 300 else chunk['content'] | |
prompt_parts.append(content) | |
prompt_parts.append("\n" + "=" * 40) | |
prompt_parts.append(f"\n์ง๋ฌธ: {query}") | |
return "\n".join(prompt_parts) | |
# RAG ์์คํ ์ธ์คํด์ค ์์ฑ | |
rag_system = PDFRAGSystem() | |
# State variable to track current model and RAG settings | |
current_model = gr.State("openai/gpt-oss-120b") | |
rag_enabled_state = gr.State(False) | |
selected_docs_state = gr.State([]) | |
top_k_state = gr.State(3) | |
def upload_pdf(file): | |
"""PDF ํ์ผ ์ ๋ก๋ ์ฒ๋ฆฌ""" | |
if file is None: | |
return ( | |
gr.update(value="<div class='pdf-status pdf-info'>๐ ํ์ผ์ ์ ํํด์ฃผ์ธ์</div>"), | |
gr.update(choices=[]), | |
gr.update(value=False) | |
) | |
try: | |
# ํ์ผ ํด์๋ฅผ ID๋ก ์ฌ์ฉ | |
with open(file.name, 'rb') as f: | |
file_hash = hashlib.md5(f.read()).hexdigest()[:8] | |
doc_id = f"doc_{file_hash}" | |
# PDF ์ฒ๋ฆฌ ๋ฐ ์ ์ฅ | |
result = rag_system.process_and_store_pdf(file.name, doc_id) | |
if result["success"]: | |
status_html = f""" | |
<div class="pdf-status pdf-success"> | |
โ PDF ์ ๋ก๋ ์๋ฃ!<br> | |
๐ {result['title']}<br> | |
๐ {result['pages']} ํ์ด์ง | ๐ {result['chunks']} ์ฒญํฌ | |
</div> | |
""" | |
# ๋ฌธ์ ๋ชฉ๋ก ์ ๋ฐ์ดํธ | |
doc_choices = [f"{doc_id}: {rag_system.documents[doc_id]['metadata']['file_name']}" | |
for doc_id in rag_system.documents.keys()] | |
return ( | |
status_html, | |
gr.update(choices=doc_choices, value=doc_choices), | |
gr.update(value=True) | |
) | |
else: | |
return ( | |
f"<div class='pdf-status pdf-error'>โ ์ค๋ฅ: {result['error']}</div>", | |
gr.update(), | |
gr.update(value=False) | |
) | |
except Exception as e: | |
return ( | |
f"<div class='pdf-status pdf-error'>โ ์ค๋ฅ: {str(e)}</div>", | |
gr.update(), | |
gr.update(value=False) | |
) | |
def clear_documents(): | |
"""๋ฌธ์ ์ด๊ธฐํ""" | |
rag_system.documents = {} | |
rag_system.document_chunks = {} | |
rag_system.embeddings_store = {} | |
return ( | |
gr.update(value="<div class='pdf-status pdf-info'>๐๏ธ ๋ชจ๋ ๋ฌธ์๊ฐ ์ญ์ ๋์์ต๋๋ค</div>"), | |
gr.update(choices=[], value=[]), | |
gr.update(value=False) | |
) | |
def switch_model(model_choice): | |
"""Function to switch between models""" | |
return gr.update(visible=False), gr.update(visible=True), model_choice | |
def create_rag_wrapper(original_fn, model_name): | |
"""์๋ณธ ๋ชจ๋ธ ํจ์๋ฅผ RAG๋ก ๊ฐ์ธ๋ ๋ํผ ์์ฑ""" | |
def wrapped_fn(message, history=None): | |
# RAG ์ค์ ๊ฐ์ ธ์ค๊ธฐ | |
if rag_enabled_state.value and selected_docs_state.value: | |
doc_ids = [doc.split(":")[0] for doc in selected_docs_state.value] | |
enhanced_message = rag_system.create_rag_prompt(message, doc_ids, top_k_state.value) | |
# RAG ์ ์ฉ ์๋ฆผ | |
print(f"๐ RAG ์ ์ฉ: {len(message)}์ โ {len(enhanced_message)}์") | |
# ์๋ณธ ๋ชจ๋ธ์ ๊ฐํ๋ ๋ฉ์์ง ์ ๋ฌ | |
if history is not None: | |
return original_fn(enhanced_message, history) | |
else: | |
return original_fn(enhanced_message) | |
else: | |
# RAG ๋ฏธ์ ์ฉ์ ์๋ณธ ๋ฉ์์ง ๊ทธ๋๋ก ์ ๋ฌ | |
if history is not None: | |
return original_fn(message, history) | |
else: | |
return original_fn(message) | |
return wrapped_fn | |
# Main interface with soft theme | |
with gr.Blocks(fill_height=True, theme=gr.themes.Soft(), css=custom_css) as demo: | |
with gr.Row(): | |
# Sidebar | |
with gr.Column(scale=1): | |
with gr.Group(elem_classes="main-container"): | |
gr.Markdown("# ๐ Inference Provider + RAG") | |
gr.Markdown( | |
"OpenAI GPT-OSS models served by Cerebras API. " | |
"Upload PDF documents for context-aware responses." | |
) | |
# Model selection | |
model_dropdown = gr.Dropdown( | |
choices=["openai/gpt-oss-120b", "openai/gpt-oss-20b"], | |
value="openai/gpt-oss-120b", | |
label="๐ Select Model", | |
info="Choose between different model sizes" | |
) | |
# Login button | |
login_button = gr.LoginButton("Sign in with Hugging Face", size="lg") | |
# Reload button to apply model change | |
reload_btn = gr.Button("๐ Apply Model Change", variant="primary", size="lg") | |
# RAG Settings | |
with gr.Accordion("๐ PDF RAG Settings", open=True): | |
pdf_upload = gr.File( | |
label="Upload PDF", | |
file_types=[".pdf"], | |
type="filepath" | |
) | |
upload_status = gr.HTML( | |
value="<div class='pdf-status pdf-info'>๐ค Upload a PDF to enable document-based answers</div>" | |
) | |
document_list = gr.CheckboxGroup( | |
choices=[], | |
label="๐ Uploaded Documents", | |
info="Select documents to use as context" | |
) | |
clear_btn = gr.Button("๐๏ธ Clear All Documents", size="sm", variant="secondary") | |
enable_rag = gr.Checkbox( | |
label="โจ Enable RAG", | |
value=False, | |
info="Use documents for context-aware responses" | |
) | |
top_k_chunks = gr.Slider( | |
minimum=1, | |
maximum=5, | |
value=3, | |
step=1, | |
label="Context Chunks", | |
info="Number of document chunks to use" | |
) | |
# Additional options | |
with gr.Accordion("โ๏ธ Advanced Options", open=False): | |
gr.Markdown("*These options will be available after model implementation*") | |
temperature = gr.Slider( | |
minimum=0, | |
maximum=2, | |
value=0.7, | |
step=0.1, | |
label="Temperature" | |
) | |
max_tokens = gr.Slider( | |
minimum=1, | |
maximum=4096, | |
value=512, | |
step=1, | |
label="Max Tokens" | |
) | |
# Main chat area | |
with gr.Column(scale=3): | |
with gr.Group(elem_classes="main-container"): | |
gr.Markdown("## ๐ฌ Chat Interface") | |
# RAG status | |
rag_status = gr.HTML( | |
value="<div class='pdf-status pdf-info'>๐ RAG: <strong>Disabled</strong></div>" | |
) | |
# RAG context preview | |
context_preview = gr.HTML(value="", visible=False) | |
# Container for model interfaces | |
with gr.Column(visible=True) as model_120b_container: | |
gr.Markdown("### Model: openai/gpt-oss-120b") | |
# Load the original model and wrap it with RAG | |
original_interface_120b = gr.load( | |
"models/openai/gpt-oss-120b", | |
accept_token=login_button, | |
provider="fireworks-ai" | |
) | |
# Note: The loaded interface will have its own chat components | |
# We'll intercept the messages through our wrapper function | |
with gr.Column(visible=False) as model_20b_container: | |
gr.Markdown("### Model: openai/gpt-oss-20b") | |
# Load the original model | |
original_interface_20b = gr.load( | |
"models/openai/gpt-oss-20b", | |
accept_token=login_button, | |
provider="fireworks-ai" | |
) | |
# Event Handlers | |
# PDF upload | |
pdf_upload.upload( | |
fn=upload_pdf, | |
inputs=[pdf_upload], | |
outputs=[upload_status, document_list, enable_rag] | |
) | |
# Clear documents | |
clear_btn.click( | |
fn=clear_documents, | |
outputs=[upload_status, document_list, enable_rag] | |
) | |
# Update RAG state when settings change | |
def update_rag_state(enabled, docs, k): | |
rag_enabled_state.value = enabled | |
selected_docs_state.value = docs if docs else [] | |
top_k_state.value = k | |
status = "โ Enabled" if enabled and docs else "โญ Disabled" | |
status_html = f"<div class='pdf-status pdf-info'>๐ RAG: <strong>{status}</strong></div>" | |
# Show context preview if RAG is enabled | |
if enabled and docs: | |
preview = f"<div class='rag-context'>๐ Using {len(docs)} document(s) with {k} chunks per query</div>" | |
return gr.update(value=status_html), gr.update(value=preview, visible=True) | |
else: | |
return gr.update(value=status_html), gr.update(value="", visible=False) | |
# Connect RAG state updates | |
enable_rag.change( | |
fn=update_rag_state, | |
inputs=[enable_rag, document_list, top_k_chunks], | |
outputs=[rag_status, context_preview] | |
) | |
document_list.change( | |
fn=update_rag_state, | |
inputs=[enable_rag, document_list, top_k_chunks], | |
outputs=[rag_status, context_preview] | |
) | |
top_k_chunks.change( | |
fn=update_rag_state, | |
inputs=[enable_rag, document_list, top_k_chunks], | |
outputs=[rag_status, context_preview] | |
) | |
# Handle model switching | |
reload_btn.click( | |
fn=switch_model, | |
inputs=[model_dropdown], | |
outputs=[model_120b_container, model_20b_container, current_model] | |
).then( | |
fn=lambda: gr.Info("Model switched successfully!"), | |
inputs=[], | |
outputs=[] | |
) | |
# Update visibility based on dropdown selection | |
def update_visibility(model_choice): | |
if model_choice == "openai/gpt-oss-120b": | |
return gr.update(visible=True), gr.update(visible=False) | |
else: | |
return gr.update(visible=False), gr.update(visible=True) | |
model_dropdown.change( | |
fn=update_visibility, | |
inputs=[model_dropdown], | |
outputs=[model_120b_container, model_20b_container] | |
) | |
# Monkey-patch the loaded interfaces to add RAG support | |
# This is done after the interface is loaded | |
demo.load = lambda: print("๐ RAG System Ready!") | |
demo.launch() |