Spaces:
Running
Running
import os | |
import gradio as gr | |
from sentence_transformers import SentenceTransformer | |
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline | |
import faiss | |
import numpy as np | |
from langchain.text_splitter import RecursiveCharacterTextSplitter | |
import fitz # PyMuPDF | |
from huggingface_hub import login | |
# Authenticate with Hugging Face to access gated models | |
hf_token = os.environ.get("HUGGINGFACE_TOKEN") | |
if hf_token is None: | |
raise ValueError("Please set the HUGGINGFACE_TOKEN environment variable") | |
login(token=hf_token) | |
# Load embedding model | |
embed_model = SentenceTransformer("BAAI/bge-base-en-v1.5") | |
# Load LLM model and tokenizer with 4bit quantization | |
model_id = "mistralai/Mistral-7B-Instruct-v0.1" | |
tokenizer = AutoTokenizer.from_pretrained(model_id, use_auth_token=hf_token) | |
model = AutoModelForCausalLM.from_pretrained( | |
model_id, | |
device_map="auto", | |
load_in_4bit=True, | |
use_auth_token=hf_token | |
) | |
llm = pipeline("text-generation", model=model, tokenizer=tokenizer) | |
# Globals for FAISS index and document texts | |
index = None | |
doc_texts = [] | |
# PDF/Text extraction | |
def extract_text(file): | |
if file.name.endswith(".pdf"): | |
text = "" | |
doc = fitz.open(file.name) | |
for page in doc: | |
text += page.get_text() | |
return text | |
elif file.name.endswith(".txt"): | |
return file.read().decode("utf-8") | |
else: | |
return "β Invalid file type." | |
# File processing: chunk text, create embeddings, build FAISS index | |
def process_file(file): | |
global index, doc_texts | |
text = extract_text(file) | |
if text.startswith("β"): | |
return text | |
text_splitter = RecursiveCharacterTextSplitter(chunk_size=300, chunk_overlap=50) | |
doc_texts = text_splitter.split_text(text) | |
embeddings = embed_model.encode(doc_texts) | |
dim = embeddings.shape[1] | |
index = faiss.IndexFlatL2(dim) | |
index.add(np.array(embeddings)) | |
return "β File processed successfully. You can now ask questions!" | |
# Generate answer using retrieved context and LLM | |
def generate_answer(question): | |
global index, doc_texts | |
if index is None or len(doc_texts) == 0: | |
return "β οΈ Please upload and process a file first." | |
question_embedding = embed_model.encode([question]) | |
_, I = index.search(np.array(question_embedding), k=3) | |
context = "\n".join([doc_texts[i] for i in I[0]]) | |
prompt = f"""[System: You are a helpful assistant. Answer strictly based on the context.] | |
Context: | |
{context} | |
Question: {question} | |
Answer:""" | |
result = llm(prompt, max_new_tokens=300, do_sample=True, temperature=0.7) | |
return result[0]["generated_text"].split("Answer:")[-1].strip() | |
# Gradio UI | |
with gr.Blocks(title="RAG Chatbot") as demo: | |
gr.Markdown("## π RAG Chatbot - Upload PDF/TXT and Ask Questions") | |
with gr.Row(): | |
file_input = gr.File(label="π Upload .pdf or .txt", file_types=[".pdf", ".txt"]) | |
upload_status = gr.Textbox(label="π₯ Upload Status", interactive=False) | |
with gr.Row(): | |
question_box = gr.Textbox(label="β Ask a Question", placeholder="Type your question here...") | |
answer_box = gr.Textbox(label="π¬ Answer", interactive=False) | |
file_input.change(fn=process_file, inputs=file_input, outputs=upload_status) | |
question_box.submit(fn=generate_answer, inputs=question_box, outputs=answer_box) | |
demo.launch() | |