import gradio as gr
import requests
import io
from pypdf import PdfReader
from concurrent.futures import ThreadPoolExecutor, as_completed
import numpy as np
import faiss
from sentence_transformers import SentenceTransformer, CrossEncoder
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
from accelerate import Accelerator
from langchain.text_splitter import NLTKTextSplitter
from rank_bm25 import BM25Okapi
import os
import pickle
import nltk

nltk.download('punkt_tab')

# --- Global Variables for Caching ---
index = None
chunks = None
embedding_model = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')
generator = None


# --- PDF Processing and Embedding ---
def download_pdf(url):
    response = requests.get(url, stream=True)
    response.raise_for_status()
    return response.content


def custom_chunking(text, delimiter="\n\n"):
    """Splits text based on a specified delimiter."""
    return text.split(delimiter)


def extract_text_from_pdf(pdf_bytes, document_id):
    """Extracts text from a PDF, page by page, and then chunks each page."""
    pdf_file = io.BytesIO(pdf_bytes)
    reader = PdfReader(pdf_file)
    nltk_splitter = NLTKTextSplitter(chunk_size=500)
    extracted_data = []
    for page in reader.pages:
        page_text = page.extract_text() or ""
        clean_text = " ".join(page_text.split())
        if clean_text:
            words = clean_text.split()
            section_header = " ".join(words[:20]) if words else "No Section Name Found"
            # Split on paragraph breaks before collapsing whitespace,
            # otherwise the "\n\n" delimiter can never match.
            custom_chunks = custom_chunking(page_text)
            for custom_chunk in custom_chunks:
                clean_custom_chunk = " ".join(custom_chunk.split())
                if clean_custom_chunk:
                    nltk_chunks = nltk_splitter.split_text(clean_custom_chunk)
                    for nltk_chunk in nltk_chunks:
                        clean_nltk_chunk = " ".join(nltk_chunk.split())
                        if clean_nltk_chunk:
                            extracted_data.append({
                                "document_id": document_id,
                                "section_header": section_header,
                                "text": clean_nltk_chunk
                            })
    return extracted_data


def process_single_pdf(url, doc_id):
    """Downloads and extracts a single PDF."""
    pdf_bytes = download_pdf(url)
    return extract_text_from_pdf(pdf_bytes, doc_id)


def process_pdfs_parallel(pdf_urls, document_ids):
    """Processes multiple PDFs in parallel."""
    all_data = []
    with ThreadPoolExecutor() as pdf_executor:
        pdf_futures = [pdf_executor.submit(process_single_pdf, url, doc_id)
                       for url, doc_id in zip(pdf_urls, document_ids)]
        for future in as_completed(pdf_futures):
            all_data.extend(future.result())
    return all_data


def create_embeddings_and_index(data):
    """Creates embeddings and builds a FAISS index over the chunk texts."""
    texts = [item['text'] for item in data]
    embeddings = embedding_model.encode(texts)
    dimension = embeddings.shape[1]
    index = faiss.IndexFlatL2(dimension)
    index.add(embeddings)
    return index, data


# --- Retrieval Functions ---
def bm25_retrieval(query, documents, top_k=10):
    tokenized_docs = [doc['text'].split() for doc in documents]
    bm25 = BM25Okapi(tokenized_docs)
    doc_scores = bm25.get_scores(query.split())
    top_indices = np.argsort(doc_scores)[::-1][:top_k]
    return [documents[i] for i in top_indices]


def adaptive_retrieval(query, index, chunks, top_k=10):
    """Hybrid retrieval: dense (FAISS) + sparse (BM25), deduplicated by chunk text."""
    query_embedding = embedding_model.encode([query], convert_to_numpy=True)
    _, indices = index.search(query_embedding, top_k)
    vector_results = [chunks[i] for i in indices[0]]
    bm25_results = bm25_retrieval(query, chunks, top_k)
    combined_results = vector_results + bm25_results
    unique_results = []
    seen_texts = set()
    for result in combined_results:
        if result['text'] not in seen_texts:
            unique_results.append(result)
            seen_texts.add(result['text'])
    return unique_results
def rerank(query, results, keyword_weight=0.3, cross_encoder_weight=0.7):
    """Combines keyword-based and cross-encoder reranking."""
    # Keyword-based scoring
    keywords = query.lower().split()

    def score_chunk_keywords(chunk):
        text = chunk['text'].lower()
        return sum(1 for keyword in keywords if keyword in text)

    keyword_scores = [score_chunk_keywords(chunk) for chunk in results]

    # Cross-encoder scoring
    rerank_model = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-12-v2')
    query_results = [[query,
                      f"Document: {result['document_id']}, Section: {result['section_header']}, Text: {result['text']}"]
                     for result in results]
    cross_encoder_scores = rerank_model.predict(query_results)

    # Combine scores
    combined_scores = [(keyword_scores[i] * keyword_weight) +
                       (cross_encoder_scores[i] * cross_encoder_weight)
                       for i in range(len(results))]

    # Rank and select top 3
    ranked_results = [results[i] for i in np.argsort(combined_scores)[::-1]]
    return ranked_results[:3]


def merge_chunks(retrieved_chunks):
    """Merges chunks in their retrieved order, including metadata."""
    merged_text = " ".join([
        f"Document: {chunk['document_id']}, Section: {chunk['section_header']}, Text: {chunk['text']}"
        for chunk in retrieved_chunks
    ])
    return merged_text


# --- Confidence Calculation ---
def calculate_confidence(query, context, answer):
    """Calculates a confidence score from question-context and context-answer similarity."""
    # Normalize embeddings so the dot products below are cosine similarities.
    query_embedding = embedding_model.encode([query], convert_to_numpy=True, normalize_embeddings=True)
    context_embedding = embedding_model.encode([context], convert_to_numpy=True, normalize_embeddings=True)
    answer_embedding = embedding_model.encode([answer], convert_to_numpy=True, normalize_embeddings=True)
    query_context_similarity = np.dot(query_embedding, context_embedding.T).item()
    context_answer_similarity = np.dot(context_embedding, answer_embedding.T).item()
    confidence = (query_context_similarity + context_answer_similarity) / 2.0  # Equal weights
    return confidence


# --- Response Generation ---
def generate_response(query, context):
    prompt = f"""Your task is to analyze the given Context, find the answer to the Question, and provide a clear, relevant answer in plain English.

**Guidelines:**
- JUST PROVIDE ONLY THE ANSWER.
- Provide an elaborate, factual answer based strictly on the Context.
- Avoid generating Python code, solutions, or any irrelevant information.

Context: {context}

Question: {query}

Answer:"""
    response = generator(prompt, max_new_tokens=500, num_return_sequences=1)[0]['generated_text']
    return response


# --- Guardrail ---
def is_sensitive_query(query):
    sensitive_keywords = ["personal", "address", "phone", "ssn", "credit card",
                          "bank account", "password", "social security", "private", "location"]
    query_lower = query.lower()
    if any(keyword in query_lower for keyword in sensitive_keywords):
        return True
    classifier = pipeline("text-classification", model="unitary/toxic-bert")
    result = classifier(query)[0]
    if result["label"] == "toxic" and result["score"] > 0.7:
        return True
    return False


# --- Process Query ---
def process_query(query):
    if is_sensitive_query(query):
        return "I cannot answer questions that involve sensitive or personal information, or that are toxic in nature."
    retrieved_chunks = adaptive_retrieval(query, index, chunks)
    reranked_chunks = rerank(query, retrieved_chunks)
    final_chunks = reranked_chunks[:3]
    merged_result = merge_chunks(final_chunks)
    answer = generate_response(query, merged_result)
    # DeepSeek-R1 models emit a <think>...</think> reasoning block; keep only the final answer.
    if "</think>" in answer:
        answer = answer.split("</think>", 1)[-1].strip()
    confidence = calculate_confidence(query, merged_result, answer)
    full_response = f"{answer}\n\nConfidence: {confidence:.2f}"
    return full_response


# --- Initialization ---
def initialize_app():
    global index, chunks, generator
    pdf_urls = ["https://www.latentview.com/wp-content/uploads/2023/07/LatentView-Annual-Report-2022-23.pdf",
                "https://www.latentview.com/wp-content/uploads/2024/08/LatentView-Annual-Report-2023-24.pdf"]
    document_ids = ["LatentView-Annual-Report-2022-23", "LatentView-Annual-Report-2023-24"]

    if os.path.exists('vector_cache.pkl'):
        with open('vector_cache.pkl', 'rb') as f:
            index, chunks = pickle.load(f)
    else:
        extracted_data = process_pdfs_parallel(pdf_urls, document_ids)
        index, chunks = create_embeddings_and_index(extracted_data)
        with open('vector_cache.pkl', 'wb') as f:
            pickle.dump((index, chunks), f)

    accelerator = Accelerator()
    accelerator.free_memory()
    MODEL_NAME = "deepseek-ai/DeepSeek-R1-Distill-Qwen-1.5B"
    tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
    model = AutoModelForCausalLM.from_pretrained(MODEL_NAME, device_map="auto")
    model = accelerator.prepare(model)
    generator = pipeline("text-generation", model=model, tokenizer=tokenizer)
    return "Initialization Complete!"


# --- Gradio Interface ---
def gradio_interface(query):
    return process_query(query)


iface = gr.Interface(
    fn=gradio_interface,
    inputs=gr.Textbox(lines=2, placeholder="Enter your question here..."),
    outputs=gr.Textbox(lines=5, placeholder="Answer will appear here..."),
    title="Annual Report Q&A Chatbot (LatentView Analytics)",
    description="Ask questions about the company's annual reports (2022-23 & 2023-24).",
    examples=[
        ["What is the total revenue from operations for 2023-24?"],
        ["Who is the CEO of LatentView Analytics?"],
        ["Summarize the key financial highlights in 2023-24"],
        ["What were the total expenses for 2022-23?"],
    ],
    cache_examples=False,
)

with gr.Blocks() as demo:
    gr.Markdown("# Annual Report Q&A Chatbot (LatentView Analytics)")
    init_button = gr.Button("Initialize")
    init_output = gr.Textbox(label="Initialization Status")
    init_button.click(
        fn=initialize_app,
        inputs=[],
        outputs=init_output,
    )
    iface.render()

demo.launch()