import gradio as gr
import os
import re
import PyPDF2
import numpy as np
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
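
# Requires: gradio, PyPDF2, numpy, sentence-transformers, scikit-learn
# (the sklearn import above is provided by the scikit-learn package).
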
class PDFAnalyzer:
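    """Embed a PDF's text as ~400-word chunks and answer questions by
    returning the chunk most cosine-similar to the question embedding."""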

    def __init__(self):
        self.text_chunks = []
        self.embeddings = None
        self.active_doc = None
        self.model = SentenceTransformer('all-MiniLM-L6-v2')
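        # (The model is downloaded from the Hugging Face hub on first use,
        # then cached locally for subsequent runs.)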

    def process_pdf(self, filepath):
        """Handle the PDF processing pipeline: extract, chunk, embed."""
        try:
            if not filepath.lower().endswith('.pdf'):
                return False, "Invalid file format - PDF required"

            text = self._extract_text(filepath)
            if not text.strip():
                # Guards against image-only PDFs, which would otherwise
                # produce an empty chunk list and break later queries.
                return False, "No extractable text found in PDF"
            self.text_chunks = self._chunk_text(text)
            self.embeddings = self.model.encode(self.text_chunks)
            self.active_doc = os.path.basename(filepath)
            return True, f"Loaded {self.active_doc} ({len(self.text_chunks)} chunks)"

        except PyPDF2.errors.PdfReadError:
            return False, "Error reading PDF - file may be corrupted"
        except Exception as e:
            return False, f"Processing error: {str(e)}"

    def _extract_text(self, filepath):
        """Extract text from every page of the PDF document."""
        text = ""
        with open(filepath, 'rb') as f:
            reader = PyPDF2.PdfReader(f)
            for page in reader.pages:
                # extract_text() may return None (e.g. scanned, image-only pages).
                text += page.extract_text() or ""
        return text

    def _chunk_text(self, text, chunk_size=400):
        """Group sentences into chunks of roughly chunk_size words."""
        # Split on sentence-ending punctuation; the negative lookbehinds try
        # to avoid splitting inside abbreviations such as "U.S." or "Dr.".
        sentences = re.split(r'(?<!\w\.\w.)(?<![A-Z][a-z]\.)(?<=\.|\?)\s', text)
        chunks = []
        current_chunk = []
        count = 0

        for sentence in sentences:
            current_chunk.append(sentence)
            count += len(sentence.split())
            if count >= chunk_size:
                chunks.append(' '.join(current_chunk))
                current_chunk = []
                count = 0

        # Keep any trailing sentences that didn't reach the size threshold.
        if current_chunk:
            chunks.append(' '.join(current_chunk))
        return chunks

    def query_document(self, question):
        """Return the document chunk most similar to the question."""
        if not self.active_doc:
            return "No active document. Please upload a PDF first."

        # Rank all chunks by cosine similarity to the question embedding.
        question_embed = self.model.encode(question)
        similarities = cosine_similarity([question_embed], self.embeddings)[0]
        best_match = np.argmax(similarities)
        return self.text_chunks[best_match]
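
# Example usage outside the UI (hypothetical file name):
#   analyzer = PDFAnalyzer()
#   ok, msg = analyzer.process_pdf("example.pdf")
#   if ok:
#       print(analyzer.query_document("What is the main topic?"))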


def create_interface():
    analyzer = PDFAnalyzer()

    def process_file(file):
        # gr.File(type="filepath") passes the callback a path string (or None),
        # not a file object, so the path is used directly.
        if file is None:
            return "❌ No file selected"
        success, message = analyzer.process_pdf(file)
        return f"✅ {message}" if success else f"❌ {message}"

    def respond(message, history):
        # Ignore empty submissions.
        if not message.strip():
            return "", history
        if analyzer.active_doc:
            response = analyzer.query_document(message)
        else:
            response = "Please upload a PDF document first"
        history.append((message, response))
        # Returning "" clears the textbox; the updated history refreshes the chat.
        return "", history

    def clear_chat():
        # Rebind analyzer so the old document's chunks and embeddings are dropped.
        nonlocal analyzer
        analyzer = PDFAnalyzer()
        # Reset: empty chat history, cleared file picker, default status banner.
        return [], None, "❌ No document loaded"

    with gr.Blocks(title="PDF Analysis Assistant", theme=gr.themes.Soft()) as app:
        gr.Markdown("# 📄 PDF Analysis Assistant")
        gr.Markdown("Upload a PDF document and ask questions about its content.")

        with gr.Row():
            with gr.Column(scale=1):
                file_input = gr.File(label="Upload PDF", type="filepath")
                status_output = gr.Markdown("❌ No document loaded")
                upload_btn = gr.Button("Process Document")

            with gr.Column(scale=2):
                chatbot = gr.Chatbot(label="Conversation")
                msg = gr.Textbox(label="Your Question")
                clear_btn = gr.Button("Clear Chat")

        # Wire events: upload button updates the status text; submitting the
        # textbox clears it and updates the chat; clear resets everything.
        upload_btn.click(
            process_file,
            inputs=file_input,
            outputs=status_output
        )

        msg.submit(
            respond,
            inputs=[msg, chatbot],
            outputs=[msg, chatbot]
        )

        clear_btn.click(
            clear_chat,
            outputs=[chatbot, file_input, status_output]
        )

    return app


if __name__ == "__main__":
    app = create_interface()
    app.launch()
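
# To run: save as e.g. app.py and execute `python app.py`; Gradio serves
# the interface at http://127.0.0.1:7860 by default.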