Spaces:
Running
Running
import gradio as gr | |
import requests | |
import io | |
from pypdf import PdfReader | |
from concurrent.futures import ThreadPoolExecutor, as_completed | |
import numpy as np | |
import faiss | |
from sentence_transformers import SentenceTransformer, CrossEncoder | |
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline, pipeline as hf_pipeline | |
from accelerate import Accelerator | |
from langchain.text_splitter import NLTKTextSplitter | |
from rank_bm25 import BM25Okapi | |
import os | |
import pickle | |
import nltk | |
nltk.download('punkt_tab') | |
# --- Global Variables for Caching --- | |
index = None | |
chunks = None | |
embedding_model = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2') | |
generator = None | |
# --- PDF Processing and Embedding --- | |
def download_pdf(url): | |
response = requests.get(url, stream=True) | |
response.raise_for_status() | |
return response.content | |
def custom_chunking(text, delimiter="\n\n"): | |
"""Splits text based on a specified delimiter.""" | |
return text.split(delimiter) | |
def extract_text_from_pdf(pdf_bytes, document_id): | |
"""Extracts text from a PDF, page by page, and then chunks each page.""" | |
pdf_file = io.BytesIO(pdf_bytes) | |
reader = PdfReader(pdf_file) | |
nltk_splitter = NLTKTextSplitter(chunk_size=500) | |
extracted_data = [] | |
for page_num, page in enumerate(reader.pages): | |
page_text = page.extract_text() or "" | |
clean_text = " ".join(page_text.split()) | |
if clean_text: | |
words = clean_text.split() | |
section_header = " ".join(words[:20]) if words else "No Section Name Found" | |
custom_chunks = custom_chunking(clean_text) | |
for custom_chunk in custom_chunks: | |
clean_custom_chunk = " ".join(custom_chunk.split()) | |
if clean_custom_chunk: | |
nltk_chunks = nltk_splitter.split_text(clean_custom_chunk) | |
for nltk_chunk in nltk_chunks: | |
clean_nltk_chunk = " ".join(nltk_chunk.split()) | |
if clean_nltk_chunk: | |
extracted_data.append({ | |
"document_id": document_id, | |
"section_header": section_header, | |
"text": clean_nltk_chunk | |
}) | |
return extracted_data | |
def process_single_pdf(url, doc_id): | |
"""Processes a single PDF.""" | |
pdf_bytes = download_pdf(url) | |
return extract_text_from_pdf(pdf_bytes, doc_id) | |
def process_pdfs_parallel(pdf_urls, document_ids): | |
"""Processes multiple PDFs in parallel.""" | |
all_data = [] | |
with ThreadPoolExecutor() as pdf_executor: | |
pdf_futures = [pdf_executor.submit(process_single_pdf, url, doc_id) for url, doc_id in zip(pdf_urls, document_ids)] | |
for future in as_completed(pdf_futures): | |
all_data.extend(future.result()) | |
return all_data | |
def create_embeddings_and_index(data): | |
"""Create Embeddings""" | |
texts = [item['text'] for item in data] | |
embeddings = embedding_model.encode(texts) | |
dimension = embeddings.shape[1] | |
index = faiss.IndexFlatL2(dimension) | |
index.add(embeddings) | |
return index, data | |
# --- Retrieval Functions --- | |
def bm25_retrieval(query, documents, top_k=10): | |
tokenized_docs = [doc['text'].split() for doc in documents] | |
bm25 = BM25Okapi(tokenized_docs) | |
doc_scores = bm25.get_scores(query.split()) | |
top_indices = np.argsort(doc_scores)[::-1][:top_k] | |
return [documents[i] for i in top_indices] | |
def adaptive_retrieval(query, index, chunks, top_k=10): | |
query_embedding = embedding_model.encode([query], convert_to_numpy=True) | |
_, indices = index.search(query_embedding, top_k) | |
vector_results = [chunks[i] for i in indices[0]] | |
bm25_results = bm25_retrieval(query, chunks, top_k) | |
combined_results = vector_results + bm25_results | |
unique_results = [] | |
seen_texts = set() | |
for result in combined_results: | |
if result['text'] not in seen_texts: | |
unique_results.append(result) | |
seen_texts.add(result['text']) | |
return unique_results | |
def rerank(query, results, keyword_weight=0.3, cross_encoder_weight=0.7): | |
"""Combines keyword-based and cross-encoder reranking.""" | |
# Keyword-based scoring | |
keywords = query.lower().split() | |
def score_chunk_keywords(chunk): | |
text = chunk['text'].lower() | |
return sum(1 for keyword in keywords if keyword in text) | |
keyword_scores = [score_chunk_keywords(chunk) for chunk in results] | |
# Cross-encoder scoring | |
rerank_model = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-12-v2') | |
query_results = [[query, f"Document: {result['document_id']}, Section: {result['section_header']}, Text: {result['text']}"] for result in results] | |
cross_encoder_scores = rerank_model.predict(query_results) | |
# Combine scores | |
combined_scores = [(keyword_scores[i] * keyword_weight) + (cross_encoder_scores[i] * cross_encoder_weight) for i in range(len(results))] | |
# Rank and select top 3 | |
ranked_results = [results[i] for i in np.argsort(combined_scores)[::-1]] | |
return ranked_results[:3] | |
def merge_chunks(retrieved_chunks): | |
"""Merges chunks based on their original order, including metadata.""" | |
merged_text = " ".join([ | |
f"Document: {chunk['document_id']}, Section: {chunk['section_header']}, Text: {chunk['text']}" | |
for chunk in retrieved_chunks | |
]) | |
return merged_text | |
# --- Confidence Calculation --- | |
def calculate_confidence(query, context, answer): | |
"""Calculates confidence score based on question-context and context-answer similarity.""" | |
query_embedding = embedding_model.encode([query], convert_to_numpy=True) | |
context_embedding = embedding_model.encode([context], convert_to_numpy=True) | |
answer_embedding = embedding_model.encode([answer], convert_to_numpy=True) | |
query_context_similarity = np.dot(query_embedding, context_embedding.T).item() | |
context_answer_similarity = np.dot(context_embedding, answer_embedding.T).item() | |
confidence = (query_context_similarity + context_answer_similarity) / 2.0 # Equal weights | |
return confidence | |
# --- Response Generation --- | |
def generate_response(query, context): | |
prompt = f"""Your task is to analyze the given Context and take the answer for the Question and provide a clear relevant answer in plain English. | |
**Guidelines:** | |
- JUST PROVIDE ONLY THE ANSWER. | |
- Provide a elaborate, factual answer based strictly on the Context. | |
- Avoid generating Python code, solutions, or any irrelevant information. | |
Context: {context} | |
Question: {query} | |
Answer:""" | |
response = generator(prompt, max_new_tokens=500, num_return_sequences=1)[0]['generated_text'] | |
return response | |
# --- Guardrail --- | |
def is_sensitive_query(query): | |
sensitive_keywords = ["personal", "address", "phone", "ssn", "credit card", "bank account", "password", "social security", "private", "location"] | |
query_lower = query.lower() | |
if any(keyword in query_lower for keyword in sensitive_keywords): | |
return True | |
classifier = hf_pipeline("text-classification", model="unitary/toxic-bert") | |
result = classifier(query)[0] | |
if result["label"] == "toxic" and result["score"] > 0.7: | |
return True | |
return False | |
# --- Process Query --- | |
def process_query(query): | |
if is_sensitive_query(query): | |
return "I cannot answer questions that involve sensitive or personal information, or that are toxic in nature." | |
retrieved_chunks = adaptive_retrieval(query, index, chunks) | |
reranked_chunks = rerank(query, retrieved_chunks) | |
final_chunks = reranked_chunks[:3] | |
merged_result = merge_chunks(final_chunks) | |
answer = generate_response(query, merged_result) | |
if "</think>" in answer: | |
answer = answer.split("</think>", 1)[-1].strip() | |
confidence = calculate_confidence(query, merged_result, answer) | |
full_response = f"{answer}\n\nConfidence: {confidence:.2f}" | |
return full_response | |
# --- Initialization --- | |
def initialize_app(): | |
global index, chunks, generator | |
pdf_urls = ["https://www.latentview.com/wp-content/uploads/2023/07/LatentView-Annual-Report-2022-23.pdf", | |
"https://www.latentview.com/wp-content/uploads/2024/08/LatentView-Annual-Report-2023-24.pdf"] | |
document_ids = ["LatentView-Annual-Report-2022-23", "LatentView-Annual-Report-2023-24"] | |
if os.path.exists('vector_cache.pkl'): | |
with open('vector_cache.pkl', 'rb') as f: | |
index, chunks = pickle.load(f) | |
else: | |
extracted_data = process_pdfs_parallel(pdf_urls, document_ids) | |
index, chunks = create_embeddings_and_index(extracted_data) | |
with open('vector_cache.pkl', 'wb') as f: | |
pickle.dump((index, chunks), f) | |
accelerator = Accelerator() | |
accelerator.free_memory() | |
MODEL_NAME = "deepseek-ai/DeepSeek-R1-Distill-Qwen-1.5B" | |
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME) | |
model = AutoModelForCausalLM.from_pretrained(MODEL_NAME, device_map="auto") | |
model = accelerator.prepare(model) | |
generator = pipeline("text-generation", model=model, tokenizer=tokenizer) | |
return "Initialization Complete!" | |
# --- Gradio Interface --- | |
def gradio_interface(query): | |
return process_query(query) | |
iface = gr.Interface( | |
fn=gradio_interface, | |
inputs=gr.Textbox(lines=2, placeholder="Enter your question here..."), | |
outputs=gr.Textbox(lines=5, placeholder="Answer will appear here..."), | |
title="Annual Report Q&A Chatbot (LatentView Analytics)", | |
description="Ask questions about the company's annual reports. (2022-23 & 2023-24)", | |
examples=[ | |
["What is the total revenue from operations for 2023-24?"], | |
["Who is the CEO of Latentview Analytics? "], | |
["Summarize the key financial highlights in 2023-24"], | |
["What were the total expenses for 2022-23?"], | |
], | |
cache_examples=False, | |
) | |
with gr.Blocks() as demo: | |
gr.Markdown("# Annual Report Q&A Chatbot (LatentView Analytics)") | |
init_button = gr.Button("Initialize") | |
init_output = gr.Textbox(label="Initialization Status") | |
init_button.click( | |
fn=initialize_app, | |
inputs=[], | |
outputs=init_output, | |
) | |
iface.render() | |
demo.launch() |