Niveytha27's picture
Update app.py
b775960 verified
import gradio as gr
import requests
import io
from pypdf import PdfReader
from concurrent.futures import ThreadPoolExecutor, as_completed
import numpy as np
import faiss
from sentence_transformers import SentenceTransformer, CrossEncoder
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline, pipeline as hf_pipeline
from accelerate import Accelerator
from langchain.text_splitter import NLTKTextSplitter
from rank_bm25 import BM25Okapi
import os
import pickle
import nltk
nltk.download('punkt_tab')
# --- Global Variables for Caching ---
index = None
chunks = None
embedding_model = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')
generator = None
# --- PDF Processing and Embedding ---
def download_pdf(url):
response = requests.get(url, stream=True)
response.raise_for_status()
return response.content
def custom_chunking(text, delimiter="\n\n"):
"""Splits text based on a specified delimiter."""
return text.split(delimiter)
def extract_text_from_pdf(pdf_bytes, document_id):
"""Extracts text from a PDF, page by page, and then chunks each page."""
pdf_file = io.BytesIO(pdf_bytes)
reader = PdfReader(pdf_file)
nltk_splitter = NLTKTextSplitter(chunk_size=500)
extracted_data = []
for page_num, page in enumerate(reader.pages):
page_text = page.extract_text() or ""
clean_text = " ".join(page_text.split())
if clean_text:
words = clean_text.split()
section_header = " ".join(words[:20]) if words else "No Section Name Found"
custom_chunks = custom_chunking(clean_text)
for custom_chunk in custom_chunks:
clean_custom_chunk = " ".join(custom_chunk.split())
if clean_custom_chunk:
nltk_chunks = nltk_splitter.split_text(clean_custom_chunk)
for nltk_chunk in nltk_chunks:
clean_nltk_chunk = " ".join(nltk_chunk.split())
if clean_nltk_chunk:
extracted_data.append({
"document_id": document_id,
"section_header": section_header,
"text": clean_nltk_chunk
})
return extracted_data
def process_single_pdf(url, doc_id):
"""Processes a single PDF."""
pdf_bytes = download_pdf(url)
return extract_text_from_pdf(pdf_bytes, doc_id)
def process_pdfs_parallel(pdf_urls, document_ids):
"""Processes multiple PDFs in parallel."""
all_data = []
with ThreadPoolExecutor() as pdf_executor:
pdf_futures = [pdf_executor.submit(process_single_pdf, url, doc_id) for url, doc_id in zip(pdf_urls, document_ids)]
for future in as_completed(pdf_futures):
all_data.extend(future.result())
return all_data
def create_embeddings_and_index(data):
"""Create Embeddings"""
texts = [item['text'] for item in data]
embeddings = embedding_model.encode(texts)
dimension = embeddings.shape[1]
index = faiss.IndexFlatL2(dimension)
index.add(embeddings)
return index, data
# --- Retrieval Functions ---
def bm25_retrieval(query, documents, top_k=10):
tokenized_docs = [doc['text'].split() for doc in documents]
bm25 = BM25Okapi(tokenized_docs)
doc_scores = bm25.get_scores(query.split())
top_indices = np.argsort(doc_scores)[::-1][:top_k]
return [documents[i] for i in top_indices]
def adaptive_retrieval(query, index, chunks, top_k=10):
query_embedding = embedding_model.encode([query], convert_to_numpy=True)
_, indices = index.search(query_embedding, top_k)
vector_results = [chunks[i] for i in indices[0]]
bm25_results = bm25_retrieval(query, chunks, top_k)
combined_results = vector_results + bm25_results
unique_results = []
seen_texts = set()
for result in combined_results:
if result['text'] not in seen_texts:
unique_results.append(result)
seen_texts.add(result['text'])
return unique_results
def rerank(query, results, keyword_weight=0.3, cross_encoder_weight=0.7):
"""Combines keyword-based and cross-encoder reranking."""
# Keyword-based scoring
keywords = query.lower().split()
def score_chunk_keywords(chunk):
text = chunk['text'].lower()
return sum(1 for keyword in keywords if keyword in text)
keyword_scores = [score_chunk_keywords(chunk) for chunk in results]
# Cross-encoder scoring
rerank_model = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-12-v2')
query_results = [[query, f"Document: {result['document_id']}, Section: {result['section_header']}, Text: {result['text']}"] for result in results]
cross_encoder_scores = rerank_model.predict(query_results)
# Combine scores
combined_scores = [(keyword_scores[i] * keyword_weight) + (cross_encoder_scores[i] * cross_encoder_weight) for i in range(len(results))]
# Rank and select top 3
ranked_results = [results[i] for i in np.argsort(combined_scores)[::-1]]
return ranked_results[:3]
def merge_chunks(retrieved_chunks):
"""Merges chunks based on their original order, including metadata."""
merged_text = " ".join([
f"Document: {chunk['document_id']}, Section: {chunk['section_header']}, Text: {chunk['text']}"
for chunk in retrieved_chunks
])
return merged_text
# --- Confidence Calculation ---
def calculate_confidence(query, context, answer):
"""Calculates confidence score based on question-context and context-answer similarity."""
query_embedding = embedding_model.encode([query], convert_to_numpy=True)
context_embedding = embedding_model.encode([context], convert_to_numpy=True)
answer_embedding = embedding_model.encode([answer], convert_to_numpy=True)
query_context_similarity = np.dot(query_embedding, context_embedding.T).item()
context_answer_similarity = np.dot(context_embedding, answer_embedding.T).item()
confidence = (query_context_similarity + context_answer_similarity) / 2.0 # Equal weights
return confidence
# --- Response Generation ---
def generate_response(query, context):
prompt = f"""Your task is to analyze the given Context and take the answer for the Question and provide a clear relevant answer in plain English.
**Guidelines:**
- JUST PROVIDE ONLY THE ANSWER.
- Provide a elaborate, factual answer based strictly on the Context.
- Avoid generating Python code, solutions, or any irrelevant information.
Context: {context}
Question: {query}
Answer:"""
response = generator(prompt, max_new_tokens=500, num_return_sequences=1)[0]['generated_text']
return response
# --- Guardrail ---
def is_sensitive_query(query):
sensitive_keywords = ["personal", "address", "phone", "ssn", "credit card", "bank account", "password", "social security", "private", "location"]
query_lower = query.lower()
if any(keyword in query_lower for keyword in sensitive_keywords):
return True
classifier = hf_pipeline("text-classification", model="unitary/toxic-bert")
result = classifier(query)[0]
if result["label"] == "toxic" and result["score"] > 0.7:
return True
return False
# --- Process Query ---
def process_query(query):
if is_sensitive_query(query):
return "I cannot answer questions that involve sensitive or personal information, or that are toxic in nature."
retrieved_chunks = adaptive_retrieval(query, index, chunks)
reranked_chunks = rerank(query, retrieved_chunks)
final_chunks = reranked_chunks[:3]
merged_result = merge_chunks(final_chunks)
answer = generate_response(query, merged_result)
if "</think>" in answer:
answer = answer.split("</think>", 1)[-1].strip()
confidence = calculate_confidence(query, merged_result, answer)
full_response = f"{answer}\n\nConfidence: {confidence:.2f}"
return full_response
# --- Initialization ---
def initialize_app():
global index, chunks, generator
pdf_urls = ["https://www.latentview.com/wp-content/uploads/2023/07/LatentView-Annual-Report-2022-23.pdf",
"https://www.latentview.com/wp-content/uploads/2024/08/LatentView-Annual-Report-2023-24.pdf"]
document_ids = ["LatentView-Annual-Report-2022-23", "LatentView-Annual-Report-2023-24"]
if os.path.exists('vector_cache.pkl'):
with open('vector_cache.pkl', 'rb') as f:
index, chunks = pickle.load(f)
else:
extracted_data = process_pdfs_parallel(pdf_urls, document_ids)
index, chunks = create_embeddings_and_index(extracted_data)
with open('vector_cache.pkl', 'wb') as f:
pickle.dump((index, chunks), f)
accelerator = Accelerator()
accelerator.free_memory()
MODEL_NAME = "deepseek-ai/DeepSeek-R1-Distill-Qwen-1.5B"
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
model = AutoModelForCausalLM.from_pretrained(MODEL_NAME, device_map="auto")
model = accelerator.prepare(model)
generator = pipeline("text-generation", model=model, tokenizer=tokenizer)
return "Initialization Complete!"
# --- Gradio Interface ---
def gradio_interface(query):
return process_query(query)
iface = gr.Interface(
fn=gradio_interface,
inputs=gr.Textbox(lines=2, placeholder="Enter your question here..."),
outputs=gr.Textbox(lines=5, placeholder="Answer will appear here..."),
title="Annual Report Q&A Chatbot (LatentView Analytics)",
description="Ask questions about the company's annual reports. (2022-23 & 2023-24)",
examples=[
["What is the total revenue from operations for 2023-24?"],
["Who is the CEO of Latentview Analytics? "],
["Summarize the key financial highlights in 2023-24"],
["What were the total expenses for 2022-23?"],
],
cache_examples=False,
)
with gr.Blocks() as demo:
gr.Markdown("# Annual Report Q&A Chatbot (LatentView Analytics)")
init_button = gr.Button("Initialize")
init_output = gr.Textbox(label="Initialization Status")
init_button.click(
fn=initialize_app,
inputs=[],
outputs=init_output,
)
iface.render()
demo.launch()