File size: 3,540 Bytes
097081a
9b56ad1
3f106f4
9b56ad1
 
3f106f4
 
 
9b56ad1
097081a
9b56ad1
2074ed8
097081a
3f106f4
 
097081a
 
2074ed8
9b56ad1
 
378f4da
9b56ad1
2074ed8
9b56ad1
 
378f4da
 
2074ed8
9b56ad1
 
 
378f4da
9b56ad1
 
 
378f4da
 
3f106f4
378f4da
 
 
 
3f106f4
9b56ad1
 
378f4da
 
 
9b56ad1
2074ed8
3f106f4
9b56ad1
378f4da
 
9b56ad1
378f4da
9b56ad1
 
 
3f106f4
 
 
 
9b56ad1
 
3f106f4
9b56ad1
378f4da
9b56ad1
378f4da
9b56ad1
097081a
3f106f4
9b56ad1
 
3f106f4
 
 
9b56ad1
378f4da
2074ed8
9b56ad1
 
 
 
378f4da
9b56ad1
3f106f4
 
9b56ad1
2074ed8
378f4da
 
9b56ad1
 
378f4da
 
9b56ad1
 
378f4da
 
9b56ad1
 
378f4da
9b56ad1
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
import os
import gradio as gr
import fitz  # PyMuPDF
import faiss
import numpy as np
from io import BytesIO
from sentence_transformers import SentenceTransformer
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
from langchain.text_splitter import RecursiveCharacterTextSplitter
from huggingface_hub import login

# Authenticate with Hugging Face
hf_token = os.environ.get("HUGGINGFACE_TOKEN")
if not hf_token:
    raise ValueError("⚠️ Please set the HUGGINGFACE_TOKEN environment variable.")
login(token=hf_token)

# Load embedding model
embed_model = SentenceTransformer("BAAI/bge-base-en-v1.5")

# Load Mistral LLM (CPU compatible)
model_id = "mistralai/Mistral-7B-Instruct-v0.1"
tokenizer = AutoTokenizer.from_pretrained(model_id, token=hf_token)
model = AutoModelForCausalLM.from_pretrained(
    model_id,
    device_map={"": "cpu"},  # Force CPU
    torch_dtype="auto",      # Safe for CPU
    token=hf_token
)
llm = pipeline("text-generation", model=model, tokenizer=tokenizer)

# Global state
index = None
doc_texts = []

# Extract text from uploaded file
def extract_text(file_obj):
    text = ""
    file_path = file_obj.name
    if file_path.endswith(".pdf"):
        with open(file_path, "rb") as f:
            pdf_stream = BytesIO(f.read())
        doc = fitz.open(stream=pdf_stream, filetype="pdf")
        for page in doc:
            text += page.get_text()
    elif file_path.endswith(".txt"):
        with open(file_path, "r", encoding="utf-8") as f:
            text = f.read()
    else:
        return "❌ Unsupported file type."
    return text

# Process file and build FAISS index
def process_file(file_obj):
    global index, doc_texts
    text = extract_text(file_obj)
    if text.startswith("❌"):
        return text

    splitter = RecursiveCharacterTextSplitter(chunk_size=300, chunk_overlap=50)
    doc_texts = splitter.split_text(text)

    embeddings = embed_model.encode(doc_texts, convert_to_numpy=True)
    dim = embeddings.shape[1]
    index = faiss.IndexFlatL2(dim)
    index.add(embeddings)

    return "βœ… File processed successfully. You can now ask questions!"

# Generate answer from FAISS context + LLM
def generate_answer(question):
    global index, doc_texts
    if index is None or not doc_texts:
        return "⚠️ Please upload and process a file first."

    question_emb = embed_model.encode([question], convert_to_numpy=True)
    _, I = index.search(question_emb, k=3)
    context = "\n".join([doc_texts[i] for i in I[0]])

    prompt = f"""<s>[INST] You are a helpful assistant. Use the context below to answer the question.

Context:
{context}

Question: {question}
Answer: [/INST]</s>"""

    response = llm(prompt, max_new_tokens=300, do_sample=True, temperature=0.7)
    return response[0]["generated_text"].split("Answer:")[-1].strip()

# Gradio UI
with gr.Blocks(title="RAG Chatbot with Mistral-7B (CPU-Friendly)") as demo:
    gr.Markdown("## πŸ€– Upload a PDF/TXT file and ask questions using Mistral-7B")

    with gr.Row():
        file_input = gr.File(label="πŸ“ Upload PDF or TXT", file_types=[".pdf", ".txt"])
        upload_status = gr.Textbox(label="πŸ“₯ Upload Status", interactive=False)

    with gr.Row():
        question_input = gr.Textbox(label="❓ Ask a Question")
        answer_output = gr.Textbox(label="πŸ’¬ Answer", interactive=False)

    file_input.change(fn=process_file, inputs=file_input, outputs=upload_status)
    question_input.submit(fn=generate_answer, inputs=question_input, outputs=answer_output)

demo.launch()