import os import re import io import json import streamlit as st from PIL import Image, ImageDraw, ImageFont from google import generativeai as genai # Renamed for clarity from 'google.genai' from google.generativeai.types import GenerationConfig, HarmCategory, HarmBlockThreshold from pdf2image import convert_from_bytes from typing import List, Dict, Any, Tuple, Optional # --- Constants --- MODEL_NAME = "gemini-2.5-pro-exp-03-25" # Using Gemini 1.5 Flash # Unified prompt to get bounding boxes and text content in one go UNIFIED_DETECTION_EXTRACTION_PROMPT = """\ Analyze this document image. Identify text regions and extract the text from each region. Follow these rules for text regions: 1. GROUP RELATED CONTENT: - Full tables as SINGLE regions (including headers and all rows). - Paragraphs as SINGLE rectangular blocks (multiple lines as one box). - Keep text columns intact. - Treat list items as single region if visually grouped. 2. TEXT REGION REQUIREMENTS: - Boundaries must tightly wrap text content. - Include approximately 2% padding around text clusters, but ensure the box stays within image bounds. - Exclude isolated decorative elements unless they contain text. - Merge adjacent text fragments with ≤1% spacing into a single region. 3. COORDINATE FORMAT: - Normalized coordinates (0.0 to 1.0) with 3 decimal places. - Format: [xmin, ymin, xmax, ymax]. - Order: Top-to-bottom, then left-to-right. 4. SPECIAL CASES: - Table cells should NOT have individual boxes; the entire table is one box. - Page headers/footers as separate regions. - Text wrapped around images as distinct regions. OUTPUT FORMAT: Return a JSON list of objects. Each object MUST have: - "box": A list of 4 normalized coordinates [xmin, ymin, xmax, ymax]. - "text": The extracted text string from that box. Ensure all text within the box is captured. Example of a valid JSON response: [ {"box": [0.070, 0.120, 0.930, 0.280], "text": "Document Title and Header Information"}, {"box": [0.120, 0.350, 0.880, 0.650], "text": "Table content including headers and all rows..."}, {"box": [0.100, 0.700, 0.900, 0.850], "text": "This is the first paragraph of text, potentially spanning multiple lines but grouped as one logical block."}, {"box": [0.100, 0.880, 0.900, 0.950], "text": "Another paragraph or distinct text block."} ] ONLY RETURN THE VALID JSON LIST. No explanations, apologies, or other text outside the JSON structure. If no text regions are found, return an empty JSON list: []. """ # --- Helper Functions --- def get_gemini_api_key() -> Optional[str]: """Gets the Gemini API key from Streamlit secrets, environment variables, or user input.""" if "GOOGLE_API_KEY" in st.secrets: return st.secrets["GOOGLE_API_KEY"] api_key_env = os.getenv("GOOGLE_API_KEY") if api_key_env: return api_key_env st.sidebar.warning("Google API Key not found in secrets or environment variables.") api_key_input = st.sidebar.text_input( "Enter your Google API Key:", type="password", key="api_key_input" ) if api_key_input: st.session_state.GOOGLE_API_KEY = api_key_input return api_key_input return None def parse_gemini_response(response_text: str) -> List[Dict[str, Any]]: """ Parses the Gemini response to extract a list of dicts with "box" and "text". Tries to load as JSON, falls back to regex if needed (though ideally not). """ try: # Attempt to find JSON block if model wraps it in markdown match = re.search(r"```json\s*([\s\S]*?)\s*```", response_text, re.DOTALL) if match: json_str = match.group(1) else: # Assume the response is plain JSON or a Python list string json_str = response_text.strip() if not (json_str.startswith('[') and json_str.endswith(']')): # If it doesn't look like a list, try to find the list within the text list_match = re.search(r'(\[[\s\S]*\])', json_str) if list_match: json_str = list_match.group(1) else: st.warning(f"Response doesn't appear to be a list: {response_text[:200]}...") return [] # Try direct JSON parsing parsed_data = json.loads(json_str) if isinstance(parsed_data, list): # Validate structure validated_data = [] for item in parsed_data: if isinstance(item, dict) and "box" in item and "text" in item and \ isinstance(item["box"], list) and len(item["box"]) == 4: validated_data.append(item) else: st.warning(f"Skipping invalid item in JSON: {item}") return validated_data else: st.warning(f"Parsed JSON is not a list: {type(parsed_data)}") return [] except json.JSONDecodeError as e: st.warning(f"JSONDecodeError: {e}. Raw response: {response_text[:500]}") # Fallback to regex if JSON parsing fails (less robust) # This regex assumes the "box" and "text" structure as described in the prompt # It's a simplified regex for demonstration and might need adjustment for complex texts. regions = [] # Regex to find "box": [coords], "text": "content" # This is a simplified regex and might struggle with escaped quotes in text pattern = r'\{\s*"box"\s*:\s*\[([\d\.]+),\s*([\d\.]+),\s*([\d\.]+),\s*([\d\.]+)\]\s*,\s*"text"\s*:\s*"(.*?)"\s*\}' matches = re.findall(pattern, response_text, re.DOTALL) for match in matches: try: box = [float(c) for c in match[:4]] text = match[4].replace('\\n', '\n').replace('\\"', '"') # Handle basic escapes regions.append({"box": box, "text": text}) except ValueError: st.warning(f"Could not parse box coordinates from regex match: {match}") if not regions and response_text.strip() and response_text.strip() != "[]": st.error(f"Failed to parse response using JSON and regex. Raw: {response_text[:200]}") return regions except Exception as e: st.error(f"Error parsing Gemini response: {e}. Raw response: {response_text[:500]}") return [] def draw_bounding_boxes(image: Image.Image, regions: List[Dict[str, Any]]) -> Image.Image: """Draws numbered bounding boxes on the image.""" if not regions: return image draw = ImageDraw.Draw(image) width, height = image.size try: # Try to load a common font, fall back to default font = ImageFont.truetype("arial.ttf", int(height * 0.02)) # Adjust size as needed except IOError: font = ImageFont.load_default() for i, region_data in enumerate(regions): try: box = region_data.get("box") if not (isinstance(box, list) and len(box) == 4): st.warning(f"Skipping invalid box data for region {i+1}: {box}") continue # Convert normalized coordinates to pixel values, clamping to image bounds xmin = max(0.0, min(1.0, box[0])) * width ymin = max(0.0, min(1.0, box[1])) * height xmax = max(0.0, min(1.0, box[2])) * width ymax = max(0.0, min(1.0, box[3])) * height if xmin >= xmax or ymin >= ymax: st.warning(f"Skipping invalid box dimensions for region {i+1}: {[xmin, ymin, xmax, ymax]}") continue draw.rectangle([xmin, ymin, xmax, ymax], outline="#00FF00", width=3) label = str(i + 1) # Position label inside the box, handle potential small boxes text_x = xmin + 5 text_y = ymin + 5 # For very small boxes, drawing text might be an issue, but let's try text_bbox = draw.textbbox((text_x, text_y), label, font=font) text_width = text_bbox[2] - text_bbox[0] text_height = text_bbox[3] - text_bbox[1] # Simple check to ensure text fits somewhat if text_x + text_width > xmax - 5: text_x = max(xmin, xmax - text_width - 5) if text_y + text_height > ymax - 5: text_y = max(ymin, ymax - text_height - 5) draw.text((text_x, text_y), label, fill="red", font=font) except Exception as e: st.error(f"Error drawing box for region {i+1}: {str(e)}") return image def process_image_with_gemini( client: genai.GenerativeModel, image_bytes: bytes ) -> Tuple[List[Dict[str, Any]], str]: """Sends image to Gemini and gets bounding boxes and text.""" try: response = client.generate_content( contents=[ UNIFIED_DETECTION_EXTRACTION_PROMPT, {"mime_type": "image/png", "data": image_bytes} ], generation_config=GenerationConfig( temperature=0.1, # Lower temperature for more deterministic output # max_output_tokens=8192 # Max for flash 1.5 ), safety_settings={ # Adjust as needed HarmCategory.HARM_CATEGORY_HARASSMENT: HarmBlockThreshold.BLOCK_NONE, HarmCategory.HARM_CATEGORY_HATE_SPEECH: HarmBlockThreshold.BLOCK_NONE, HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: HarmBlockThreshold.BLOCK_NONE, HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: HarmBlockThreshold.BLOCK_NONE, } ) if response.parts: raw_text_response = response.text # .text often combines parts else: # Fallback if .text is empty but candidates exist raw_text_response = response.candidates[0].content.parts[0].text if response.candidates and response.candidates[0].content.parts else "" if not raw_text_response: st.error("Received empty response from Gemini API.") return [], "" regions = parse_gemini_response(raw_text_response) return regions, raw_text_response except Exception as e: st.error(f"Error calling Gemini API: {str(e)}") if hasattr(e, 'response') and e.response: # For google.api_core.exceptions st.error(f"API Response Error Details: {e.response}") return [], f"API Error: {str(e)}" # --- Streamlit UI --- st.set_page_config(layout="wide") st.title("📄 PDF Text Detection & Extraction with Gemini") # --- API Key Configuration --- st.sidebar.header("🔑 API Configuration") api_key = get_gemini_api_key() gemini_client = None if api_key: try: genai.configure(api_key=api_key) gemini_client = genai.GenerativeModel(MODEL_NAME) st.sidebar.success("Gemini API Key configured.") except Exception as e: st.sidebar.error(f"Failed to configure Gemini: {e}") api_key = None # Invalidate API key if configuration fails else: st.sidebar.info("Please provide your Google API Key to use the application.") st.info("Please enter your Google API Key in the sidebar to proceed.") # --- File Upload and Processing --- st.header("📤 Upload PDF") uploaded_file = st.file_uploader("Choose a PDF file", type=["pdf"]) dpi_options = [150, 200, 300, 400] selected_dpi = st.select_slider( "Select DPI for PDF to Image conversion:", options=dpi_options, value=200 # Default DPI ) if uploaded_file and st.button("🚀 Analyze PDF", disabled=not api_key or not gemini_client): if not api_key or not gemini_client: st.error("API Key not configured. Please enter it in the sidebar.") else: with st.spinner(f"Processing PDF ({uploaded_file.name})... This may take a moment."): try: pdf_bytes = uploaded_file.read() pil_images: List[Image.Image] = convert_from_bytes(pdf_bytes, dpi=selected_dpi) if not pil_images: st.warning("Could not extract any images from the PDF.") else: st.success(f"PDF converted to {len(pil_images)} image(s). Analyzing with Gemini...") page_results = [] for i, p_image in enumerate(pil_images): page_progess = st.progress(0, text=f"Processing Page {i+1}/{len(pil_images)}...") # Convert PIL image to bytes for API img_byte_arr = io.BytesIO() p_image.save(img_byte_arr, format='PNG') img_bytes = img_byte_arr.getvalue() page_progess.progress(30, text=f"Page {i+1}: Sending to Gemini...") regions, raw_response = process_image_with_gemini(gemini_client, img_bytes) page_progess.progress(90, text=f"Page {i+1}: Received response.") page_results.append({ "original_image": p_image, "regions": regions, "raw_response": raw_response }) page_progess.progress(100, text=f"Page {i+1}: Done.") page_progess.empty() # Store results in session state to avoid re-processing on minor UI interaction # Though with the button click, this is less critical for single runs. st.session_state.page_results = page_results except Exception as e: st.error(f"An error occurred during PDF processing: {str(e)}") if "page_results" in st.session_state: del st.session_state.page_results # Clear partial results # --- Display Results --- if "page_results" in st.session_state and st.session_state.page_results: st.header("📊 Analysis Results") results = st.session_state.page_results tab_titles = [f"Page {i+1}" for i in range(len(results))] tabs = st.tabs(tab_titles) for idx, (tab, page_data) in enumerate(zip(tabs, results)): with tab: st.subheader(f"Page {idx + 1} Analysis") original_image = page_data["original_image"] regions = page_data["regions"] raw_response = page_data["raw_response"] col1, col2 = st.columns(2) with col1: st.image(original_image, caption="Original Image", use_container_width=True) with col2: if regions: annotated_image = draw_bounding_boxes(original_image.copy(), regions) st.image(annotated_image, caption=f"Detected {len(regions)} Text Regions", use_container_width=True) else: st.image(original_image.copy(), caption="No text regions detected by model", use_container_width=True) st.warning("No text regions were successfully parsed from the model's response for this page.") if regions: st.subheader("📝 Extracted Texts & Regions") for i, region_data in enumerate(regions): box = region_data.get("box", "N/A") text_content = region_data.get("text", "No text extracted for this region.") with st.expander(f"Region {i+1} (Box: {box})", expanded= (i<3) ): # Expand first 3 by default st.markdown(f"**Coordinates:** `{box}`") st.markdown("**Text:**") st.text_area(f"text_area_{idx}_{i}", text_content, height=max(100, int(len(text_content)*0.5)), disabled=True, label_visibility="collapsed") else: st.info("No text regions to display for this page.") with st.expander("🔍 Debug: Raw Gemini API Response", expanded=False): st.code(raw_response, language='json') elif uploaded_file and not api_key: st.warning("Please enter your API key in the sidebar and click 'Analyze PDF' again.") st.markdown("---") st.markdown("Developed with Gemini & Streamlit.")