Spaces:
Sleeping
Sleeping
| import os | |
| import re | |
| import io | |
| import json | |
| import streamlit as st | |
| from PIL import Image, ImageDraw, ImageFont | |
| from google import generativeai as genai # Renamed for clarity from 'google.genai' | |
| from google.generativeai.types import GenerationConfig, HarmCategory, HarmBlockThreshold | |
| from pdf2image import convert_from_bytes | |
| from typing import List, Dict, Any, Tuple, Optional | |
# --- Constants ---
# Identifier of the Gemini model every request in this app is sent to.
MODEL_NAME = "gemini-2.5-pro-exp-03-25"

# Unified prompt to get bounding boxes and text content in one go.
# parse_gemini_response() relies on the "box"/"text" JSON shape requested here.
UNIFIED_DETECTION_EXTRACTION_PROMPT = """\
Analyze this document image. Identify text regions and extract the text from each region.
Follow these rules for text regions:
1. GROUP RELATED CONTENT:
- Full tables as SINGLE regions (including headers and all rows).
- Paragraphs as SINGLE rectangular blocks (multiple lines as one box).
- Keep text columns intact.
- Treat list items as single region if visually grouped.
2. TEXT REGION REQUIREMENTS:
- Boundaries must tightly wrap text content.
- Include approximately 2% padding around text clusters, but ensure the box stays within image bounds.
- Exclude isolated decorative elements unless they contain text.
- Merge adjacent text fragments with ≤1% spacing into a single region.
3. COORDINATE FORMAT:
- Normalized coordinates (0.0 to 1.0) with 3 decimal places.
- Format: [xmin, ymin, xmax, ymax].
- Order: Top-to-bottom, then left-to-right.
4. SPECIAL CASES:
- Table cells should NOT have individual boxes; the entire table is one box.
- Page headers/footers as separate regions.
- Text wrapped around images as distinct regions.
OUTPUT FORMAT:
Return a JSON list of objects. Each object MUST have:
- "box": A list of 4 normalized coordinates [xmin, ymin, xmax, ymax].
- "text": The extracted text string from that box. Ensure all text within the box is captured.
Example of a valid JSON response:
[
{"box": [0.070, 0.120, 0.930, 0.280], "text": "Document Title and Header Information"},
{"box": [0.120, 0.350, 0.880, 0.650], "text": "Table content including headers and all rows..."},
{"box": [0.100, 0.700, 0.900, 0.850], "text": "This is the first paragraph of text, potentially spanning multiple lines but grouped as one logical block."},
{"box": [0.100, 0.880, 0.900, 0.950], "text": "Another paragraph or distinct text block."}
]
ONLY RETURN THE VALID JSON LIST. No explanations, apologies, or other text outside the JSON structure.
If no text regions are found, return an empty JSON list: [].
"""
| # --- Helper Functions --- | |
def get_gemini_api_key() -> Optional[str]:
    """Return the Google API key, or ``None`` if none has been provided yet.

    Sources are tried in this order:

    1. ``GOOGLE_API_KEY`` in Streamlit secrets.
    2. The ``GOOGLE_API_KEY`` environment variable.
    3. A password field rendered in the sidebar (the entered value is also
       stored in ``st.session_state.GOOGLE_API_KEY``).

    Returns:
        The first non-empty key found, otherwise ``None``.
    """
    try:
        secret_key = st.secrets["GOOGLE_API_KEY"] if "GOOGLE_API_KEY" in st.secrets else None
    except FileNotFoundError:
        # Accessing st.secrets without any secrets.toml raises instead of
        # behaving like an empty mapping; treat that as "no secret" so the
        # environment/sidebar fallbacks below are still reached.
        # NOTE(review): recent Streamlit raises StreamlitSecretNotFoundError,
        # which is expected to derive from FileNotFoundError -- confirm
        # against the pinned Streamlit version.
        secret_key = None
    if secret_key:
        return secret_key

    api_key_env = os.getenv("GOOGLE_API_KEY")
    if api_key_env:
        return api_key_env

    st.sidebar.warning("Google API Key not found in secrets or environment variables.")
    api_key_input = st.sidebar.text_input(
        "Enter your Google API Key:", type="password", key="api_key_input"
    )
    if api_key_input:
        st.session_state.GOOGLE_API_KEY = api_key_input
        return api_key_input
    return None
def parse_gemini_response(response_text: str) -> List[Dict[str, Any]]:
    """Parse a Gemini reply into a list of ``{"box": [...], "text": "..."}`` dicts.

    The JSON payload is located in this order: a ```` ```json ```` fenced
    block, the whole (stripped) reply if it looks like a list, or the
    outermost ``[...]`` span found anywhere in the reply. If ``json.loads``
    fails, a simplified regex is used to salvage individual regions.

    Each accepted region has a ``box`` that is a list of exactly four real
    numbers and a ``text`` that is always a ``str`` (``null`` becomes ``""``,
    other non-string values are converted with ``str``), so callers can draw
    the box and measure the text without further checks. Invalid items are
    skipped with a UI warning.

    Args:
        response_text: Raw text returned by the model.

    Returns:
        The validated regions; an empty list when nothing usable was found.
    """
    try:
        # Attempt to find JSON block if the model wraps it in markdown.
        fenced = re.search(r"```json\s*([\s\S]*?)\s*```", response_text, re.DOTALL)
        if fenced:
            json_str = fenced.group(1)
        else:
            # Assume the response is plain JSON, possibly surrounded by chatter.
            json_str = response_text.strip()
            if not (json_str.startswith('[') and json_str.endswith(']')):
                list_match = re.search(r'(\[[\s\S]*\])', json_str)
                if not list_match:
                    st.warning(f"Response doesn't appear to be a list: {response_text[:200]}...")
                    return []
                json_str = list_match.group(1)

        parsed_data = json.loads(json_str)
        if not isinstance(parsed_data, list):
            st.warning(f"Parsed JSON is not a list: {type(parsed_data)}")
            return []

        validated_data = []
        for item in parsed_data:
            box = item.get("box") if isinstance(item, dict) else None
            box_ok = (
                isinstance(box, list)
                and len(box) == 4
                # bool is an int subclass; true/false are not coordinates.
                and all(
                    isinstance(c, (int, float)) and not isinstance(c, bool)
                    for c in box
                )
            )
            if not box_ok or "text" not in item:
                st.warning(f"Skipping invalid item in JSON: {item}")
                continue
            text = item["text"]
            if text is None:
                text = ""
            elif not isinstance(text, str):
                text = str(text)
            # Copy so extra keys survive while "text" is normalised.
            validated_data.append({**item, "text": text})
        return validated_data
    except json.JSONDecodeError as e:
        st.warning(f"JSONDecodeError: {e}. Raw response: {response_text[:500]}")
        # Fallback to regex if JSON parsing fails (less robust). The pattern
        # assumes the "box" then "text" layout requested in the prompt and can
        # struggle with escaped quotes inside the text.
        regions = []
        pattern = r'\{\s*"box"\s*:\s*\[([\d\.]+),\s*([\d\.]+),\s*([\d\.]+),\s*([\d\.]+)\]\s*,\s*"text"\s*:\s*"(.*?)"\s*\}'
        for match in re.findall(pattern, response_text, re.DOTALL):
            try:
                box = [float(c) for c in match[:4]]
                text = match[4].replace('\\n', '\n').replace('\\"', '"')  # Handle basic escapes
                regions.append({"box": box, "text": text})
            except ValueError:
                st.warning(f"Could not parse box coordinates from regex match: {match}")
        if not regions and response_text.strip() and response_text.strip() != "[]":
            st.error(f"Failed to parse response using JSON and regex. Raw: {response_text[:200]}")
        return regions
    except Exception as e:
        st.error(f"Error parsing Gemini response: {e}. Raw response: {response_text[:500]}")
        return []
def draw_bounding_boxes(image: Image.Image, regions: List[Dict[str, Any]]) -> Image.Image:
    """Draw a numbered bounding box on ``image`` for every region.

    The image is drawn on in place and also returned, so pass a copy when the
    original must stay untouched. Regions whose ``box`` is not a list of four
    values, or that collapse to zero width/height after clamping, are skipped
    with a UI warning; any other per-region failure is reported and the
    remaining regions are still drawn.

    Args:
        image: Page image to annotate.
        regions: Dicts with a ``"box"`` of normalized
            ``[xmin, ymin, xmax, ymax]`` coordinates.

    Returns:
        The same ``image`` object, annotated.
    """
    if not regions:
        return image
    draw = ImageDraw.Draw(image)
    width, height = image.size
    # Label size scales with page height. It is floored at 1 because
    # ImageFont.truetype raises ValueError for sizes <= 0, which pages under
    # 50 px tall would otherwise hit.
    font_size = max(1, int(height * 0.02))
    try:
        # Try to load a common font, fall back to the built-in default.
        font = ImageFont.truetype("arial.ttf", font_size)
    except (OSError, ValueError):
        font = ImageFont.load_default()
    for i, region_data in enumerate(regions):
        try:
            box = region_data.get("box")
            if not (isinstance(box, list) and len(box) == 4):
                st.warning(f"Skipping invalid box data for region {i+1}: {box}")
                continue
            # Convert normalized coordinates to pixel values, clamping to image bounds.
            xmin = max(0.0, min(1.0, box[0])) * width
            ymin = max(0.0, min(1.0, box[1])) * height
            xmax = max(0.0, min(1.0, box[2])) * width
            ymax = max(0.0, min(1.0, box[3])) * height
            if xmin >= xmax or ymin >= ymax:
                st.warning(f"Skipping invalid box dimensions for region {i+1}: {[xmin, ymin, xmax, ymax]}")
                continue
            draw.rectangle([xmin, ymin, xmax, ymax], outline="#00FF00", width=3)
            label = str(i + 1)
            # Start the label just inside the top-left corner of the box.
            text_x = xmin + 5
            text_y = ymin + 5
            text_bbox = draw.textbbox((text_x, text_y), label, font=font)
            text_width = text_bbox[2] - text_bbox[0]
            text_height = text_bbox[3] - text_bbox[1]
            # For small boxes, pull the label back so it stays (mostly) inside.
            if text_x + text_width > xmax - 5:
                text_x = max(xmin, xmax - text_width - 5)
            if text_y + text_height > ymax - 5:
                text_y = max(ymin, ymax - text_height - 5)
            draw.text((text_x, text_y), label, fill="red", font=font)
        except Exception as e:
            st.error(f"Error drawing box for region {i+1}: {str(e)}")
    return image
def process_image_with_gemini(
    client: genai.GenerativeModel, image_bytes: bytes
) -> Tuple[List[Dict[str, Any]], str]:
    """Send one page image to Gemini with the unified prompt and parse the reply.

    Args:
        client: Model object whose ``generate_content`` is called.
        image_bytes: Image data, sent with MIME type ``image/png``.

    Returns:
        ``(regions, raw_text)``. An empty reply yields ``([], "")``; any
        exception is reported in the UI and yields
        ``([], "API Error: <message>")``.
    """
    try:
        request_parts = [
            UNIFIED_DETECTION_EXTRACTION_PROMPT,
            {"mime_type": "image/png", "data": image_bytes},
        ]
        # Low temperature keeps the JSON output as deterministic as possible.
        config = GenerationConfig(temperature=0.1)
        no_block = HarmBlockThreshold.BLOCK_NONE
        safety = {
            HarmCategory.HARM_CATEGORY_HARASSMENT: no_block,
            HarmCategory.HARM_CATEGORY_HATE_SPEECH: no_block,
            HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: no_block,
            HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: no_block,
        }
        response = client.generate_content(
            contents=request_parts,
            generation_config=config,
            safety_settings=safety,
        )

        if response.parts:
            raw_text_response = response.text
        elif response.candidates and response.candidates[0].content.parts:
            # No top-level parts: fall back to the first part of the first candidate.
            raw_text_response = response.candidates[0].content.parts[0].text
        else:
            raw_text_response = ""

        if not raw_text_response:
            st.error("Received empty response from Gemini API.")
            return [], ""
        return parse_gemini_response(raw_text_response), raw_text_response
    except Exception as err:
        st.error(f"Error calling Gemini API: {str(err)}")
        # Some API exceptions carry the underlying response; show it when present.
        if hasattr(err, 'response') and err.response:
            st.error(f"API Response Error Details: {err.response}")
        return [], f"API Error: {str(err)}"
# --- Streamlit UI ---
st.set_page_config(layout="wide")
st.title("π PDF Text Detection & Extraction with Gemini")

# --- API Key Configuration ---
# After this section, `api_key` and `gemini_client` are both set only when the
# key was found AND the client could be created; later sections check both.
st.sidebar.header("π API Configuration")
api_key = get_gemini_api_key()
gemini_client = None
if not api_key:
    st.sidebar.info("Please provide your Google API Key to use the application.")
    st.info("Please enter your Google API Key in the sidebar to proceed.")
else:
    try:
        genai.configure(api_key=api_key)
        gemini_client = genai.GenerativeModel(MODEL_NAME)
        st.sidebar.success("Gemini API Key configured.")
    except Exception as config_error:
        st.sidebar.error(f"Failed to configure Gemini: {config_error}")
        # Invalidate the key so the rest of the page treats it as missing.
        api_key = None
# --- File Upload and Processing ---
st.header("π€ Upload PDF")
uploaded_file = st.file_uploader("Choose a PDF file", type=["pdf"])
dpi_options = [150, 200, 300, 400]
selected_dpi = st.select_slider(
    "Select DPI for PDF to Image conversion:",
    options=dpi_options,
    value=200,  # Default DPI
)

if uploaded_file and st.button("π Analyze PDF", disabled=not api_key or not gemini_client):
    if not api_key or not gemini_client:
        st.error("API Key not configured. Please enter it in the sidebar.")
    else:
        with st.spinner(f"Processing PDF ({uploaded_file.name})... This may take a moment."):
            try:
                # getvalue() returns the full upload regardless of the stream
                # position, whereas read() yields nothing once the buffer has
                # already been consumed.
                pdf_bytes = uploaded_file.getvalue()
                pil_images: List[Image.Image] = convert_from_bytes(pdf_bytes, dpi=selected_dpi)
                if not pil_images:
                    st.warning("Could not extract any images from the PDF.")
                else:
                    st.success(f"PDF converted to {len(pil_images)} image(s). Analyzing with Gemini...")
                    page_results = []
                    for i, p_image in enumerate(pil_images):
                        page_progress = st.progress(0, text=f"Processing Page {i+1}/{len(pil_images)}...")
                        try:
                            # Convert PIL image to PNG bytes for the API.
                            img_byte_arr = io.BytesIO()
                            p_image.save(img_byte_arr, format='PNG')
                            img_bytes = img_byte_arr.getvalue()
                            page_progress.progress(30, text=f"Page {i+1}: Sending to Gemini...")
                            regions, raw_response = process_image_with_gemini(gemini_client, img_bytes)
                            page_progress.progress(90, text=f"Page {i+1}: Received response.")
                            page_results.append({
                                "original_image": p_image,
                                "regions": regions,
                                "raw_response": raw_response,
                            })
                            page_progress.progress(100, text=f"Page {i+1}: Done.")
                        finally:
                            # Remove the bar even when this page fails, so a
                            # stale progress bar is not left on screen.
                            page_progress.empty()
                    # Keep results in session state so the results section can
                    # render them on reruns without re-processing the PDF.
                    st.session_state.page_results = page_results
            except Exception as e:
                st.error(f"An error occurred during PDF processing: {str(e)}")
                if "page_results" in st.session_state:
                    del st.session_state.page_results  # Clear partial results
# --- Display Results ---
# Renders one tab per processed page from st.session_state.page_results:
# original vs. annotated image, the extracted text per region, and the raw
# model reply for debugging.
if "page_results" in st.session_state and st.session_state.page_results:
    st.header("π Analysis Results")
    results = st.session_state.page_results
    tab_titles = [f"Page {page_no}" for page_no in range(1, len(results) + 1)]
    tabs = st.tabs(tab_titles)
    for idx, (tab, page_data) in enumerate(zip(tabs, results)):
        with tab:
            st.subheader(f"Page {idx + 1} Analysis")
            original_image = page_data["original_image"]
            regions = page_data["regions"]
            raw_response = page_data["raw_response"]
            col1, col2 = st.columns(2)
            with col1:
                st.image(original_image, caption="Original Image", use_container_width=True)
            with col2:
                if regions:
                    # Draw on a copy: draw_bounding_boxes modifies its input,
                    # and the stored original is shown unannotated in col1.
                    annotated_image = draw_bounding_boxes(original_image.copy(), regions)
                    st.image(annotated_image, caption=f"Detected {len(regions)} Text Regions", use_container_width=True)
                else:
                    st.image(original_image, caption="No text regions detected by model", use_container_width=True)
                    st.warning("No text regions were successfully parsed from the model's response for this page.")
            if regions:
                st.subheader("π Extracted Texts & Regions")
                for i, region_data in enumerate(regions):
                    box = region_data.get("box", "N/A")
                    text_content = region_data.get("text")
                    # The model may return null or a non-string for "text";
                    # len() below needs a str, so normalise it here.
                    if text_content is None:
                        text_content = "No text extracted for this region."
                    elif not isinstance(text_content, str):
                        text_content = str(text_content)
                    with st.expander(f"Region {i+1} (Box: {box})", expanded=(i < 3)):  # Expand first 3 by default
                        st.markdown(f"**Coordinates:** `{box}`")
                        st.markdown("**Text:**")
                        st.text_area(
                            f"text_area_{idx}_{i}",
                            text_content,
                            height=max(100, int(len(text_content) * 0.5)),
                            disabled=True,
                            label_visibility="collapsed",
                        )
            else:
                st.info("No text regions to display for this page.")
            with st.expander("π Debug: Raw Gemini API Response", expanded=False):
                st.code(raw_response, language='json')
elif uploaded_file and not api_key:
    st.warning("Please enter your API key in the sidebar and click 'Analyze PDF' again.")

st.markdown("---")
st.markdown("Developed with Gemini & Streamlit.")