"""Gradio app for comparing the text produced by several PDF extractors.

Each input file is a JSON document keyed by extractor name.  The app shows the
source PDF next to the text of two selectable extractors and lets the user
record which extractor did best; the choice is stored beside the JSON file in
``<name>_best.txt``.
"""

import base64
import json
import os
import tempfile
from pathlib import Path

import gradio as gr

EXTRACTORS = ['pdf_plumber', 'py_pdf', 'docling', 'extractous', 'pypdfium2', 'pymupdf', 'pymupdf_llm']

_PAGE_BREAK_MARKER = "\n<---page-break--->\n"

# Markup for the PDF pane.  The ids ``pdf-object`` and ``pdf-fallback`` are the
# ones the viewer script below looks up; without them no PDF can be shown.
_PDF_VIEWER_HTML = '''
<div id="pdf-container" style="width: 100%; height: 800px; border: 1px solid #ccc;">
    <div id="pdf-fallback" style="display: flex; height: 100%; align-items: center; justify-content: center;">
        Click "Load PDFs" to start viewing documents.
    </div>
    <object id="pdf-object" type="application/pdf" style="display: none; width: 100%; height: 100%;"></object>
</div>
'''

# Client-side script: mirrors the hidden base64 textbox into the <object>
# element and wires the left/right arrow keys to the navigation buttons.
_PDF_VIEWER_JS = """
function() {
    console.log("Setting up PDF viewer");

    // Blob URL of the PDF currently shown, kept so it can be revoked.
    var pdfBlobUrl = null;
    // Last textbox value that was rendered; starts empty so the initial
    // "Click Load PDFs" message is left alone until real data arrives.
    var lastRendered = '';

    // Hide the PDF and show the fallback pane, optionally with a new message.
    function showFallback(message) {
        const pdfObject = document.getElementById('pdf-object');
        const fallback = document.getElementById('pdf-fallback');
        if (pdfObject) {
            pdfObject.style.display = 'none';
        }
        if (fallback) {
            if (message) {
                fallback.textContent = message;
            }
            fallback.style.display = 'flex';
        }
    }

    // Render base64 PDF data.  Returns false only when the viewer elements
    // are not in the DOM yet, so the caller can retry on the next poll.
    function displayPdfFromBase64(base64Data) {
        try {
            if (!base64Data || base64Data.length < 100) {
                console.log("No valid PDF data received");
                showFallback('No PDF available for this document.');
                return true;
            }

            const pdfObject = document.getElementById('pdf-object');
            const fallback = document.getElementById('pdf-fallback');
            if (!pdfObject || !fallback) {
                console.error("PDF viewer elements not found");
                return false;
            }

            console.log("Displaying PDF from base64 data");

            // Convert base64 to binary
            const binary = atob(base64Data);
            const bytes = new Uint8Array(binary.length);
            for (let i = 0; i < binary.length; i++) {
                bytes[i] = binary.charCodeAt(i);
            }

            // Clean up previous blob URL before creating the new one
            if (pdfBlobUrl) {
                URL.revokeObjectURL(pdfBlobUrl);
            }
            const blob = new Blob([bytes], {type: 'application/pdf'});
            pdfBlobUrl = URL.createObjectURL(blob);

            pdfObject.data = pdfBlobUrl;
            pdfObject.style.display = 'block';
            fallback.style.display = 'none';
            console.log("PDF displayed successfully");
            return true;
        } catch (error) {
            console.error("Error displaying PDF:", error);
            showFallback('Error displaying PDF');
            return true;
        }
    }

    // Poll the hidden textbox.  The element is looked up on every tick so a
    // late or re-mounted component is picked up, and the PDF is re-rendered
    // only when the value actually changed (re-rendering on every tick would
    // reload the viewer and reset the reader's scroll position).
    setInterval(function() {
        const dataElement = document.getElementById('pdf_base64_data');
        const field = dataElement ? dataElement.querySelector('textarea, input') : null;
        if (!field) {
            return;
        }
        const value = field.value || '';
        if (value === lastRendered) {
            return;
        }
        if (displayPdfFromBase64(value)) {
            lastRendered = value;
        }
    }, 1000);

    // Add keyboard shortcuts
    document.addEventListener('keydown', function(event) {
        // Do not hijack arrow keys while the user is typing.
        if (event.target.tagName === 'INPUT' || event.target.tagName === 'TEXTAREA') {
            return;
        }

        var buttonId = null;
        if (event.key === 'ArrowLeft') buttonId = 'prev_button';
        else if (event.key === 'ArrowRight') buttonId = 'next_button';

        if (buttonId) {
            var button = document.getElementById(buttonId);
            if (button) {
                event.preventDefault();
                button.click();
            }
        }
    });
}
"""


def add_page_breaks(text, page_offsets):
    """Insert a page-break marker into *text* at every offset in *page_offsets*.

    Args:
        text: The extracted document text.
        page_offsets: Character offsets at which a marker is inserted, in
            ascending order.  A falsy value returns *text* unchanged.

    Returns:
        The text with ``<---page-break--->`` markers inserted.
    """
    if not page_offsets:
        return text

    pieces = []
    last_offset = 0
    for offset in page_offsets:
        pieces.append(text[last_offset:offset])
        pieces.append(_PAGE_BREAK_MARKER)
        last_offset = offset

    # Add any remaining text after the final offset
    if last_offset < len(text):
        pieces.append(text[last_offset:])
    return "".join(pieces)


def _best_file_path(json_path):
    """Return the path of the annotation file that accompanies *json_path*."""
    return os.path.splitext(json_path)[0] + "_best.txt"


def _first_media_item(extractor_data):
    """Return ``extractor_data['media'][0]`` when it is a dict, else ``None``."""
    if not isinstance(extractor_data, dict):
        return None
    media = extractor_data.get('media')
    if isinstance(media, list) and media and isinstance(media[0], dict):
        return media[0]
    return None


def _decode_pdf_bytes(extractor_data):
    """Return the decoded PDF stored in the first media item, or ``None``."""
    media_item = _first_media_item(extractor_data)
    if media_item is None:
        return None
    encoded = media_item.get('media_bytes')
    if not encoded:
        return None
    try:
        return base64.b64decode(encoded)
    except (ValueError, TypeError):
        # Undecodable payload: treat the document as having no PDF rather
        # than failing the whole load; the extractor texts are still usable.
        return None


def _parse_record(raw_text, is_jsonl):
    """Parse *raw_text* as JSON, falling back to the first JSONL record.

    The whole text is tried first so single-document files behave as before;
    only a ``.jsonl`` file that is not one JSON document is read line-wise.

    Raises:
        ValueError: If no JSON value can be parsed.
    """
    try:
        return json.loads(raw_text)
    except json.JSONDecodeError:
        if not is_jsonl:
            raise
    for line in raw_text.splitlines():
        if line.strip():
            return json.loads(line)
    raise ValueError("file contains no JSON records")


class ExtractorComparer:
    """State holder behind the UI: file list, current document and annotations."""

    def __init__(self):
        self.json_files = []
        self.current_index = 0
        self.current_data = None
        self.temp_pdf_path = None
        self.current_pdf_bytes = None
        # Status pair reported by load_current_file() while no files are
        # loaded; load_files() replaces it with the reason the list is empty.
        self._no_files_status = ("N/A", "N/A")

    def _reset_current(self):
        """Forget the current document so a failed load cannot show stale data."""
        self.current_data = None
        self.current_pdf_bytes = None
        if self.temp_pdf_path:
            try:
                os.remove(self.temp_pdf_path)
            except OSError:
                # Best effort: the temp file may already be gone.
                pass
            self.temp_pdf_path = None

    def load_files(self, directory_path):
        """Collect the ``.json``/``.jsonl`` files found in *directory_path*.

        Returns:
            A ``(file_progress, annotation_status)`` pair of display strings.
        """
        self.json_files = []
        self.current_index = 0
        self._reset_current()
        try:
            names = os.listdir(directory_path)
        except (OSError, ValueError, TypeError) as e:
            self._no_files_status = (f"Error loading files: {e}", "Error")
            return self._no_files_status

        # os.listdir order is arbitrary; sort so navigation order and the
        # "File i of n" numbering are stable between sessions.
        self.json_files = sorted(
            os.path.join(directory_path, name)
            for name in names
            if name.endswith(('.json', '.jsonl'))
        )
        if not self.json_files:
            self._no_files_status = ("No JSON files found", "No files loaded")
            return self._no_files_status
        return self.get_progress_info()

    def load_current_file(self):
        """Load the JSON file at ``current_index`` and extract its PDF.

        The PDF is taken from the first media item of the ``pdf_plumber``
        entry, written to a temporary file (``temp_pdf_path``) and returned
        base64-encoded for the frontend.

        Returns:
            ``(base64_pdf_or_None, file_progress, annotation_status)``.
        """
        if not self.json_files:
            # Keep whatever load_files() reported instead of hiding it.
            return (None, *self._no_files_status)

        self._reset_current()
        path = self.json_files[self.current_index]
        try:
            with open(path, 'r', encoding='utf-8') as f:
                data = _parse_record(f.read(), path.endswith('.jsonl'))
            if not isinstance(data, dict):
                raise ValueError("top-level JSON value is not an object")
            self.current_data = data

            pdf_bytes = _decode_pdf_bytes(data.get('pdf_plumber'))
            if pdf_bytes:
                self.current_pdf_bytes = pdf_bytes
                with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as temp_file:
                    temp_file.write(pdf_bytes)
                    self.temp_pdf_path = temp_file.name
        except (OSError, ValueError) as e:
            self._reset_current()
            return None, f"Error loading file: {e}", "No annotation"

        file_progress, annotation_status = self.get_progress_info()
        if not pdf_bytes:
            return None, file_progress, annotation_status
        # Convert to base64 for passing to the frontend
        return base64.b64encode(pdf_bytes).decode('utf-8'), file_progress, annotation_status

    def get_progress_info(self):
        """Return ``(file_progress, annotation_status)`` for the current file.

        ``file_progress`` names the current file, its position and how many
        files have a ``_best.txt`` annotation; ``annotation_status`` reports
        the extractor recorded for the current file, if any.
        """
        if not self.json_files:
            return "No files loaded", "No annotation"

        current_file = self.json_files[self.current_index]
        total = len(self.json_files)

        annotation_status = "Not annotated"
        best_extractor_file = _best_file_path(current_file)
        if os.path.exists(best_extractor_file):
            try:
                with open(best_extractor_file, 'r', encoding='utf-8') as f:
                    annotation_status = f"Best extractor: {f.read().strip()}"
            except (OSError, ValueError):
                # Unreadable annotation: report the file as not annotated.
                pass

        annotated_count = sum(
            1 for json_file in self.json_files
            if os.path.exists(_best_file_path(json_file))
        )
        file_progress = (
            f"File {self.current_index + 1} of {total}: {Path(current_file).name}"
            f" (Annotated: {annotated_count}/{total})"
        )
        return file_progress, annotation_status

    def get_extractor_text(self, extractor_name):
        """Return the text of *extractor_name* with page-break markers added.

        Returns an empty string when nothing is loaded or the extractor is
        absent from the current document, and a "No text found" message when
        its entry carries no text.
        """
        if not self.current_data or extractor_name not in self.current_data:
            return ""

        extractor_data = self.current_data[extractor_name]
        # An extractor that failed may be stored as null instead of a dict.
        if not isinstance(extractor_data, dict) or 'text' not in extractor_data:
            return f"No text found for {extractor_name}"
        text = extractor_data.get('text') or ''

        # Page offsets live at media[0].metadata.pdf_metadata.page_offsets
        page_offsets = []
        media_item = _first_media_item(extractor_data)
        if media_item is not None:
            metadata = media_item.get('metadata')
            pdf_metadata = metadata.get('pdf_metadata') if isinstance(metadata, dict) else None
            if isinstance(pdf_metadata, dict):
                page_offsets = pdf_metadata.get('page_offsets') or []

        return add_page_breaks(text, page_offsets)

    def next_pdf(self):
        """Advance to the next file (wrapping around) and load it."""
        if not self.json_files:
            return None, "N/A", "N/A"
        self.current_index = (self.current_index + 1) % len(self.json_files)
        return self.load_current_file()

    def prev_pdf(self):
        """Step back to the previous file (wrapping around) and load it."""
        if not self.json_files:
            return None, "N/A", "N/A"
        self.current_index = (self.current_index - 1) % len(self.json_files)
        return self.load_current_file()

    def set_best_extractor(self, extractor_name):
        """Record *extractor_name* as the best extractor for the current file.

        The name is written to ``<json file stem>_best.txt``.  Nothing is
        written unless a document is currently loaded.

        Returns:
            The updated ``(file_progress, annotation_status)`` pair.
        """
        if not self.json_files or not self.current_data:
            return "N/A", "N/A"

        result_file = _best_file_path(self.json_files[self.current_index])
        try:
            with open(result_file, 'w', encoding='utf-8') as f:
                f.write(extractor_name)
        except (OSError, TypeError) as e:
            return f"Error saving annotation: {e}", "No annotation"

        # Get updated progress info after annotation
        return self.get_progress_info()


def create_interface():
    """Build and return the Gradio ``Blocks`` app for comparing extractors."""
    comparer = ExtractorComparer()

    # Custom CSS for basic font in text areas
    custom_css = """
    .extraction-text textarea {
        font-family: Arial, Helvetica, sans-serif !important;
        font-size: 14px !important;
        line-height: 1.5 !important;
    }
    """

    with gr.Blocks(title="PDF Extractor Comparer", theme="soft", css=custom_css) as demo:
        gr.Markdown("## PDF Extractor Comparer")

        with gr.Row():
            directory_input = gr.Textbox(
                label="Path to JSON Directory",
                placeholder="e.g., /path/to/your/json/files"
            )
            load_button = gr.Button("Load PDFs", variant="primary")

        # Main layout: PDF viewer on left, status and controls on right
        with gr.Row():
            # Left column: PDF viewer
            with gr.Column(scale=3):
                gr.HTML(label="PDF Document", value=_PDF_VIEWER_HTML)
                # Hidden component to store the Base64 PDF data
                pdf_data_hidden = gr.Textbox(visible=False, elem_id="pdf_base64_data")

            # Right column: Progress and controls
            with gr.Column(scale=1):
                # Progress information
                file_progress_output = gr.Textbox(label="File Progress", interactive=False)
                annotation_status_output = gr.Textbox(label="Annotation Status", interactive=False)

                # Navigation
                with gr.Row():
                    prev_button = gr.Button("⬅️ Previous", elem_id="prev_button")
                    next_button = gr.Button("Next ➡️", elem_id="next_button")

                # Best extractor selection
                gr.Markdown("### Select Best Extractor")
                for extractor in EXTRACTORS:
                    button = gr.Button(extractor, variant="secondary")
                    # A hidden textbox carries the extractor name to the
                    # handler, so each button passes its own value.
                    button.click(
                        comparer.set_best_extractor,
                        inputs=[gr.Textbox(value=extractor, visible=False)],
                        outputs=[file_progress_output, annotation_status_output]
                    )

        # Extractors section below the PDF
        gr.Markdown("### Extractor Comparison")

        # Extractor dropdowns
        with gr.Row():
            extractor1_dropdown = gr.Dropdown(
                choices=EXTRACTORS,
                label="Extractor 1",
                value=EXTRACTORS[0] if EXTRACTORS else None
            )
            extractor2_dropdown = gr.Dropdown(
                choices=EXTRACTORS,
                label="Extractor 2",
                value=EXTRACTORS[1] if len(EXTRACTORS) > 1 else EXTRACTORS[0] if EXTRACTORS else None
            )

        # Extractor text outputs with applied class for styling
        with gr.Row():
            extractor1_text = gr.Textbox(
                label="Extractor 1 Output",
                lines=15,
                elem_classes=["extraction-text"]
            )
            extractor2_text = gr.Textbox(
                label="Extractor 2 Output",
                lines=15,
                elem_classes=["extraction-text"]
            )

        status_outputs = [file_progress_output, annotation_status_output]
        document_outputs = [pdf_data_hidden] + status_outputs

        def refresh_texts(event):
            """Chain a refresh of both extractor text panes after *event*."""
            return event.then(
                comparer.get_extractor_text,
                inputs=[extractor1_dropdown],
                outputs=[extractor1_text]
            ).then(
                comparer.get_extractor_text,
                inputs=[extractor2_dropdown],
                outputs=[extractor2_text]
            )

        # Event handlers
        refresh_texts(
            load_button.click(
                comparer.load_files,
                inputs=[directory_input],
                outputs=status_outputs
            ).then(
                comparer.load_current_file,
                outputs=document_outputs
            )
        )
        refresh_texts(prev_button.click(comparer.prev_pdf, outputs=document_outputs))
        refresh_texts(next_button.click(comparer.next_pdf, outputs=document_outputs))

        extractor1_dropdown.change(
            comparer.get_extractor_text,
            inputs=[extractor1_dropdown],
            outputs=[extractor1_text]
        )
        extractor2_dropdown.change(
            comparer.get_extractor_text,
            inputs=[extractor2_dropdown],
            outputs=[extractor2_text]
        )

        # JavaScript for PDF handling
        demo.load(fn=None, js=_PDF_VIEWER_JS)

    return demo


if __name__ == "__main__":
    demo = create_interface()
    demo.launch()