# gemini25pdf / src /streamlit_app.py
# Sebbe33's picture
# Update src/streamlit_app.py
# e0d120b verified
import os
import re
import io
import json
import streamlit as st
from PIL import Image, ImageDraw, ImageFont
from google import generativeai as genai # Renamed for clarity from 'google.genai'
from google.generativeai.types import GenerationConfig, HarmCategory, HarmBlockThreshold
from pdf2image import convert_from_bytes
from typing import List, Dict, Any, Tuple, Optional
# --- Constants ---
# Model identifier passed to genai.GenerativeModel. The name refers to a
# Gemini 2.5 Pro experimental release (not 1.5 Flash, as an earlier comment
# claimed).
MODEL_NAME = "gemini-2.5-pro-exp-03-25"

# Single prompt that asks the model for bounding boxes AND the text inside
# each box, so one request per page is enough. The response is expected to be
# a JSON list of {"box": [xmin, ymin, xmax, ymax], "text": "..."} objects with
# coordinates normalized to 0.0-1.0.
UNIFIED_DETECTION_EXTRACTION_PROMPT = """\
Analyze this document image. Identify text regions and extract the text from each region.
Follow these rules for text regions:
1. GROUP RELATED CONTENT:
- Full tables as SINGLE regions (including headers and all rows).
- Paragraphs as SINGLE rectangular blocks (multiple lines as one box).
- Keep text columns intact.
- Treat list items as single region if visually grouped.
2. TEXT REGION REQUIREMENTS:
- Boundaries must tightly wrap text content.
- Include approximately 2% padding around text clusters, but ensure the box stays within image bounds.
- Exclude isolated decorative elements unless they contain text.
- Merge adjacent text fragments with ≤1% spacing into a single region.
3. COORDINATE FORMAT:
- Normalized coordinates (0.0 to 1.0) with 3 decimal places.
- Format: [xmin, ymin, xmax, ymax].
- Order: Top-to-bottom, then left-to-right.
4. SPECIAL CASES:
- Table cells should NOT have individual boxes; the entire table is one box.
- Page headers/footers as separate regions.
- Text wrapped around images as distinct regions.
OUTPUT FORMAT:
Return a JSON list of objects. Each object MUST have:
- "box": A list of 4 normalized coordinates [xmin, ymin, xmax, ymax].
- "text": The extracted text string from that box. Ensure all text within the box is captured.
Example of a valid JSON response:
[
{"box": [0.070, 0.120, 0.930, 0.280], "text": "Document Title and Header Information"},
{"box": [0.120, 0.350, 0.880, 0.650], "text": "Table content including headers and all rows..."},
{"box": [0.100, 0.700, 0.900, 0.850], "text": "This is the first paragraph of text, potentially spanning multiple lines but grouped as one logical block."},
{"box": [0.100, 0.880, 0.900, 0.950], "text": "Another paragraph or distinct text block."}
]
ONLY RETURN THE VALID JSON LIST. No explanations, apologies, or other text outside the JSON structure.
If no text regions are found, return an empty JSON list: [].
"""
# --- Helper Functions ---
def get_gemini_api_key() -> Optional[str]:
    """Return the Google API key, or ``None`` if none has been provided yet.

    Sources are tried in this order:

    1. ``GOOGLE_API_KEY`` in Streamlit secrets.
    2. The ``GOOGLE_API_KEY`` environment variable.
    3. A password field rendered in the sidebar (also mirrored into
       ``st.session_state.GOOGLE_API_KEY``).

    An empty value in one source falls through to the next one.
    """
    # Probing st.secrets raises instead of returning False when no secrets
    # file is configured; the exact exception type depends on the Streamlit
    # version, hence the broad catch. Without this guard the environment
    # variable and sidebar fallbacks below would never be reached.
    try:
        secret_key = (
            st.secrets["GOOGLE_API_KEY"] if "GOOGLE_API_KEY" in st.secrets else None
        )
    except Exception:
        secret_key = None
    if secret_key:
        return secret_key

    api_key_env = os.getenv("GOOGLE_API_KEY")
    if api_key_env:
        return api_key_env

    st.sidebar.warning("Google API Key not found in secrets or environment variables.")
    api_key_input = st.sidebar.text_input(
        "Enter your Google API Key:", type="password", key="api_key_input"
    )
    if api_key_input:
        st.session_state.GOOGLE_API_KEY = api_key_input
        return api_key_input
    return None
def _normalize_region(item: Any) -> Optional[Dict[str, Any]]:
    """Return a cleaned copy of one region dict, or ``None`` if it is unusable.

    A usable region is a dict with a 4-element ``"box"`` list whose entries
    can be read as numbers, plus a ``"text"`` entry. Coordinates are returned
    as floats and text as ``str`` (``None`` becomes ``""``) so downstream
    drawing and display code never sees unexpected types. Extra keys are kept.
    """
    if not isinstance(item, dict) or "box" not in item or "text" not in item:
        return None
    box = item["box"]
    if not isinstance(box, list) or len(box) != 4:
        return None

    coords = []
    for value in box:
        # bool is a subclass of int; True/False are never valid coordinates.
        if isinstance(value, bool):
            return None
        try:
            coords.append(float(value))
        except (TypeError, ValueError, OverflowError):
            return None

    text = item["text"]
    if text is None:
        text = ""
    elif not isinstance(text, str):
        text = str(text)
    return {**item, "box": coords, "text": text}


def _parses_as_single_region(candidate: str) -> bool:
    """Return True if ``candidate`` is JSON for one object with "box" and "text"."""
    try:
        loaded = json.loads(candidate)
    except ValueError:
        return False
    return isinstance(loaded, dict) and "box" in loaded and "text" in loaded


def parse_gemini_response(response_text: str) -> List[Dict[str, Any]]:
    """Parse the model's reply into a list of ``{"box": [...], "text": ...}`` dicts.

    Strategy:

    1. Use the contents of a ```` ```json ```` fenced block if one is present.
    2. Otherwise use the whole reply if it looks like a JSON list, accept a
       bare single-region object, or cut out the outermost ``[...]`` span.
    3. If JSON decoding fails, fall back to a regex that picks out complete
       ``{"box": [...], "text": "..."}`` objects.

    Items that are structurally invalid are skipped with a Streamlit warning.
    Returned boxes hold four floats and ``"text"`` is always a ``str``.

    Args:
        response_text: Raw text returned by the model.

    Returns:
        The validated regions, possibly empty. Problems are reported through
        ``st.warning`` / ``st.error`` rather than raised.
    """
    try:
        # Attempt to find JSON block if model wraps it in markdown
        fenced = re.search(r"```json\s*([\s\S]*?)\s*```", response_text, re.DOTALL)
        if fenced:
            json_str = fenced.group(1)
        else:
            json_str = response_text.strip()
            looks_like_list = json_str.startswith('[') and json_str.endswith(']')
            # A lone region object must be recognised BEFORE searching for
            # "[...]": otherwise its coordinate list would be mistaken for
            # the answer and the region silently dropped.
            if not looks_like_list and not _parses_as_single_region(json_str):
                list_match = re.search(r'(\[[\s\S]*\])', json_str)
                if list_match:
                    json_str = list_match.group(1)
                else:
                    st.warning(f"Response doesn't appear to be a list: {response_text[:200]}...")
                    return []

        parsed_data = json.loads(json_str)
        if isinstance(parsed_data, dict) and "box" in parsed_data and "text" in parsed_data:
            parsed_data = [parsed_data]
        if not isinstance(parsed_data, list):
            st.warning(f"Parsed JSON is not a list: {type(parsed_data)}")
            return []

        validated_data = []
        for item in parsed_data:
            region = _normalize_region(item)
            if region is None:
                st.warning(f"Skipping invalid item in JSON: {item}")
            else:
                validated_data.append(region)
        return validated_data
    except json.JSONDecodeError as e:
        st.warning(f"JSONDecodeError: {e}. Raw response: {response_text[:500]}")
        # Fallback for malformed or truncated JSON: salvage every complete
        # object that matches the prompt's {"box": [...], "text": "..."} shape.
        # The pattern is deliberately simple and can mis-split texts that
        # contain an escaped quote directly followed by "}".
        regions = []
        pattern = r'\{\s*"box"\s*:\s*\[([\d\.]+),\s*([\d\.]+),\s*([\d\.]+),\s*([\d\.]+)\]\s*,\s*"text"\s*:\s*"(.*?)"\s*\}'
        for match in re.findall(pattern, response_text, re.DOTALL):
            try:
                box = [float(c) for c in match[:4]]
                text = match[4].replace('\\n', '\n').replace('\\"', '"')  # Handle basic escapes
                regions.append({"box": box, "text": text})
            except ValueError:
                st.warning(f"Could not parse box coordinates from regex match: {match}")
        if not regions and response_text.strip() and response_text.strip() != "[]":
            st.error(f"Failed to parse response using JSON and regex. Raw: {response_text[:200]}")
        return regions
    except Exception as e:
        st.error(f"Error parsing Gemini response: {e}. Raw response: {response_text[:500]}")
        return []
def _load_label_font(pixel_size: int):
    """Return a TrueType font at ``pixel_size``, or Pillow's default font.

    Several font files are tried because "arial.ttf" is typically absent
    outside Windows; "DejaVuSans.ttf" is commonly present on Linux hosts.
    Falling straight back to the default font would make labels tiny on
    high-DPI page images.
    """
    for font_file in ("arial.ttf", "DejaVuSans.ttf"):
        try:
            return ImageFont.truetype(font_file, pixel_size)
        except OSError:
            continue
    return ImageFont.load_default()


def draw_bounding_boxes(image: Image.Image, regions: List[Dict[str, Any]]) -> Image.Image:
    """Draw a numbered green rectangle for every region onto ``image``.

    The image is modified in place and also returned, so pass a copy if the
    original must stay untouched.

    Args:
        image: Page image to annotate.
        regions: Dicts whose ``"box"`` entry is ``[xmin, ymin, xmax, ymax]``
            in normalized (0.0-1.0) coordinates.

    Returns:
        The same image object, annotated. Regions with a malformed or empty
        box are skipped with a Streamlit warning; any drawing error for a
        single region is reported and does not stop the remaining regions.
    """
    if not regions:
        return image

    draw = ImageDraw.Draw(image)
    width, height = image.size
    # Label height is about 2% of the page height; clamp to 1 so that very
    # small images never request a zero-sized font.
    font = _load_label_font(max(1, int(height * 0.02)))

    for number, region_data in enumerate(regions, start=1):
        try:
            box = region_data.get("box")
            if not (isinstance(box, list) and len(box) == 4):
                st.warning(f"Skipping invalid box data for region {number}: {box}")
                continue

            # Convert normalized coordinates to pixel values, clamping to image bounds
            xmin = max(0.0, min(1.0, box[0])) * width
            ymin = max(0.0, min(1.0, box[1])) * height
            xmax = max(0.0, min(1.0, box[2])) * width
            ymax = max(0.0, min(1.0, box[3])) * height
            if xmin >= xmax or ymin >= ymax:
                st.warning(f"Skipping invalid box dimensions for region {number}: {[xmin, ymin, xmax, ymax]}")
                continue

            draw.rectangle([xmin, ymin, xmax, ymax], outline="#00FF00", width=3)

            label = str(number)
            # Start 5px inside the top-left corner, then pull the label back
            # towards the corner if it would overflow a small box.
            text_x = xmin + 5
            text_y = ymin + 5
            text_bbox = draw.textbbox((text_x, text_y), label, font=font)
            text_width = text_bbox[2] - text_bbox[0]
            text_height = text_bbox[3] - text_bbox[1]
            if text_x + text_width > xmax - 5:
                text_x = max(xmin, xmax - text_width - 5)
            if text_y + text_height > ymax - 5:
                text_y = max(ymin, ymax - text_height - 5)
            draw.text((text_x, text_y), label, fill="red", font=font)
        except Exception as e:
            # Best effort: one bad region (e.g. non-numeric coordinates) must
            # not prevent the others from being drawn.
            st.error(f"Error drawing box for region {number}: {str(e)}")
    return image
def process_image_with_gemini(
    client: genai.GenerativeModel, image_bytes: bytes
) -> Tuple[List[Dict[str, Any]], str]:
    """Send one PNG page image to Gemini and return the parsed regions.

    Args:
        client: Configured generative model used to issue the request.
        image_bytes: PNG-encoded page image.

    Returns:
        ``(regions, raw_text)`` where ``regions`` is the output of
        ``parse_gemini_response`` and ``raw_text`` is the model's unparsed
        reply. An empty reply yields ``([], "")``; any exception yields
        ``([], "API Error: ...")`` after being reported via ``st.error``.
    """
    try:
        request_parts = [
            UNIFIED_DETECTION_EXTRACTION_PROMPT,
            {"mime_type": "image/png", "data": image_bytes},
        ]
        # Low temperature keeps the structured output as deterministic as possible.
        sampling = GenerationConfig(temperature=0.1)
        # Disable blocking for every harm category so document content is not filtered.
        safety_overrides = {
            category: HarmBlockThreshold.BLOCK_NONE
            for category in (
                HarmCategory.HARM_CATEGORY_HARASSMENT,
                HarmCategory.HARM_CATEGORY_HATE_SPEECH,
                HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT,
                HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT,
            )
        }
        response = client.generate_content(
            contents=request_parts,
            generation_config=sampling,
            safety_settings=safety_overrides,
        )

        if response.parts:
            raw_text_response = response.text
        elif response.candidates and response.candidates[0].content.parts:
            # Fallback: read the first part of the first candidate directly.
            raw_text_response = response.candidates[0].content.parts[0].text
        else:
            raw_text_response = ""

        if not raw_text_response:
            st.error("Received empty response from Gemini API.")
            return [], ""

        return parse_gemini_response(raw_text_response), raw_text_response
    except Exception as e:
        st.error(f"Error calling Gemini API: {e}")
        # Some API exceptions carry the underlying response; show it when present.
        if hasattr(e, 'response') and e.response:
            st.error(f"API Response Error Details: {e.response}")
        return [], f"API Error: {e}"
# --- Streamlit UI ---
st.set_page_config(layout="wide")
st.title("📄 PDF Text Detection & Extraction with Gemini")

# --- API Key Configuration ---
# `api_key` and `gemini_client` are module-level state for the rest of the
# script: both stay falsy/None until a key is available AND the client could
# be created, which keeps the "Analyze PDF" button disabled.
st.sidebar.header("🔑 API Configuration")
api_key = get_gemini_api_key()
gemini_client = None
if api_key:
    try:
        genai.configure(api_key=api_key)
        gemini_client = genai.GenerativeModel(MODEL_NAME)
        st.sidebar.success("Gemini API Key configured.")
    except Exception as e:
        st.sidebar.error(f"Failed to configure Gemini: {e}")
        api_key = None  # Invalidate API key if configuration fails
else:
    st.sidebar.info("Please provide your Google API Key to use the application.")
    st.info("Please enter your Google API Key in the sidebar to proceed.")
# --- File Upload and Processing ---
st.header("📤 Upload PDF")
uploaded_file = st.file_uploader("Choose a PDF file", type=["pdf"])
dpi_options = [150, 200, 300, 400]
selected_dpi = st.select_slider(
    "Select DPI for PDF to Image conversion:",
    options=dpi_options,
    value=200,  # Default DPI
)

if uploaded_file and st.button("🚀 Analyze PDF", disabled=not api_key or not gemini_client):
    if not api_key or not gemini_client:
        # Defensive: the button is disabled in this state, so this is not
        # expected to be reachable through the UI.
        st.error("API Key not configured. Please enter it in the sidebar.")
    else:
        # Drop results of any previous run first, so a run that fails or
        # yields no pages never leaves another document's pages on screen.
        if "page_results" in st.session_state:
            del st.session_state["page_results"]

        with st.spinner(f"Processing PDF ({uploaded_file.name})... This may take a moment."):
            try:
                # getvalue() returns the complete upload regardless of the
                # stream's current read position, unlike read().
                pdf_bytes = uploaded_file.getvalue()
                pil_images: List[Image.Image] = convert_from_bytes(pdf_bytes, dpi=selected_dpi)
                if not pil_images:
                    st.warning("Could not extract any images from the PDF.")
                else:
                    total_pages = len(pil_images)
                    st.success(f"PDF converted to {total_pages} image(s). Analyzing with Gemini...")
                    page_results = []
                    for i, p_image in enumerate(pil_images):
                        page_progress = st.progress(0, text=f"Processing Page {i+1}/{total_pages}...")

                        # Convert PIL image to PNG bytes for the API
                        img_byte_arr = io.BytesIO()
                        p_image.save(img_byte_arr, format='PNG')
                        img_bytes = img_byte_arr.getvalue()

                        page_progress.progress(30, text=f"Page {i+1}: Sending to Gemini...")
                        regions, raw_response = process_image_with_gemini(gemini_client, img_bytes)
                        page_progress.progress(90, text=f"Page {i+1}: Received response.")

                        page_results.append({
                            "original_image": p_image,
                            "regions": regions,
                            "raw_response": raw_response,
                        })
                        page_progress.progress(100, text=f"Page {i+1}: Done.")
                        page_progress.empty()

                    # Keep results in session state so later reruns (triggered
                    # by any widget interaction) can redisplay them without
                    # calling the API again.
                    st.session_state.page_results = page_results
            except Exception as e:
                st.error(f"An error occurred during PDF processing: {str(e)}")
                if "page_results" in st.session_state:
                    del st.session_state.page_results  # Clear partial results
# --- Display Results ---
# Renders one tab per analysed page from the results kept in session state:
# the original image next to the annotated one, then each region's
# coordinates and text, and finally the raw model reply for debugging.
if "page_results" in st.session_state and st.session_state.page_results:
    st.header("📊 Analysis Results")
    results = st.session_state.page_results
    tab_titles = [f"Page {i+1}" for i in range(len(results))]
    tabs = st.tabs(tab_titles)
    for idx, (tab, page_data) in enumerate(zip(tabs, results)):
        with tab:
            st.subheader(f"Page {idx + 1} Analysis")
            original_image = page_data["original_image"]
            regions = page_data["regions"]
            raw_response = page_data["raw_response"]

            col1, col2 = st.columns(2)
            with col1:
                st.image(original_image, caption="Original Image", use_container_width=True)
            with col2:
                if regions:
                    # Draw on a copy: the stored original is shown unannotated in col1.
                    annotated_image = draw_bounding_boxes(original_image.copy(), regions)
                    st.image(annotated_image, caption=f"Detected {len(regions)} Text Regions", use_container_width=True)
                else:
                    st.image(original_image, caption="No text regions detected by model", use_container_width=True)
                    st.warning("No text regions were successfully parsed from the model's response for this page.")

            if regions:
                st.subheader("📝 Extracted Texts & Regions")
                for i, region_data in enumerate(regions):
                    box = region_data.get("box", "N/A")
                    # The stored text may be missing, null or a non-string
                    # value; normalise it so len() below cannot fail.
                    raw_text = region_data.get("text")
                    if raw_text is None:
                        text_content = "No text extracted for this region."
                    else:
                        text_content = str(raw_text)
                    with st.expander(f"Region {i+1} (Box: {box})", expanded=(i < 3)):  # Expand first 3 by default
                        st.markdown(f"**Coordinates:** `{box}`")
                        st.markdown("**Text:**")
                        # The label is hidden but still unique per page and region.
                        st.text_area(
                            f"text_area_{idx}_{i}",
                            text_content,
                            height=max(100, int(len(text_content) * 0.5)),
                            disabled=True,
                            label_visibility="collapsed",
                        )
            else:
                st.info("No text regions to display for this page.")

            with st.expander("🔍 Debug: Raw Gemini API Response", expanded=False):
                st.code(raw_response, language='json')
elif uploaded_file and not api_key:
    st.warning("Please enter your API key in the sidebar and click 'Analyze PDF' again.")
# Page footer: a horizontal rule followed by the credit line.
for footer_markdown in ("---", "Developed with Gemini & Streamlit."):
    st.markdown(footer_markdown)