# Spaces: Running on Zero
import hashlib
import html
import json
import os
from datetime import datetime
from typing import List, Dict, Any, Optional

# PDF processing libraries
import chromadb
import gradio as gr
import numpy as np
import pymupdf  # PyMuPDF
from chromadb.utils import embedding_functions
from langchain.text_splitter import RecursiveCharacterTextSplitter
from sentence_transformers import SentenceTransformer
# Custom CSS (existing CSS + additional styles).
# Passed to gr.Blocks(css=...) below. It defines the animated gradient page
# background, the translucent "main-container" cards, the pdf-status banners
# (success / error / processing) and the hoverable document cards.
custom_css = """
.gradio-container {
background: linear-gradient(135deg, #667eea 0%, #764ba2 25%, #f093fb 50%, #4facfe 75%, #00f2fe 100%);
background-size: 400% 400%;
animation: gradient-animation 15s ease infinite;
min-height: 100vh;
}
@keyframes gradient-animation {
0% { background-position: 0% 50%; }
50% { background-position: 100% 50%; }
100% { background-position: 0% 50%; }
}
.dark .gradio-container {
background: linear-gradient(135deg, #1a1a2e 0%, #16213e 25%, #0f3460 50%, #533483 75%, #e94560 100%);
background-size: 400% 400%;
animation: gradient-animation 15s ease infinite;
}
.main-container {
background-color: rgba(255, 255, 255, 0.95);
backdrop-filter: blur(10px);
border-radius: 20px;
padding: 20px;
box-shadow: 0 8px 32px 0 rgba(31, 38, 135, 0.37);
border: 1px solid rgba(255, 255, 255, 0.18);
margin: 10px;
}
.dark .main-container {
background-color: rgba(30, 30, 30, 0.95);
border: 1px solid rgba(255, 255, 255, 0.1);
}
.pdf-status {
padding: 10px;
border-radius: 10px;
margin: 10px 0;
font-size: 0.9em;
}
.pdf-success {
background-color: rgba(52, 211, 153, 0.2);
border: 1px solid rgba(52, 211, 153, 0.5);
color: #10b981;
}
.pdf-error {
background-color: rgba(248, 113, 113, 0.2);
border: 1px solid rgba(248, 113, 113, 0.5);
color: #ef4444;
}
.pdf-processing {
background-color: rgba(251, 191, 36, 0.2);
border: 1px solid rgba(251, 191, 36, 0.5);
color: #f59e0b;
}
.document-card {
padding: 12px;
margin: 8px 0;
border-radius: 8px;
background: rgba(255, 255, 255, 0.1);
border: 1px solid rgba(255, 255, 255, 0.2);
cursor: pointer;
transition: all 0.3s ease;
}
.document-card:hover {
background: rgba(255, 255, 255, 0.2);
transform: translateX(5px);
}
"""
class PDFRAGSystem:
    """Index PDF documents in a vector store and build RAG prompts from them.

    Text is extracted with PyMuPDF, split into overlapping character chunks,
    embedded with a SentenceTransformer model and stored in a ChromaDB
    collection configured for cosine distance.

    Attributes:
        documents: Bookkeeping for indexed documents, keyed by ``doc_id``.
            Each value holds the PDF ``metadata``, the ``chunk_count`` and
            the ``upload_time`` (ISO-8601 string).
        embedder: The SentenceTransformer model, or ``None`` if loading failed.
        chroma_client: The ChromaDB client, or ``None`` if creation failed.
        collection: The ChromaDB collection, or ``None`` if creation failed.
        text_splitter: Splitter producing 1000-character chunks with a
            200-character overlap.
    """

    COLLECTION_NAME = "pdf_documents"
    EMBEDDING_MODEL = "sentence-transformers/all-MiniLM-L6-v2"

    def __init__(self):
        self.documents = {}
        self.embedder = None
        self.vector_store = None
        # Defined up front so a failed initialization leaves explicit ``None``
        # values instead of missing attributes.
        self.chroma_client = None
        self.collection = None
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200,
            length_function=len,
            separators=["\n\n", "\n", ".", "!", "?", ",", " ", ""],
        )
        self.initialize_vector_store()

    def initialize_vector_store(self):
        """Load the embedding model and open the ChromaDB collection.

        Failures are reported on stdout rather than raised so that the app
        can still start; indexing then reports the problem per request.
        """
        try:
            # Load the Sentence Transformer model.
            self.embedder = SentenceTransformer(self.EMBEDDING_MODEL)
            # Initialize the ChromaDB client.
            self.chroma_client = chromadb.Client()
            # get_or_create avoids an "already exists" error when a collection
            # with this name is already present in the client.
            self.collection = self.chroma_client.get_or_create_collection(
                name=self.COLLECTION_NAME,
                metadata={"hnsw:space": "cosine"},
            )
        except Exception as e:
            print(f"Vector store initialization error: {e}")

    def extract_text_from_pdf(self, pdf_path: str) -> Dict[str, Any]:
        """Extract per-page text and document metadata from a PDF file.

        Args:
            pdf_path: Path of the PDF file to read.

        Returns:
            A dict with ``metadata`` (title, author, pages, creation_date,
            file_name), ``pages`` (a list of ``{"page", "content"}`` dicts for
            pages that contain non-blank text, 1-based page numbers) and
            ``full_text`` (the page texts joined by blank lines).

        Raises:
            Exception: If the file cannot be opened or read; the original
                error is chained as the cause.
        """
        try:
            doc = pymupdf.open(pdf_path)
            try:
                info = doc.metadata or {}
                metadata = {
                    # ``or`` also covers keys that are present but blank.
                    "title": info.get("title") or "Untitled",
                    "author": info.get("author") or "Unknown",
                    "pages": len(doc),
                    "creation_date": info.get("creationDate") or "",
                    "file_name": os.path.basename(pdf_path),
                }
                text_content = []
                for page_number, page in enumerate(doc, start=1):
                    text = page.get_text()
                    if text.strip():
                        text_content.append({"page": page_number, "content": text})
            finally:
                # Release the file handle even when extraction fails midway.
                doc.close()
            return {
                "metadata": metadata,
                "pages": text_content,
                "full_text": "\n\n".join(p["content"] for p in text_content),
            }
        except Exception as e:
            raise Exception(f"PDF ์ฒ๋ฆฌ ์ค๋ฅ: {str(e)}") from e

    def process_and_index_pdf(self, pdf_path: str, doc_id: str) -> Dict[str, Any]:
        """Extract, chunk, embed and store one PDF in the vector collection.

        Args:
            pdf_path: Path of the PDF file to index.
            doc_id: Identifier used as the prefix of every chunk id and stored
                in each chunk's metadata.

        Returns:
            On success ``{"success": True, "doc_id", "chunks", "pages",
            "title"}``; on any failure ``{"success": False, "error": <str>}``.
            This method does not raise.
        """
        try:
            if self.embedder is None or self.collection is None:
                raise RuntimeError("Vector store is not initialized")
            # Extract the PDF text.
            pdf_data = self.extract_text_from_pdf(pdf_path)
            # Split the text into chunks.
            chunks = self.text_splitter.split_text(pdf_data["full_text"])
            if not chunks:
                # e.g. a scanned PDF with no text layer: nothing to embed.
                raise ValueError("No extractable text found in PDF")
            # Create an embedding for every chunk.
            embeddings = self.embedder.encode(chunks)
            doc_meta = pdf_data["metadata"]
            ids = [f"{doc_id}_{i}" for i in range(len(chunks))]
            metadatas = [
                {
                    "doc_id": doc_id,
                    "chunk_index": i,
                    "source": doc_meta["file_name"],
                    "page_count": doc_meta["pages"],
                }
                for i in range(len(chunks))
            ]
            # upsert (not add) so re-uploading the same file replaces its
            # chunks instead of colliding on duplicate ids.
            self.collection.upsert(
                ids=ids,
                embeddings=embeddings.tolist(),
                documents=chunks,
                metadatas=metadatas,
            )
            # Remember the document for the UI listing.
            self.documents[doc_id] = {
                "metadata": doc_meta,
                "chunk_count": len(chunks),
                "upload_time": datetime.now().isoformat(),
            }
            return {
                "success": True,
                "doc_id": doc_id,
                "chunks": len(chunks),
                "pages": doc_meta["pages"],
                "title": doc_meta["title"],
            }
        except Exception as e:
            return {
                "success": False,
                "error": str(e),
            }

    def search_relevant_chunks(
        self,
        query: str,
        top_k: int = 5,
        doc_ids: Optional[List[str]] = None,
    ) -> List[Dict]:
        """Return the stored chunks closest to ``query``.

        Args:
            query: Free-text query to embed and search with.
            top_k: Maximum number of chunks to return.
            doc_ids: Optional list of document ids; when given, only chunks
                whose ``doc_id`` metadata is in this list are searched.

        Returns:
            A list of ``{"content", "metadata", "distance"}`` dicts in the
            order returned by the collection. An empty list is returned for a
            blank query, a non-positive ``top_k``, an empty collection, or
            when the search fails (the error is printed).
        """
        if not query or not query.strip() or top_k < 1:
            return []
        try:
            available = self.collection.count()
            if available == 0:
                return []
            # Create the query embedding.
            query_embedding = self.embedder.encode([query])
            query_args = {
                "query_embeddings": query_embedding.tolist(),
                # Never ask for more results than the collection holds.
                "n_results": min(int(top_k), available),
            }
            if doc_ids:
                query_args["where"] = {"doc_id": {"$in": list(doc_ids)}}
            # Search for similar chunks.
            results = self.collection.query(**query_args)
            if not results or not results.get("documents"):
                return []
            contents = results["documents"][0]
            metadatas = results["metadatas"][0]
            distance_rows = results.get("distances")
            distances = distance_rows[0] if distance_rows else [None] * len(contents)
            return [
                {"content": content, "metadata": metadata, "distance": distance}
                for content, metadata, distance in zip(contents, metadatas, distances)
            ]
        except Exception as e:
            print(f"Search error: {e}")
            return []

    def generate_rag_prompt(self, query: str, context_chunks: List[Dict]) -> str:
        """Build the prompt that pairs retrieved chunks with the user question.

        Args:
            query: The user's question.
            context_chunks: Chunks as returned by ``search_relevant_chunks``;
                each needs ``content`` plus ``metadata`` with ``source`` and
                ``chunk_index``.

        Returns:
            The prompt text: fixed instructions, the chunks (each prefixed by
            its source and 1-based chunk number, separated by ``---`` rules)
            and finally the question.
        """
        context = "\n\n---\n\n".join(
            f"[์ถ์ฒ: {chunk['metadata']['source']}, ์ฒญํฌ {chunk['metadata']['chunk_index']+1}]\n{chunk['content']}"
            for chunk in context_chunks
        )
        prompt = f"""๋ค์ ๋ฌธ์ ๋ด์ฉ์ ์ฐธ๊ณ ํ์ฌ ์ง๋ฌธ์ ๋ต๋ณํด์ฃผ์ธ์.
๋ต๋ณ์ ์ ๊ณต๋ ๋ฌธ์ ๋ด์ฉ์ ๋ฐํ์ผ๋ก ์์ฑํ๋, ํ์์ ์ถ๊ฐ ์ค๋ช ์ ํฌํจํ ์ ์์ต๋๋ค.
๋ฌธ์์์ ๊ด๋ จ ์ ๋ณด๋ฅผ ์ฐพ์ ์ ์๋ ๊ฒฝ์ฐ, ๊ทธ ์ฌ์ค์ ๋ช ์ํด์ฃผ์ธ์.
๐ ์ฐธ๊ณ ๋ฌธ์:
{context}
โ ์ง๋ฌธ: {query}
๐ก ๋ต๋ณ:"""
        return prompt
# Create the shared RAG system instance. Its constructor loads the embedding
# model and opens the vector collection, so this runs at import time.
rag_system = PDFRAGSystem()

# State variables
# NOTE(review): these gr.State objects are created outside any gr.Blocks
# context, and `current_model` is later used as an event output — confirm
# that Gradio accepts components that were not created inside the Blocks.
current_model = gr.State("openai/gpt-oss-120b")
uploaded_documents = gr.State({})
rag_enabled = gr.State(False)
def upload_pdf(file):
    """Index an uploaded PDF and report the outcome to the UI.

    Args:
        file: The value delivered by the upload component. This is a plain
            path string when the component uses ``type="filepath"``; objects
            exposing the path as ``.name`` are accepted as well. ``None``
            means no file was selected.

    Returns:
        A 3-tuple for (status HTML, document checkbox group, RAG checkbox).
        On success the document list is refreshed with every indexed document
        selected and RAG is switched on. On failure only the status changes,
        so documents that are still indexed stay listed and selectable.
    """
    if file is None:
        return gr.update(value="ํ์ผ์ ์ ํํด์ฃผ์ธ์"), gr.update(), gr.update()
    try:
        # With type="filepath" Gradio passes a str, which has no ``.name``.
        file_path = file if isinstance(file, str) else file.name
        # Use a short content hash as the document id, so the same file
        # always maps to the same id. Read in 1 MiB pieces to bound memory.
        digest = hashlib.md5()
        with open(file_path, 'rb') as f:
            for block in iter(lambda: f.read(1 << 20), b""):
                digest.update(block)
        doc_id = f"doc_{digest.hexdigest()[:8]}"
        # Extract, chunk and index the PDF.
        result = rag_system.process_and_index_pdf(file_path, doc_id)
        if result["success"]:
            # The title comes from the PDF itself: escape it before it is
            # embedded in HTML.
            title = html.escape(str(result.get('title', 'Unknown')))
            status_html = (
                "\n"
                '<div class="pdf-status pdf-success">\n'
                "โ PDF ์ ๋ก๋ ์ฑ๊ณต!<br>\n"
                f"๐ ์ ๋ชฉ: {title}<br>\n"
                f"๐ ํ์ด์ง: {result['pages']}ํ์ด์ง<br>\n"
                f"๐ ์์ฑ๋ ์ฒญํฌ: {result['chunks']}๊ฐ<br>\n"
                f"๐ ๋ฌธ์ ID: {html.escape(doc_id)}\n"
                "</div>\n"
            )
            # Refresh the document list from everything currently indexed.
            doc_choices = [
                f"{indexed_id}: {info['metadata']['file_name']}"
                for indexed_id, info in rag_system.documents.items()
            ]
            return status_html, gr.update(choices=doc_choices, value=doc_choices), gr.update(value=True)
        status_html = (
            "\n"
            '<div class="pdf-status pdf-error">\n'
            "โ PDF ์ ๋ก๋ ์คํจ<br>\n"
            f"์ค๋ฅ: {html.escape(str(result['error']))}\n"
            "</div>\n"
        )
        return status_html, gr.update(), gr.update()
    except Exception as e:
        status_html = (
            "\n"
            '<div class="pdf-status pdf-error">\n'
            f"โ ์ค๋ฅ ๋ฐ์: {html.escape(str(e))}\n"
            "</div>\n"
        )
        return status_html, gr.update(), gr.update()
def clear_documents():
    """Drop every indexed document and reset the related UI widgets.

    The ChromaDB collection is deleted and recreated empty, and the in-memory
    document registry is reset.

    Returns:
        A 3-tuple for (status HTML, document checkbox group, RAG checkbox).
        On success the list is emptied and RAG is switched off; on failure
        only the status shows the error and the other widgets are untouched.
    """
    collection_name = "pdf_documents"
    try:
        # Recreate the ChromaDB collection from scratch.
        client = rag_system.chroma_client
        client.delete_collection(collection_name)
        fresh_collection = client.create_collection(
            name=collection_name,
            metadata={"hnsw:space": "cosine"},
        )
        rag_system.collection = fresh_collection
        rag_system.documents = {}
        status_update = gr.update(
            value="<div class='pdf-status pdf-success'>โ ๋ชจ๋ ๋ฌธ์๊ฐ ์ญ์ ๋์์ต๋๋ค</div>"
        )
        list_update = gr.update(choices=[], value=[])
        rag_update = gr.update(value=False)
        return status_update, list_update, rag_update
    except Exception as e:
        failure_html = f"<div class='pdf-status pdf-error'>โ ์ญ์ ์คํจ: {str(e)}</div>"
        return gr.update(value=failure_html), gr.update(), gr.update()
def process_with_rag(message: str, enable_rag: bool, selected_docs: List[str], top_k: int = 5):
    """Turn a user message into a RAG prompt when documents are selected.

    Args:
        message: The user's chat message.
        enable_rag: Whether document-grounded answering is switched on.
        selected_docs: Selected entries of the document list, formatted as
            ``"<doc_id>: <file name>"``.
        top_k: Maximum number of chunks to put into the prompt.

    Returns:
        The RAG prompt built from the best-matching chunks of the selected
        documents, or ``message`` unchanged when RAG is off, nothing is
        selected, no chunk matches, or retrieval fails (the error is printed).
    """
    if not enable_rag or not selected_docs:
        return message  # RAG disabled: hand back the original message.
    try:
        top_k = max(1, int(top_k))  # UI sliders may deliver a float.
        selected_doc_ids = {doc.split(":")[0] for doc in selected_docs}
        indexed_docs = rag_system.documents
        if selected_doc_ids.issuperset(indexed_docs):
            # Every indexed document is selected: the global top-k is exact.
            fetch_k = top_k
        else:
            # Filtering happens after the search, so fetching only top_k hits
            # could return nothing but chunks of unselected documents. Rank
            # all stored chunks instead and cut to top_k after filtering.
            total_chunks = sum(info.get("chunk_count", 0) for info in indexed_docs.values())
            fetch_k = max(top_k, total_chunks)
        # Search for relevant chunks.
        relevant_chunks = rag_system.search_relevant_chunks(message, top_k=fetch_k)
        # Keep only the chunks that belong to the selected documents.
        filtered_chunks = [
            chunk for chunk in relevant_chunks
            if chunk['metadata']['doc_id'] in selected_doc_ids
        ]
        if filtered_chunks:
            # Build the RAG prompt.
            return rag_system.generate_rag_prompt(message, filtered_chunks[:top_k])
        return message
    except Exception as e:
        print(f"RAG processing error: {e}")
        return message
def switch_model(model_choice):
    """Show the chat container that belongs to the chosen model.

    Args:
        model_choice: The model name picked in the dropdown.

    Returns:
        A 3-tuple: visibility update for the 120b container, visibility
        update for the 20b container, and the chosen model name. The 20b
        container is shown only when ``"openai/gpt-oss-20b"`` is chosen;
        any other value shows the 120b container.
    """
    use_20b = model_choice == "openai/gpt-oss-20b"
    return gr.update(visible=not use_20b), gr.update(visible=use_20b), model_choice
# Gradio interface: a sidebar with model / PDF / RAG settings next to a chat
# area holding one chat container per model. All components and all event
# wiring live inside this Blocks context.
with gr.Blocks(fill_height=True, theme="Nymbo/Nymbo_Theme", css=custom_css) as demo:
    # Created inside the Blocks context because it is used as an event output
    # below; a component must belong to the Blocks whose events reference it.
    current_model = gr.State("openai/gpt-oss-120b")

    with gr.Row():
        # Sidebar
        with gr.Column(scale=1):
            with gr.Group(elem_classes="main-container"):
                gr.Markdown("# ๐ AI Chat with RAG")
                gr.Markdown(
                    "PDF ๋ฌธ์๋ฅผ ์ ๋ก๋ํ์ฌ AI๊ฐ ๋ฌธ์ ๋ด์ฉ์ ์ฐธ๊ณ ํด ๋ต๋ณํ๋๋ก ํ ์ ์์ต๋๋ค."
                )
                # Model selection
                model_dropdown = gr.Dropdown(
                    choices=["openai/gpt-oss-120b", "openai/gpt-oss-20b"],
                    value="openai/gpt-oss-120b",
                    label="๐ ๋ชจ๋ธ ์ ํ"
                )
                login_button = gr.LoginButton("Sign in with Hugging Face", size="lg")
                reload_btn = gr.Button("๐ ๋ชจ๋ธ ๋ณ๊ฒฝ ์ ์ฉ", variant="primary", size="lg")
                # RAG settings
                with gr.Accordion("๐ PDF RAG ์ค์ ", open=True):
                    pdf_upload = gr.File(
                        label="PDF ์ ๋ก๋",
                        file_types=[".pdf"],
                        type="filepath"
                    )
                    upload_status = gr.HTML(
                        value="<div class='pdf-status'>PDF๋ฅผ ์ ๋ก๋ํ์ฌ RAG๋ฅผ ํ์ฑํํ์ธ์</div>"
                    )
                    document_list = gr.CheckboxGroup(
                        choices=[],
                        label="๐ ์ ๋ก๋๋ ๋ฌธ์",
                        info="์ง๋ฌธ์ ์ฐธ๊ณ ํ ๋ฌธ์๋ฅผ ์ ํํ์ธ์"
                    )
                    with gr.Row():
                        clear_btn = gr.Button("๐๏ธ ๋ชจ๋ ๋ฌธ์ ์ญ์ ", size="sm")
                        refresh_btn = gr.Button("๐ ๋ชฉ๋ก ์๋ก๊ณ ์นจ", size="sm")
                    enable_rag = gr.Checkbox(
                        label="RAG ํ์ฑํ",
                        value=False,
                        info="๋ฌธ์ ๊ธฐ๋ฐ ๋ต๋ณ ์์ฑ ํ์ฑํ"
                    )
                    with gr.Accordion("โ๏ธ RAG ๊ณ ๊ธ ์ค์ ", open=False):
                        top_k_chunks = gr.Slider(
                            minimum=1,
                            maximum=10,
                            value=5,
                            step=1,
                            label="์ฐธ์กฐํ ์ฒญํฌ ์",
                            info="๋ต๋ณ ์์ฑ์ ์ฐธ๊ณ ํ ๋ฌธ์ ์ฒญํฌ์ ๊ฐ์"
                        )
                        # NOTE(review): no event handler in this file reads
                        # this slider, so changing it has no effect here.
                        chunk_size = gr.Slider(
                            minimum=500,
                            maximum=2000,
                            value=1000,
                            step=100,
                            label="์ฒญํฌ ํฌ๊ธฐ",
                            info="๋ฌธ์๋ฅผ ๋ถํ ํ๋ ์ฒญํฌ์ ํฌ๊ธฐ (๋ฌธ์ ์)"
                        )
                # Advanced options
                # NOTE(review): temperature and max_tokens are not passed to
                # any handler yet; the chat reply below is a placeholder.
                with gr.Accordion("โ๏ธ ๋ชจ๋ธ ์ค์ ", open=False):
                    temperature = gr.Slider(
                        minimum=0,
                        maximum=2,
                        value=0.7,
                        step=0.1,
                        label="Temperature"
                    )
                    max_tokens = gr.Slider(
                        minimum=1,
                        maximum=4096,
                        value=512,
                        step=1,
                        label="Max Tokens"
                    )
        # Main chat area
        with gr.Column(scale=3):
            with gr.Group(elem_classes="main-container"):
                gr.Markdown("## ๐ฌ Chat Interface")
                # RAG status indicator
                with gr.Row():
                    rag_status = gr.HTML(
                        value="<div style='padding: 10px; background: rgba(59, 130, 246, 0.1); border-radius: 8px; margin-bottom: 10px;'>๐ RAG: <strong>๋นํ์ฑํ</strong></div>"
                    )
                # One chat container per model; only one is visible at a time.
                with gr.Column(visible=True) as model_120b_container:
                    gr.Markdown("### Model: openai/gpt-oss-120b")
                    chatbot_120b = gr.Chatbot(height=400)
                    msg_box_120b = gr.Textbox(
                        label="๋ฉ์์ง ์ ๋ ฅ",
                        placeholder="PDF ๋ด์ฉ์ ๋ํด ์ง๋ฌธํด๋ณด์ธ์...",
                        lines=2
                    )
                    with gr.Row():
                        send_btn_120b = gr.Button("๐ค ์ ์ก", variant="primary")
                        clear_btn_120b = gr.Button("๐๏ธ ๋ํ ์ด๊ธฐํ")
                with gr.Column(visible=False) as model_20b_container:
                    gr.Markdown("### Model: openai/gpt-oss-20b")
                    chatbot_20b = gr.Chatbot(height=400)
                    msg_box_20b = gr.Textbox(
                        label="๋ฉ์์ง ์ ๋ ฅ",
                        placeholder="PDF ๋ด์ฉ์ ๋ํด ์ง๋ฌธํด๋ณด์ธ์...",
                        lines=2
                    )
                    with gr.Row():
                        send_btn_20b = gr.Button("๐ค ์ ์ก", variant="primary")
                        clear_btn_20b = gr.Button("๐๏ธ ๋ํ ์ด๊ธฐํ")

    # Event handlers
    # PDF upload
    pdf_upload.upload(
        fn=upload_pdf,
        inputs=[pdf_upload],
        outputs=[upload_status, document_list, enable_rag]
    )
    # Remove all documents
    clear_btn.click(
        fn=clear_documents,
        outputs=[upload_status, document_list, enable_rag]
    )

    def refresh_document_list():
        """Rebuild the document choices from the documents currently indexed."""
        choices = [
            f"{doc_id}: {info['metadata']['file_name']}"
            for doc_id, info in rag_system.documents.items()
        ]
        return gr.update(choices=choices)

    # Refresh the document list (the button previously had no handler).
    refresh_btn.click(
        fn=refresh_document_list,
        outputs=[document_list]
    )
    # Keep the RAG status banner in sync with the checkbox.
    enable_rag.change(
        fn=lambda x: gr.update(
            value=f"<div style='padding: 10px; background: rgba(59, 130, 246, 0.1); border-radius: 8px; margin-bottom: 10px;'>๐ RAG: <strong>{'ํ์ฑํ' if x else '๋นํ์ฑํ'}</strong></div>"
        ),
        inputs=[enable_rag],
        outputs=[rag_status]
    )
    # Model switch, followed by a confirmation toast.
    reload_btn.click(
        fn=switch_model,
        inputs=[model_dropdown],
        outputs=[model_120b_container, model_20b_container, current_model]
    ).then(
        fn=lambda: gr.Info("๋ชจ๋ธ์ด ์ฑ๊ณต์ ์ผ๋ก ์ ํ๋์์ต๋๋ค!"),
        inputs=[],
        outputs=[]
    )

    # Chat (with RAG integration)
    def chat_with_rag(message, history, enable_rag, selected_docs, top_k):
        """Append one chat turn, using the RAG prompt when RAG is active.

        Args:
            message: Text typed by the user.
            history: Current chatbot history as a list of (user, bot) pairs.
            enable_rag: Value of the RAG checkbox.
            selected_docs: Selected entries of the document list.
            top_k: Number of chunks to reference.

        Returns:
            A 2-tuple: an empty string that clears the input box, and the
            updated history. Blank messages add no turn.
        """
        # Work on a copy so the list passed in is not mutated; also tolerates
        # an unset (None) history.
        history = list(history or [])
        if not message or not message.strip():
            return "", history
        # RAG processing
        processed_message = process_with_rag(message, enable_rag, selected_docs, top_k)
        # The real model API call belongs here; for now a placeholder reply
        # is produced.
        if enable_rag and selected_docs:
            response = f"[RAG ํ์ฑํ] ์ ํ๋ {len(selected_docs)}๊ฐ ๋ฌธ์๋ฅผ ์ฐธ๊ณ ํ์ฌ ๋ต๋ณํฉ๋๋ค:\n\n{processed_message[:200]}..."
        else:
            response = f"[์ผ๋ฐ ๋ชจ๋] {message}์ ๋ํ ๋ต๋ณ์ ๋๋ค."
        history.append((message, response))
        return "", history

    # Both model containers get identical chat wiring.
    for msg_box, chatbot, send_btn, clear_chat_btn in (
        (msg_box_120b, chatbot_120b, send_btn_120b, clear_btn_120b),
        (msg_box_20b, chatbot_20b, send_btn_20b, clear_btn_20b),
    ):
        chat_inputs = [msg_box, chatbot, enable_rag, document_list, top_k_chunks]
        chat_outputs = [msg_box, chatbot]
        msg_box.submit(
            fn=chat_with_rag,
            inputs=chat_inputs,
            outputs=chat_outputs
        )
        send_btn.click(
            fn=chat_with_rag,
            inputs=chat_inputs,
            outputs=chat_outputs
        )
        # Reset the conversation and the input box.
        clear_chat_btn.click(
            lambda: ([], ""),
            outputs=[chatbot, msg_box]
        )
# Start the Gradio server only when this file is run as a script, not when
# it is imported as a module.
if __name__ == "__main__":
    demo.launch()