import io
import re

import faiss
import gradio as gr
import numpy as np
import requests
import torch
from accelerate import Accelerator
from bert_score import score
from pypdf import PdfReader
from rank_bm25 import BM25Okapi
from sentence_transformers import SentenceTransformer
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline

# --- Preload Data ---
DEFAULT_PDF_URLS = [
    "https://www.latentview.com/wp-content/uploads/2023/07/LatentView-Annual-Report-2022-23.pdf",
    "https://www.latentview.com/wp-content/uploads/2024/08/LatentView-Annual-Report-2023-24.pdf",
]

embedding_model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")


def preload_data(pdf_urls):
    def download_pdf(url):
        response = requests.get(url, stream=True)
        response.raise_for_status()
        return response.content

    def extract_text_from_pdf(pdf_bytes):
        pdf_file = io.BytesIO(pdf_bytes)
        reader = PdfReader(pdf_file)
        text = ""
        for page in reader.pages:
            text += page.extract_text() or ""
        return text

    def preprocess_text(text):
        # Keep word characters, whitespace, basic punctuation, and currency symbols.
        financial_symbols = "$€₹£¥₩₽₮₦₲"
        text = re.sub(fr"[^\w\s{re.escape(financial_symbols)}.,%/-]", "", text)
        text = re.sub(r"\s+", " ", text).strip()
        return text

    def chunk_text(text, chunk_size=1024, overlap_size=100):
        chunks = []
        start = 0
        text_length = len(text)
        while start < text_length:
            end = min(start + chunk_size, text_length)
            # Avoid cutting mid-word: back up to the last space inside the window.
            if end < text_length and text[end].isalnum():
                last_space = text.rfind(" ", start, end)
                if last_space != -1:
                    end = last_space
            chunk = text[start:end].strip()
            if chunk:
                chunks.append(chunk)
            if end == text_length:
                break
            # Start the next chunk `overlap_size` characters before the cut,
            # snapped to a word boundary, so adjacent chunks share context.
            overlap_start = max(0, end - overlap_size)
            if overlap_start < end:
                last_overlap_space = text.rfind(" ", 0, overlap_start)
                if last_overlap_space != -1 and last_overlap_space > start:
                    start = last_overlap_space + 1
                else:
                    start = end
            else:
                start = end
        return chunks

    all_data = []
    for url in pdf_urls:
        pdf_bytes = download_pdf(url)
        text = extract_text_from_pdf(pdf_bytes)
        preprocessed_text = preprocess_text(text)
        all_data.append(preprocessed_text)

    chunks = []
    for data in all_data:
        chunks.extend(chunk_text(data))

    embeddings = embedding_model.encode(chunks, convert_to_numpy=True)
    index = faiss.IndexFlatL2(embeddings.shape[1])
    # faiss expects a contiguous float32 array.
    index.add(np.ascontiguousarray(embeddings, dtype="float32"))
    return index, chunks


index, chunks = preload_data(DEFAULT_PDF_URLS)

accelerator = Accelerator()
MODEL_NAME = "microsoft/phi-2"
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
model = AutoModelForCausalLM.from_pretrained(
    MODEL_NAME,
    device_map="auto",
    trust_remote_code=True,
    cache_dir="./my_models",
)
model = accelerator.prepare(model)
generator = pipeline("text-generation", model=model, tokenizer=tokenizer)


def bm25_retrieval(query, documents, top_k=3):
    # Rebuilt on every call for simplicity; since `documents` (the global
    # `chunks`) is static, the BM25 index could be built once at startup.
    tokenized_docs = [doc.split() for doc in documents]
    bm25 = BM25Okapi(tokenized_docs)
    scores = bm25.get_scores(query.split())
    return [documents[i] for i in np.argsort(scores)[::-1][:top_k]]


def adaptive_retrieval(query, index, chunks, top_k=3, bm25_weight=0.5):
    # NOTE: `bm25_weight` is currently unused; the two result sets are merged
    # as a deduplicated union rather than by weighted score fusion.
    # faiss search requires a float32 query vector.
    query_embedding = embedding_model.encode([query], convert_to_numpy=True).astype("float32")
    _, indices = index.search(query_embedding, top_k)
    vector_results = [chunks[i] for i in indices[0]]
    bm25_results = bm25_retrieval(query, chunks, top_k)
    return list(set(vector_results + bm25_results))


def rerank(query, results):
    query_embedding = embedding_model.encode([query], convert_to_numpy=True)
    result_embeddings = embedding_model.encode(results, convert_to_numpy=True)
    # Dot-product similarity against the (unnormalized) query embedding.
    similarities = np.dot(result_embeddings, query_embedding.T).flatten()
    return [results[i] for i in np.argsort(similarities)[::-1]], similarities
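# A minimal sketch of how `bm25_weight` could actually be used instead of the
# plain union above: min-max-normalize both score lists and blend them. This
# function is not wired into the pipeline; its name and the normalization
# choice are assumptions, not part of the original code.
def weighted_fusion(query, index, chunks, top_k=3, bm25_weight=0.5):
    query_embedding = embedding_model.encode([query], convert_to_numpy=True).astype("float32")
    # Score every chunk: the index holds exactly len(chunks) vectors.
    distances, indices = index.search(query_embedding, len(chunks))
    # Convert L2 distances to similarities (smaller distance -> larger score).
    vec_scores = np.zeros(len(chunks))
    vec_scores[indices[0]] = -distances[0]
    bm25 = BM25Okapi([doc.split() for doc in chunks])
    bm25_scores = np.asarray(bm25.get_scores(query.split()), dtype="float64")

    def minmax(x):
        span = x.max() - x.min()
        return (x - x.min()) / span if span > 0 else np.zeros_like(x)

    combined = bm25_weight * minmax(bm25_scores) + (1 - bm25_weight) * minmax(vec_scores)
    return [chunks[i] for i in np.argsort(combined)[::-1][:top_k]]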
def merge_chunks(retrieved_chunks, overlap_size=100):
    merged_chunks = []
    buffer = retrieved_chunks[0] if retrieved_chunks else ""
    for i in range(1, len(retrieved_chunks)):
        chunk = retrieved_chunks[i]
        # If the tail of the buffer reappears in the next chunk, the two
        # chunks overlap: append only the non-overlapping remainder.
        overlap_start = buffer[-overlap_size:]
        overlap_index = chunk.find(overlap_start)
        if overlap_index != -1:
            buffer += chunk[overlap_index + len(overlap_start):]
        else:
            merged_chunks.append(buffer)
            buffer = chunk
    if buffer:
        merged_chunks.append(buffer)
    return merged_chunks


def calculate_confidence(query, answer):
    # BERTScore F1 between the answer and the query, used as a rough
    # relevance proxy rather than a calibrated confidence.
    P, R, F1 = score([answer], [query], lang="en", verbose=False)
    return F1.item()


def generate_response(query, context):
    prompt = f"""Your task is to analyze the given Context and answer the Question concisely in plain English.

**Guidelines:**
- Do NOT include any tags; provide only the final answer.
- Provide a direct, factual answer based strictly on the Context.
- Avoid generating Python code, solutions, or any irrelevant information.

Context: {context}

Question: {query}

Answer: """
    response = generator(prompt, max_new_tokens=150, num_return_sequences=1)[0]["generated_text"]
    # The pipeline returns the prompt plus the completion; keep only the completion.
    answer = response.split("Answer:", 1)[1].strip()
    return answer


def process_query(query):
    retrieved_chunks = adaptive_retrieval(query, index, chunks)
    merged_chunks = merge_chunks(retrieved_chunks, overlap_size=50)
    reranked_chunks, similarities = rerank(query, merged_chunks)
    context = " ".join(reranked_chunks[:3])
    answer = generate_response(query, context)
    confidence = calculate_confidence(query, answer)
    full_response = f"{answer}\n\nConfidence: {confidence:.2f}"
    return full_response


iface = gr.Interface(
    fn=process_query,
    inputs=gr.Textbox(placeholder="Enter your financial question"),
    outputs="text",
    title="Financial Document Q&A Chatbot",
    description="Ask questions about the preloaded financial documents.",
)

iface.launch()  # Blocks until the Gradio server is shut down.
accelerator.free_memory()
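# Quick smoke test (hypothetical): run in place of iface.launch() to exercise
# the full pipeline once; the example question is illustrative and its answer
# depends on the preloaded reports.
#   print(process_query("What was LatentView's revenue for FY 2023-24?"))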