import os
import gradio as gr
import fitz  # PyMuPDF
import faiss
import numpy as np
from io import BytesIO
from sentence_transformers import SentenceTransformer
from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig, pipeline
from langchain.text_splitter import RecursiveCharacterTextSplitter
from huggingface_hub import login
# 1. Authenticate with the Hugging Face Hub
hf_token = os.environ.get("HUGGINGFACE_TOKEN")
if not hf_token:
    raise ValueError("⚠️ Please set the HUGGINGFACE_TOKEN environment variable.")
login(token=hf_token)
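# Note: login() caches the token for this process, so gated model downloads
# (such as the Mistral repo below) authenticate automatically.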
# 2. Load embedding model
embed_model = SentenceTransformer("BAAI/bge-base-en-v1.5")
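# Sanity check (illustrative only): bge-base-en-v1.5 returns 768-dimensional vectors,
# e.g. embed_model.encode(["hello world"]).shape == (1, 768).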
# 3. Load LLM (Mistral 7B Instruct with 4-bit quantization)
model_id = "mistralai/Mistral-7B-Instruct-v0.1"
tokenizer = AutoTokenizer.from_pretrained(model_id, token=hf_token)
model = AutoModelForCausalLM.from_pretrained(
    model_id,
    device_map="auto",
    quantization_config=BitsAndBytesConfig(load_in_4bit=True),
    token=hf_token,
)
llm = pipeline("text-generation", model=model, tokenizer=tokenizer)
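# Note: 4-bit loading depends on the bitsandbytes library and a CUDA GPU.
# On CPU-only hardware, drop quantization_config and expect far higher memory use.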
# 4. Globals holding the retrieval state
index = None
doc_texts = []
# 5. Extract text from the uploaded file
def extract_text(file):
    text = ""
    # Gradio hands over a (closed) temp-file wrapper, so read via its path
    with open(file.name, "rb") as f:
        file_bytes = f.read()
    if file.name.endswith(".pdf"):
        pdf_stream = BytesIO(file_bytes)
        doc = fitz.open(stream=pdf_stream, filetype="pdf")
        for page in doc:
            text += page.get_text()
    elif file.name.endswith(".txt"):
        text = file_bytes.decode("utf-8")
    else:
        return "❌ Unsupported file type. Only PDF and TXT are allowed."
    return text
# 6. Process the file: split text, create embeddings, build FAISS index
def process_file(file):
    global index, doc_texts
    text = extract_text(file)
    if text.startswith("❌"):  # propagate the error message from extract_text
        return text
    # Split text into overlapping chunks for retrieval
    splitter = RecursiveCharacterTextSplitter(chunk_size=300, chunk_overlap=50)
    doc_texts = splitter.split_text(text)
    # Create embeddings for every chunk
    embeddings = embed_model.encode(doc_texts, convert_to_numpy=True)
    # Build an exact (brute-force) L2 index over the chunk embeddings
    dim = embeddings.shape[1]
    index = faiss.IndexFlatL2(dim)
    index.add(embeddings)
    return "✅ File processed successfully. You can now ask questions!"
# 7. Generate an answer from the question plus retrieved context
def generate_answer(question):
    global index, doc_texts
    if index is None or not doc_texts:
        return "⚠️ Please upload and process a file first."
    # Embed the question and retrieve the 3 nearest chunks
    question_emb = embed_model.encode([question], convert_to_numpy=True)
    _, I = index.search(question_emb, k=3)
    # Build the context from the retrieved chunks
    context = "\n".join([doc_texts[i] for i in I[0]])
    # Prompt
    prompt = f"""[System: You are a helpful assistant. Answer strictly based on the context. Do not hallucinate.]
Context:
{context}
Question: {question}
Answer:"""
    # Generate a response; the pipeline echoes the prompt, so keep only the answer
    response = llm(prompt, max_new_tokens=300, do_sample=True, temperature=0.7)
    return response[0]["generated_text"].split("Answer:")[-1].strip()
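# Example call (illustrative, after a document has been processed):
#   generate_answer("What is the main topic of the document?")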
# 8. Gradio UI
with gr.Blocks(title="🧠 RAG Chatbot") as demo:
    gr.Markdown("## 📄 Retrieval-Augmented Generation Chatbot\nUpload a `.pdf` or `.txt` and ask questions about its content.")
    with gr.Row():
        file_input = gr.File(label="📁 Upload PDF/TXT", file_types=[".pdf", ".txt"])
        upload_status = gr.Textbox(label="📥 Upload Status", interactive=False)
    with gr.Row():
        question_box = gr.Textbox(label="❓ Ask a Question", placeholder="Type your question here...")
        answer_box = gr.Textbox(label="💬 Answer", interactive=False)
    file_input.change(fn=process_file, inputs=file_input, outputs=upload_status)
    question_box.submit(fn=generate_answer, inputs=question_box, outputs=answer_box)
# 9. Launch the app
demo.launch()
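# To run locally (sketch, assuming this file is saved as app.py):
#   export HUGGINGFACE_TOKEN=hf_...   # token with access to the gated Mistral repo
#   python app.py
# Gradio serves the UI at http://127.0.0.1:7860 by default.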