Spaces:
Running
on
Zero
Running
on
Zero
import gradio as gr | |
import os | |
from typing import List, Dict, Any, Optional | |
import hashlib | |
from datetime import datetime | |
import numpy as np | |
# PDF ์ฒ๋ฆฌ ๋ผ์ด๋ธ๋ฌ๋ฆฌ | |
try: | |
import fitz # PyMuPDF | |
PDF_AVAILABLE = True | |
except ImportError: | |
PDF_AVAILABLE = False | |
print("โ ๏ธ PyMuPDF not installed. Install with: pip install pymupdf") | |
try: | |
from sentence_transformers import SentenceTransformer | |
ST_AVAILABLE = True | |
except ImportError: | |
ST_AVAILABLE = False | |
print("โ ๏ธ Sentence Transformers not installed. Install with: pip install sentence-transformers") | |
# Custom CSS for gradient background and styling | |
custom_css = """ | |
.gradio-container { | |
background: linear-gradient(135deg, #667eea 0%, #764ba2 25%, #f093fb 50%, #4facfe 75%, #00f2fe 100%); | |
background-size: 400% 400%; | |
animation: gradient-animation 15s ease infinite; | |
min-height: 100vh; | |
} | |
@keyframes gradient-animation { | |
0% { background-position: 0% 50%; } | |
50% { background-position: 100% 50%; } | |
100% { background-position: 0% 50%; } | |
} | |
.dark .gradio-container { | |
background: linear-gradient(135deg, #1a1a2e 0%, #16213e 25%, #0f3460 50%, #533483 75%, #e94560 100%); | |
background-size: 400% 400%; | |
animation: gradient-animation 15s ease infinite; | |
} | |
.main-container { | |
background-color: rgba(255, 255, 255, 0.95); | |
backdrop-filter: blur(10px); | |
border-radius: 20px; | |
padding: 20px; | |
box-shadow: 0 8px 32px 0 rgba(31, 38, 135, 0.37); | |
border: 1px solid rgba(255, 255, 255, 0.18); | |
margin: 10px; | |
} | |
.dark .main-container { | |
background-color: rgba(30, 30, 30, 0.95); | |
border: 1px solid rgba(255, 255, 255, 0.1); | |
} | |
.pdf-status { | |
padding: 10px; | |
border-radius: 10px; | |
margin: 10px 0; | |
font-size: 0.9em; | |
} | |
.pdf-success { | |
background-color: rgba(52, 211, 153, 0.2); | |
border: 1px solid rgba(52, 211, 153, 0.5); | |
color: #10b981; | |
} | |
.pdf-error { | |
background-color: rgba(248, 113, 113, 0.2); | |
border: 1px solid rgba(248, 113, 113, 0.5); | |
color: #ef4444; | |
} | |
.pdf-info { | |
background-color: rgba(59, 130, 246, 0.2); | |
border: 1px solid rgba(59, 130, 246, 0.5); | |
color: #3b82f6; | |
} | |
.rag-context { | |
background-color: rgba(251, 191, 36, 0.1); | |
border-left: 4px solid #f59e0b; | |
padding: 10px; | |
margin: 10px 0; | |
border-radius: 5px; | |
} | |
""" | |
class SimpleTextSplitter: | |
"""ํ ์คํธ ๋ถํ ๊ธฐ""" | |
def __init__(self, chunk_size=800, chunk_overlap=100): | |
self.chunk_size = chunk_size | |
self.chunk_overlap = chunk_overlap | |
def split_text(self, text: str) -> List[str]: | |
"""ํ ์คํธ๋ฅผ ์ฒญํฌ๋ก ๋ถํ """ | |
chunks = [] | |
sentences = text.split('. ') | |
current_chunk = "" | |
for sentence in sentences: | |
if len(current_chunk) + len(sentence) < self.chunk_size: | |
current_chunk += sentence + ". " | |
else: | |
if current_chunk: | |
chunks.append(current_chunk.strip()) | |
current_chunk = sentence + ". " | |
if current_chunk: | |
chunks.append(current_chunk.strip()) | |
return chunks | |
class PDFRAGSystem: | |
"""PDF ๊ธฐ๋ฐ RAG ์์คํ """ | |
def __init__(self): | |
self.documents = {} | |
self.document_chunks = {} | |
self.embeddings_store = {} | |
self.text_splitter = SimpleTextSplitter(chunk_size=800, chunk_overlap=100) | |
# ์๋ฒ ๋ฉ ๋ชจ๋ธ ์ด๊ธฐํ | |
self.embedder = None | |
if ST_AVAILABLE: | |
try: | |
self.embedder = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2') | |
print("โ ์๋ฒ ๋ฉ ๋ชจ๋ธ ๋ก๋ ์ฑ๊ณต") | |
except Exception as e: | |
print(f"โ ๏ธ ์๋ฒ ๋ฉ ๋ชจ๋ธ ๋ก๋ ์คํจ: {e}") | |
def extract_text_from_pdf(self, pdf_path: str) -> Dict[str, Any]: | |
"""PDF์์ ํ ์คํธ ์ถ์ถ""" | |
if not PDF_AVAILABLE: | |
return { | |
"metadata": { | |
"title": "PDF Reader Not Available", | |
"file_name": os.path.basename(pdf_path), | |
"pages": 0 | |
}, | |
"full_text": "PDF ์ฒ๋ฆฌ๋ฅผ ์ํด 'pip install pymupdf'๋ฅผ ์คํํด์ฃผ์ธ์." | |
} | |
try: | |
doc = fitz.open(pdf_path) | |
text_content = [] | |
metadata = { | |
"title": doc.metadata.get("title", os.path.basename(pdf_path)), | |
"pages": len(doc), | |
"file_name": os.path.basename(pdf_path) | |
} | |
for page_num, page in enumerate(doc): | |
text = page.get_text() | |
if text.strip(): | |
text_content.append(text) | |
doc.close() | |
return { | |
"metadata": metadata, | |
"full_text": "\n\n".join(text_content) | |
} | |
except Exception as e: | |
raise Exception(f"PDF ์ฒ๋ฆฌ ์ค๋ฅ: {str(e)}") | |
def process_and_store_pdf(self, pdf_path: str, doc_id: str) -> Dict[str, Any]: | |
"""PDF ์ฒ๋ฆฌ ๋ฐ ์ ์ฅ""" | |
try: | |
# PDF ํ ์คํธ ์ถ์ถ | |
pdf_data = self.extract_text_from_pdf(pdf_path) | |
# ํ ์คํธ๋ฅผ ์ฒญํฌ๋ก ๋ถํ | |
chunks = self.text_splitter.split_text(pdf_data["full_text"]) | |
# ์ฒญํฌ ์ ์ฅ | |
self.document_chunks[doc_id] = chunks | |
# ์๋ฒ ๋ฉ ์์ฑ | |
if self.embedder: | |
embeddings = self.embedder.encode(chunks) | |
self.embeddings_store[doc_id] = embeddings | |
# ๋ฌธ์ ์ ๋ณด ์ ์ฅ | |
self.documents[doc_id] = { | |
"metadata": pdf_data["metadata"], | |
"chunk_count": len(chunks), | |
"upload_time": datetime.now().isoformat() | |
} | |
return { | |
"success": True, | |
"doc_id": doc_id, | |
"chunks": len(chunks), | |
"pages": pdf_data["metadata"]["pages"], | |
"title": pdf_data["metadata"]["title"] | |
} | |
except Exception as e: | |
return {"success": False, "error": str(e)} | |
def search_relevant_chunks(self, query: str, doc_ids: List[str], top_k: int = 3) -> List[Dict]: | |
"""๊ด๋ จ ์ฒญํฌ ๊ฒ์""" | |
all_relevant_chunks = [] | |
if self.embedder and self.embeddings_store: | |
# ์๋ฒ ๋ฉ ๊ธฐ๋ฐ ๊ฒ์ | |
query_embedding = self.embedder.encode([query])[0] | |
for doc_id in doc_ids: | |
if doc_id in self.embeddings_store and doc_id in self.document_chunks: | |
doc_embeddings = self.embeddings_store[doc_id] | |
chunks = self.document_chunks[doc_id] | |
# ์ฝ์ฌ์ธ ์ ์ฌ๋ ๊ณ์ฐ | |
similarities = [] | |
for emb in doc_embeddings: | |
sim = np.dot(query_embedding, emb) / (np.linalg.norm(query_embedding) * np.linalg.norm(emb)) | |
similarities.append(sim) | |
# ์์ ์ฒญํฌ ์ ํ | |
top_indices = np.argsort(similarities)[-top_k:][::-1] | |
for idx in top_indices: | |
if similarities[idx] > 0.2: | |
all_relevant_chunks.append({ | |
"content": chunks[idx], | |
"doc_name": self.documents[doc_id]["metadata"]["file_name"], | |
"similarity": similarities[idx] | |
}) | |
else: | |
# ํค์๋ ๊ธฐ๋ฐ ๊ฒ์ | |
query_keywords = set(query.lower().split()) | |
for doc_id in doc_ids: | |
if doc_id in self.document_chunks: | |
chunks = self.document_chunks[doc_id] | |
for chunk in chunks[:top_k]: # ์ฒ์ ๋ช ๊ฐ๋ง ์ฌ์ฉ | |
chunk_lower = chunk.lower() | |
score = sum(1 for keyword in query_keywords if keyword in chunk_lower) | |
if score > 0: | |
all_relevant_chunks.append({ | |
"content": chunk[:500], # ๊ธธ์ด ์ ํ | |
"doc_name": self.documents[doc_id]["metadata"]["file_name"], | |
"similarity": score / len(query_keywords) if query_keywords else 0 | |
}) | |
# ์ ๋ ฌ ๋ฐ ๋ฐํ | |
all_relevant_chunks.sort(key=lambda x: x.get('similarity', 0), reverse=True) | |
return all_relevant_chunks[:top_k] | |
def create_rag_prompt(self, query: str, doc_ids: List[str], top_k: int = 3) -> str: | |
"""RAG ํ๋กฌํํธ ์์ฑ""" | |
relevant_chunks = self.search_relevant_chunks(query, doc_ids, top_k) | |
if not relevant_chunks: | |
return query | |
# ํ๋กฌํํธ ๊ตฌ์ฑ | |
prompt_parts = [] | |
prompt_parts.append("๋ค์ ๋ฌธ์ ๋ด์ฉ์ ์ฐธ๊ณ ํ์ฌ ์ง๋ฌธ์ ๋ต๋ณํด์ฃผ์ธ์:\n") | |
prompt_parts.append("=" * 50) | |
for i, chunk in enumerate(relevant_chunks, 1): | |
prompt_parts.append(f"\n[์ฐธ๊ณ ๋ฌธ์ {i} - {chunk['doc_name']}]") | |
content = chunk['content'][:400] if len(chunk['content']) > 400 else chunk['content'] | |
prompt_parts.append(content) | |
prompt_parts.append("") | |
prompt_parts.append("=" * 50) | |
prompt_parts.append(f"\n์ง๋ฌธ: {query}") | |
prompt_parts.append("\n์ ์ฐธ๊ณ ๋ฌธ์๋ฅผ ๋ฐํ์ผ๋ก ์์ธํ๊ณ ์ ํํ๊ฒ ๋ต๋ณํด์ฃผ์ธ์:") | |
return "\n".join(prompt_parts) | |
# RAG ์์คํ ์ธ์คํด์ค ์์ฑ | |
rag_system = PDFRAGSystem() | |
# State variable to track current model | |
current_model = gr.State("openai/gpt-oss-120b") | |
def upload_pdf(file): | |
"""PDF ํ์ผ ์ ๋ก๋ ์ฒ๋ฆฌ""" | |
if file is None: | |
return ( | |
gr.update(value="<div class='pdf-status pdf-error'>ํ์ผ์ ์ ํํด์ฃผ์ธ์</div>"), | |
gr.update(choices=[]), | |
gr.update(value=False) | |
) | |
try: | |
# ํ์ผ ํด์๋ฅผ ID๋ก ์ฌ์ฉ | |
with open(file.name, 'rb') as f: | |
file_hash = hashlib.md5(f.read()).hexdigest()[:8] | |
doc_id = f"doc_{file_hash}" | |
# PDF ์ฒ๋ฆฌ ๋ฐ ์ ์ฅ | |
result = rag_system.process_and_store_pdf(file.name, doc_id) | |
if result["success"]: | |
status_html = f""" | |
<div class="pdf-status pdf-success"> | |
โ PDF ์ ๋ก๋ ์ฑ๊ณต!<br> | |
๐ ํ์ผ: {result['title']}<br> | |
๐ ํ์ด์ง: {result['pages']}ํ์ด์ง<br> | |
๐ ์ฒญํฌ: {result['chunks']}๊ฐ ์์ฑ | |
</div> | |
""" | |
# ๋ฌธ์ ๋ชฉ๋ก ์ ๋ฐ์ดํธ | |
doc_choices = [f"{doc_id}: {rag_system.documents[doc_id]['metadata']['file_name']}" | |
for doc_id in rag_system.documents.keys()] | |
return ( | |
status_html, | |
gr.update(choices=doc_choices, value=doc_choices), | |
gr.update(value=True) | |
) | |
else: | |
status_html = f""" | |
<div class="pdf-status pdf-error"> | |
โ ์ ๋ก๋ ์คํจ: {result['error']} | |
</div> | |
""" | |
return status_html, gr.update(), gr.update(value=False) | |
except Exception as e: | |
return ( | |
f"<div class='pdf-status pdf-error'>โ ์ค๋ฅ: {str(e)}</div>", | |
gr.update(), | |
gr.update(value=False) | |
) | |
def clear_documents(): | |
"""๋ฌธ์ ์ด๊ธฐํ""" | |
rag_system.documents = {} | |
rag_system.document_chunks = {} | |
rag_system.embeddings_store = {} | |
return ( | |
gr.update(value="<div class='pdf-status pdf-success'>โ ๋ชจ๋ ๋ฌธ์๊ฐ ์ญ์ ๋์์ต๋๋ค</div>"), | |
gr.update(choices=[], value=[]), | |
gr.update(value=False) | |
) | |
def switch_model(model_choice): | |
"""Function to switch between models""" | |
return gr.update(visible=False), gr.update(visible=True), model_choice | |
def create_rag_context_display(query, selected_docs, top_k): | |
"""RAG ์ปจํ ์คํธ ํ์์ฉ HTML ์์ฑ""" | |
if not selected_docs: | |
return "" | |
doc_ids = [doc.split(":")[0] for doc in selected_docs] | |
chunks = rag_system.search_relevant_chunks(query, doc_ids, top_k) | |
if not chunks: | |
return "" | |
html = "<div class='rag-context'><strong>๐ ์ฐธ๊ณ ๋ฌธ์:</strong><br>" | |
for i, chunk in enumerate(chunks, 1): | |
html += f"<br>{i}. {chunk['doc_name']} (์ ์ฌ๋: {chunk['similarity']:.2f})<br>" | |
html += f"<small>{chunk['content'][:200]}...</small><br>" | |
html += "</div>" | |
return html | |
# Main interface | |
with gr.Blocks(fill_height=True, theme="Nymbo/Nymbo_Theme", css=custom_css) as demo: | |
# JavaScript to handle message passing | |
gr.HTML(""" | |
<script> | |
function sendToModel(processedMsg) { | |
// This function would send the processed message to the model | |
console.log("Sending to model:", processedMsg); | |
} | |
</script> | |
""") | |
with gr.Row(): | |
# Sidebar | |
with gr.Column(scale=1): | |
with gr.Group(elem_classes="main-container"): | |
gr.Markdown("# ๐ Inference Provider + RAG") | |
gr.Markdown( | |
"OpenAI GPT-OSS models with PDF RAG support. " | |
"Sign in with your Hugging Face account to use this API." | |
) | |
# Model selection | |
model_dropdown = gr.Dropdown( | |
choices=["openai/gpt-oss-120b", "openai/gpt-oss-20b"], | |
value="openai/gpt-oss-120b", | |
label="๐ Select Model", | |
info="Choose between different model sizes" | |
) | |
# Login button | |
login_button = gr.LoginButton("Sign in with Hugging Face", size="lg") | |
# Reload button to apply model change | |
reload_btn = gr.Button("๐ Apply Model Change", variant="primary", size="lg") | |
# RAG Settings | |
with gr.Accordion("๐ PDF RAG Settings", open=True): | |
pdf_upload = gr.File( | |
label="Upload PDF", | |
file_types=[".pdf"], | |
type="filepath" | |
) | |
upload_status = gr.HTML( | |
value="<div class='pdf-status pdf-info'>๐ค PDF๋ฅผ ์ ๋ก๋ํ์ฌ ๋ฌธ์ ๊ธฐ๋ฐ ๋ต๋ณ์ ๋ฐ์ผ์ธ์</div>" | |
) | |
document_list = gr.CheckboxGroup( | |
choices=[], | |
label="๐ ์ ๋ก๋๋ ๋ฌธ์", | |
info="์ฐธ๊ณ ํ ๋ฌธ์๋ฅผ ์ ํํ์ธ์" | |
) | |
clear_btn = gr.Button("๐๏ธ ๋ชจ๋ ๋ฌธ์ ์ญ์ ", size="sm") | |
enable_rag = gr.Checkbox( | |
label="RAG ํ์ฑํ", | |
value=False, | |
info="์ ํํ ๋ฌธ์๋ฅผ ์ฐธ๊ณ ํ์ฌ ๋ต๋ณ ์์ฑ" | |
) | |
top_k_chunks = gr.Slider( | |
minimum=1, | |
maximum=5, | |
value=3, | |
step=1, | |
label="์ฐธ์กฐ ์ฒญํฌ ์", | |
info="๋ต๋ณ ์์ฑ์ ์ฐธ๊ณ ํ ๋ฌธ์ ์กฐ๊ฐ ๊ฐ์" | |
) | |
# Additional options | |
with gr.Accordion("โ๏ธ Advanced Options", open=False): | |
gr.Markdown("*These options will be available after model implementation*") | |
temperature = gr.Slider( | |
minimum=0, | |
maximum=2, | |
value=0.7, | |
step=0.1, | |
label="Temperature" | |
) | |
max_tokens = gr.Slider( | |
minimum=1, | |
maximum=4096, | |
value=512, | |
step=1, | |
label="Max Tokens" | |
) | |
# Main chat area | |
with gr.Column(scale=3): | |
with gr.Group(elem_classes="main-container"): | |
gr.Markdown("## ๐ฌ Chat Interface") | |
# RAG ์ํ ํ์ | |
rag_status = gr.HTML( | |
value="<div class='pdf-status pdf-info'>๐ RAG: <strong>๋นํ์ฑํ</strong></div>" | |
) | |
# RAG ์ปจํ ์คํธ ํ์ ์์ญ | |
rag_context_display = gr.HTML(value="", visible=False) | |
# Container for model interfaces | |
with gr.Column(visible=True) as model_120b_container: | |
gr.Markdown("### Model: openai/gpt-oss-120b") | |
# RAG ์ฒ๋ฆฌ๋ฅผ ์ํ ์ปค์คํ ์ธํฐํ์ด์ค | |
with gr.Group(): | |
# ์ฌ์ฉ์ ์ ๋ ฅ ํ ์คํธ๋ฐ์ค | |
user_input = gr.Textbox( | |
label="๋ฉ์์ง ์ ๋ ฅ", | |
placeholder="๋ฌธ์์ ๋ํด ์ง๋ฌธํ๊ฑฐ๋ ์ผ๋ฐ ๋ํ๋ฅผ ์์ํ์ธ์...", | |
lines=2 | |
) | |
with gr.Row(): | |
send_btn = gr.Button("๐ค ์ ์ก", variant="primary") | |
clear_chat_btn = gr.Button("๐๏ธ ๋ํ ์ด๊ธฐํ") | |
# ์๋ณธ ๋ชจ๋ธ ๋ก๋ | |
original_model = gr.load( | |
"models/openai/gpt-oss-120b", | |
accept_token=login_button, | |
provider="fireworks-ai" | |
) | |
with gr.Column(visible=False) as model_20b_container: | |
gr.Markdown("### Model: openai/gpt-oss-20b") | |
with gr.Group(): | |
# ์ฌ์ฉ์ ์ ๋ ฅ ํ ์คํธ๋ฐ์ค (20b์ฉ) | |
user_input_20b = gr.Textbox( | |
label="๋ฉ์์ง ์ ๋ ฅ", | |
placeholder="๋ฌธ์์ ๋ํด ์ง๋ฌธํ๊ฑฐ๋ ์ผ๋ฐ ๋ํ๋ฅผ ์์ํ์ธ์...", | |
lines=2 | |
) | |
with gr.Row(): | |
send_btn_20b = gr.Button("๐ค ์ ์ก", variant="primary") | |
clear_chat_btn_20b = gr.Button("๐๏ธ ๋ํ ์ด๊ธฐํ") | |
# ์๋ณธ ๋ชจ๋ธ ๋ก๋ | |
original_model_20b = gr.load( | |
"models/openai/gpt-oss-20b", | |
accept_token=login_button, | |
provider="fireworks-ai" | |
) | |
# Event Handlers | |
# PDF ์ ๋ก๋ | |
pdf_upload.upload( | |
fn=upload_pdf, | |
inputs=[pdf_upload], | |
outputs=[upload_status, document_list, enable_rag] | |
) | |
# ๋ฌธ์ ์ญ์ | |
clear_btn.click( | |
fn=clear_documents, | |
outputs=[upload_status, document_list, enable_rag] | |
) | |
# RAG ์ํ ์ ๋ฐ์ดํธ | |
enable_rag.change( | |
fn=lambda x: gr.update( | |
value=f"<div class='pdf-status pdf-info'>๐ RAG: <strong>{'ํ์ฑํ' if x else '๋นํ์ฑํ'}</strong></div>" | |
), | |
inputs=[enable_rag], | |
outputs=[rag_status] | |
) | |
# ๋ชจ๋ธ ์ ํ | |
reload_btn.click( | |
fn=switch_model, | |
inputs=[model_dropdown], | |
outputs=[model_120b_container, model_20b_container, current_model] | |
).then( | |
fn=lambda: gr.Info("Model switched successfully!"), | |
inputs=[], | |
outputs=[] | |
) | |
# Update visibility based on dropdown selection | |
def update_visibility(model_choice): | |
if model_choice == "openai/gpt-oss-120b": | |
return gr.update(visible=True), gr.update(visible=False) | |
else: | |
return gr.update(visible=False), gr.update(visible=True) | |
model_dropdown.change( | |
fn=update_visibility, | |
inputs=[model_dropdown], | |
outputs=[model_120b_container, model_20b_container] | |
) | |
# ๋ฉ์์ง ์ ์ก ์ฒ๋ฆฌ (RAG ํฌํจ) | |
def process_message(message, enable_rag, selected_docs, top_k): | |
"""๋ฉ์์ง๋ฅผ RAG๋ก ์ฒ๋ฆฌํ์ฌ ๋ชจ๋ธ์ ์ ์ก""" | |
if enable_rag and selected_docs: | |
doc_ids = [doc.split(":")[0] for doc in selected_docs] | |
enhanced_message = rag_system.create_rag_prompt(message, doc_ids, top_k) | |
context_html = create_rag_context_display(message, selected_docs, top_k) | |
return enhanced_message, gr.update(value=context_html, visible=True) | |
else: | |
return message, gr.update(value="", visible=False) | |
# 120b ๋ชจ๋ธ์ฉ ์ด๋ฒคํธ | |
send_btn.click( | |
fn=process_message, | |
inputs=[user_input, enable_rag, document_list, top_k_chunks], | |
outputs=[user_input, rag_context_display] | |
) | |
user_input.submit( | |
fn=process_message, | |
inputs=[user_input, enable_rag, document_list, top_k_chunks], | |
outputs=[user_input, rag_context_display] | |
) | |
# 20b ๋ชจ๋ธ์ฉ ์ด๋ฒคํธ | |
send_btn_20b.click( | |
fn=process_message, | |
inputs=[user_input_20b, enable_rag, document_list, top_k_chunks], | |
outputs=[user_input_20b, rag_context_display] | |
) | |
user_input_20b.submit( | |
fn=process_message, | |
inputs=[user_input_20b, enable_rag, document_list, top_k_chunks], | |
outputs=[user_input_20b, rag_context_display] | |
) | |
demo.launch() |