Spaces:
Sleeping
Sleeping
import os | |
import gradio as gr | |
import asyncio | |
from langchain_core.prompts import PromptTemplate | |
from langchain_community.document_loaders import PyPDFLoader | |
from langchain_google_genai import ChatGoogleGenerativeAI | |
import google.generativeai as genai | |
from langchain.chains.question_answering import load_qa_chain | |
# In-memory store of per-session state, keyed by the session ID the user types
# into the UI. Each value is a dict with two string entries:
#   "context" - document text accumulated for the session
#   "history" - the running "Q: ... / A: ..." transcript
# Nothing is persisted: the contents are lost when the process restarts.
session_contexts = {}
# Prompt sent to the model; {context}, {history} and {question} are filled in
# by the QA chain on every call.
_PROMPT_TEMPLATE = """You are a helpful assistant. Use the context provided below to answer the question precisely and concisely.
If the answer is not contained in the context, respond with "answer not available in context".
Context:
{context}
Conversation History:
{history}
Question:
{question}
Answer:
"""


def _rank_pages(answer, page_texts, limit=2):
    """Return up to *limit* 1-based positions of the texts that best match *answer*.

    The answer is split into sentences on ". " and each text scores one point
    per sentence it contains (case-insensitive). Texts with a score of zero
    are never returned; ties keep their original order.
    """
    # Drop empty fragments: "" is a substring of every text, so an empty
    # answer would otherwise mark every page as relevant.
    phrases = [part.lower() for part in answer.split(". ") if part.strip()]
    scores = []
    for text in page_texts:
        lowered = text.lower()  # lower-case each text once, not once per phrase
        scores.append(sum(1 for phrase in phrases if phrase in lowered))
    ranked = sorted(enumerate(scores), key=lambda pair: pair[1], reverse=True)
    return [index + 1 for index, score in ranked if score > 0][:limit]


def _format_sources(file_path, top_pages):
    """Build the "top pages" line and the document link shown under the answer."""
    absolute_path = os.path.abspath(file_path)
    if top_pages:
        # Every page link targets the same file URL; only the label differs.
        links = ", ".join(f"[Page {p}](file://{absolute_path})" for p in top_pages)
        source_str = f"Top relevant page(s): {links}"
    else:
        source_str = "Top relevant page(s): Not found in specific page"
    source_link = f"[Document: {os.path.basename(file_path)}](file://{absolute_path})"
    return source_str, source_link


async def initialize(file_path, question, session_id):
    """Answer *question* about the PDF at *file_path* and record the exchange.

    Args:
        file_path: Path of the uploaded PDF on disk.
        question: The user's question.
        session_id: Key into ``session_contexts`` used to carry conversation
            history from one call to the next.

    Returns:
        A string with the answer, the best-matching page(s) and a link to the
        document, or an error message when *file_path* does not exist.
    """
    # Fail fast before configuring the API client or building the model.
    if not os.path.exists(file_path):
        return "Error: Unable to process the document. Please ensure the PDF file is valid."

    genai.configure(api_key=os.getenv("GOOGLE_API_KEY"))
    model = ChatGoogleGenerativeAI(model="gemini-pro", temperature=0.3)
    prompt = PromptTemplate(
        template=_PROMPT_TEMPLATE,
        input_variables=["context", "history", "question"],
    )

    # Get or initialize the context and history for the current session.
    state = session_contexts.get(session_id, {"context": "", "history": ""})
    prior_context = state["context"]
    prior_history = state["history"]

    # NOTE(review): load_and_split() may return text-splitter chunks rather
    # than one entry per PDF page, in which case the "page" numbers reported
    # below are positions in this list - confirm against the loader docs.
    pages = PyPDFLoader(file_path).load_and_split()
    page_texts = [page.page_content for page in pages]
    context = "\n".join(page_texts[:30])  # first 30 entries only

    # Only grow the stored context when this document's text is new, so asking
    # several questions about the same file does not duplicate it every call.
    if context in prior_context:
        full_context = prior_context
    else:
        full_context = prior_context + "\n" + context

    stuff_chain = load_qa_chain(model, chain_type="stuff", prompt=prompt)
    # The history passed to the model holds earlier turns only; the current
    # answer does not exist yet.
    # NOTE(review): the "stuff" chain appears to fill {context} from
    # input_documents itself, which would make the "context" entry here
    # redundant - confirm before removing it.
    result = await stuff_chain.ainvoke({
        "input_documents": pages,
        "question": question,
        "context": full_context,
        "history": prior_history,
    })
    answer = result.get("output_text", "").strip()

    top_pages = _rank_pages(answer, page_texts)
    source_str, source_link = _format_sources(file_path, top_pages)

    # Record the exchange exactly once.
    session_contexts[session_id] = {
        "context": full_context,
        "history": prior_history + f"\nQ: {question}\nA: {answer}",
    }
    return f"Answer: {answer}\n{source_str}\n{source_link}"
# Gradio components used by the question-answering interface: the PDF upload,
# the question box, and a fixed-height (10 line) box for the answer and pages.
input_file = gr.File(label="Upload PDF File")
input_question = gr.Textbox(label="Ask about the document")
output_text = gr.Textbox(label="Answer and Top Pages", lines=10, max_lines=10)
def get_chat_history(session_id):
    """Return the stored conversation transcript for *session_id*.

    Args:
        session_id: Key into the module-level ``session_contexts`` dict.

    Returns:
        The session's ``"history"`` string, or a fixed placeholder message
        when the session ID has no entry.
    """
    entry = session_contexts.get(session_id)
    if entry is None:
        return "No history available for this session."
    return entry["history"]
async def pdf_qa(file, question, session_id):
    """Validate the upload and delegate to ``initialize`` for the answer.

    Args:
        file: Value supplied by the Gradio file input; ``None`` when nothing
            was uploaded.
        question: The user's question about the document.
        session_id: Identifier used to track conversation history.

    Returns:
        The string produced by ``initialize``, or an error message when no
        file was provided.
    """
    if file is None:
        return "Error: No file uploaded. Please upload a PDF document."
    # Depending on the Gradio version and the component's `type` setting, the
    # upload arrives either as a plain path string or as an object exposing
    # the path via `.name`; accept both.
    file_path = file if isinstance(file, str) else file.name
    return await initialize(file_path, question, session_id)
def _run_pdf_qa(file, question, session_id):
    """Synchronous adapter around the async ``pdf_qa`` handler.

    Kept synchronous on purpose: ``asyncio.run`` drives the coroutine on its
    own event loop for each request.
    """
    return asyncio.run(pdf_qa(file, question, session_id))


# Question-answering tab.
qa_interface = gr.Interface(
    fn=_run_pdf_qa,
    inputs=[
        input_file,
        input_question,
        gr.Textbox(label="Session ID", placeholder="Enter a session ID to track your conversation"),
    ],
    outputs=output_text,
    title="PDF Question Answering System",
    description="Upload a PDF file and ask questions about the content. Provide a session ID to maintain conversation context.",
)

# Chat-history tab.
history_interface = gr.Interface(
    fn=get_chat_history,
    inputs=gr.Textbox(label="Session ID", placeholder="Enter a session ID to view chat history"),
    outputs=gr.Textbox(label="Chat History", lines=20, max_lines=20),
    title="Chat History",
    description="View the history of interactions for a specific session.",
)

# launch() blocks while the server is running, so launching the two interfaces
# one after the other never starts the second until the first shuts down.
# Serve both from a single app as tabs instead.
demo = gr.TabbedInterface(
    [qa_interface, history_interface],
    ["PDF Question Answering", "Chat History"],
)
demo.launch(share=True)