Spaces:
Sleeping
Sleeping
import os | |
import re | |
import io | |
import json | |
import streamlit as st | |
from PIL import Image, ImageDraw, ImageFont | |
from google import generativeai as genai # Renamed for clarity from 'google.genai' | |
from google.generativeai.types import GenerationConfig, HarmCategory, HarmBlockThreshold | |
from pdf2image import convert_from_bytes | |
from typing import List, Dict, Any, Tuple, Optional | |
# --- Constants ---
# Gemini model identifier passed to genai.GenerativeModel. This is an
# experimental 2.5 Pro id (not 1.5 Flash, as an earlier comment claimed).
MODEL_NAME = "gemini-2.5-pro-exp-03-25"

# Unified prompt to get bounding boxes and text content in one go.
# The prompt is sent to the model verbatim, so it must not contain any
# copy/paste residue; the "≤" sign below was previously mis-encoded.
UNIFIED_DETECTION_EXTRACTION_PROMPT = """\
Analyze this document image. Identify text regions and extract the text from each region.
Follow these rules for text regions:
1. GROUP RELATED CONTENT:
- Full tables as SINGLE regions (including headers and all rows).
- Paragraphs as SINGLE rectangular blocks (multiple lines as one box).
- Keep text columns intact.
- Treat list items as single region if visually grouped.
2. TEXT REGION REQUIREMENTS:
- Boundaries must tightly wrap text content.
- Include approximately 2% padding around text clusters, but ensure the box stays within image bounds.
- Exclude isolated decorative elements unless they contain text.
- Merge adjacent text fragments with ≤1% spacing into a single region.
3. COORDINATE FORMAT:
- Normalized coordinates (0.0 to 1.0) with 3 decimal places.
- Format: [xmin, ymin, xmax, ymax].
- Order: Top-to-bottom, then left-to-right.
4. SPECIAL CASES:
- Table cells should NOT have individual boxes; the entire table is one box.
- Page headers/footers as separate regions.
- Text wrapped around images as distinct regions.
OUTPUT FORMAT:
Return a JSON list of objects. Each object MUST have:
- "box": A list of 4 normalized coordinates [xmin, ymin, xmax, ymax].
- "text": The extracted text string from that box. Ensure all text within the box is captured.
Example of a valid JSON response:
[
{"box": [0.070, 0.120, 0.930, 0.280], "text": "Document Title and Header Information"},
{"box": [0.120, 0.350, 0.880, 0.650], "text": "Table content including headers and all rows..."},
{"box": [0.100, 0.700, 0.900, 0.850], "text": "This is the first paragraph of text, potentially spanning multiple lines but grouped as one logical block."},
{"box": [0.100, 0.880, 0.900, 0.950], "text": "Another paragraph or distinct text block."}
]
ONLY RETURN THE VALID JSON LIST. No explanations, apologies, or other text outside the JSON structure.
If no text regions are found, return an empty JSON list: [].
"""
# --- Helper Functions --- | |
def get_gemini_api_key() -> Optional[str]:
    """Return the Google API key, or None if no key is available yet.

    Sources are tried in this order:
      1. ``GOOGLE_API_KEY`` in Streamlit secrets.
      2. The ``GOOGLE_API_KEY`` environment variable.
      3. A password field rendered in the sidebar (the value is also stored
         in ``st.session_state.GOOGLE_API_KEY``).

    Returns:
        The key string, or None when the user has not supplied one.
    """
    try:
        if "GOOGLE_API_KEY" in st.secrets:
            return st.secrets["GOOGLE_API_KEY"]
    except FileNotFoundError:
        # Streamlit raises when no secrets.toml exists (recent releases use
        # StreamlitSecretNotFoundError, a FileNotFoundError subclass).
        # A missing secrets file must not block the fallbacks below.
        pass

    api_key_env = os.getenv("GOOGLE_API_KEY")
    if api_key_env:
        return api_key_env

    st.sidebar.warning("Google API Key not found in secrets or environment variables.")
    api_key_input = st.sidebar.text_input(
        "Enter your Google API Key:", type="password", key="api_key_input"
    )
    if api_key_input:
        st.session_state.GOOGLE_API_KEY = api_key_input
        return api_key_input
    return None
def _normalize_region(item: Any) -> Optional[Dict[str, Any]]:
    """Return a cleaned copy of one region dict, or None if it is unusable.

    A usable region is a dict with a "box" list of exactly four values that
    can be converted to float and a "text" entry. Coordinates are returned
    as floats and text as str so later drawing/display code can rely on the
    types; any extra keys of the item are preserved.
    """
    if not isinstance(item, dict) or "box" not in item or "text" not in item:
        return None
    box = item["box"]
    if not isinstance(box, list) or len(box) != 4:
        return None
    try:
        coords = [float(c) for c in box]
    except (TypeError, ValueError):
        return None
    text = item["text"]
    if text is None:
        text = ""
    elif not isinstance(text, str):
        text = str(text)
    return {**item, "box": coords, "text": text}


def parse_gemini_response(response_text: str) -> List[Dict[str, Any]]:
    """Parse the model reply into a list of ``{"box": [...], "text": ...}`` dicts.

    The reply is first treated as JSON: a ```json fenced block is preferred,
    otherwise the outermost ``[...]`` span of the text is used. Each item is
    validated and normalised (four float coordinates, string text); invalid
    items are skipped with a warning. If JSON decoding fails, a simplified
    regex extracts ``{"box": [...], "text": "..."}`` objects as a fallback.

    Args:
        response_text: Raw text returned by the model.

    Returns:
        The list of valid regions; empty when nothing could be parsed.
        Problems are reported through ``st.warning`` / ``st.error`` rather
        than raised.
    """
    try:
        # Attempt to find JSON block if model wraps it in markdown
        fenced = re.search(r"```json\s*([\s\S]*?)\s*```", response_text, re.DOTALL)
        if fenced:
            json_str = fenced.group(1)
        else:
            # Assume the response is plain JSON or a Python list string
            json_str = response_text.strip()
            if not (json_str.startswith('[') and json_str.endswith(']')):
                # If it doesn't look like a list, try to find the list within the text
                list_match = re.search(r'(\[[\s\S]*\])', json_str)
                if list_match:
                    json_str = list_match.group(1)
                else:
                    st.warning(f"Response doesn't appear to be a list: {response_text[:200]}...")
                    return []

        parsed_data = json.loads(json_str)
        if not isinstance(parsed_data, list):
            st.warning(f"Parsed JSON is not a list: {type(parsed_data)}")
            return []

        validated_data = []
        for item in parsed_data:
            region = _normalize_region(item)
            if region is None:
                st.warning(f"Skipping invalid item in JSON: {item}")
            else:
                validated_data.append(region)
        return validated_data
    except json.JSONDecodeError as e:
        st.warning(f"JSONDecodeError: {e}. Raw response: {response_text[:500]}")
        # Fallback to regex if JSON parsing fails (less robust).
        # The pattern assumes the "box"/"text" structure requested in the
        # prompt and may struggle with escaped quotes inside the text.
        regions = []
        pattern = r'\{\s*"box"\s*:\s*\[([\d\.]+),\s*([\d\.]+),\s*([\d\.]+),\s*([\d\.]+)\]\s*,\s*"text"\s*:\s*"(.*?)"\s*\}'
        for groups in re.findall(pattern, response_text, re.DOTALL):
            try:
                box = [float(c) for c in groups[:4]]
                text = groups[4].replace('\\n', '\n').replace('\\"', '"')  # Handle basic escapes
                regions.append({"box": box, "text": text})
            except ValueError:
                st.warning(f"Could not parse box coordinates from regex match: {groups}")
        if not regions and response_text.strip() and response_text.strip() != "[]":
            st.error(f"Failed to parse response using JSON and regex. Raw: {response_text[:200]}")
        return regions
    except Exception as e:
        st.error(f"Error parsing Gemini response: {e}. Raw response: {response_text[:500]}")
        return []
def draw_bounding_boxes(image: Image.Image, regions: List[Dict[str, Any]]) -> Image.Image:
    """Outline every region on ``image`` and number it, then return the image.

    The image is drawn on in place. Each region's "box" holds normalized
    [xmin, ymin, xmax, ymax] values; they are clamped to 0..1 and scaled to
    pixels. Regions with malformed or empty boxes are skipped with a warning,
    and any per-region failure is reported without aborting the others.
    """
    if not regions:
        return image

    canvas = ImageDraw.Draw(image)
    img_w, img_h = image.size
    try:
        # Prefer a scalable font sized relative to the page height.
        label_font = ImageFont.truetype("arial.ttf", int(img_h * 0.02))
    except IOError:
        label_font = ImageFont.load_default()

    def clamp_unit(value):
        # Keep a normalized coordinate inside the 0..1 range.
        return max(0.0, min(1.0, value))

    for number, region in enumerate(regions, start=1):
        try:
            box = region.get("box")
            if not isinstance(box, list) or len(box) != 4:
                st.warning(f"Skipping invalid box data for region {number}: {box}")
                continue

            # Normalized -> pixel coordinates, evaluated in box order.
            xmin, ymin, xmax, ymax = (
                clamp_unit(coord) * extent
                for coord, extent in zip(box, (img_w, img_h, img_w, img_h))
            )
            if xmin >= xmax or ymin >= ymax:
                st.warning(f"Skipping invalid box dimensions for region {number}: {[xmin, ymin, xmax, ymax]}")
                continue

            canvas.rectangle([xmin, ymin, xmax, ymax], outline="#00FF00", width=3)

            label = str(number)
            # Start the label just inside the top-left corner of the box.
            label_x = xmin + 5
            label_y = ymin + 5
            left, top, right, bottom = canvas.textbbox((label_x, label_y), label, font=label_font)
            label_w = right - left
            label_h = bottom - top
            # Pull the label back if it would overrun the far edges.
            if label_x + label_w > xmax - 5:
                label_x = max(xmin, xmax - label_w - 5)
            if label_y + label_h > ymax - 5:
                label_y = max(ymin, ymax - label_h - 5)
            canvas.text((label_x, label_y), label, fill="red", font=label_font)
        except Exception as e:
            st.error(f"Error drawing box for region {number}: {str(e)}")
    return image
def process_image_with_gemini(
    client: genai.GenerativeModel, image_bytes: bytes
) -> Tuple[List[Dict[str, Any]], str]:
    """Ask the model for text regions in one PNG page image.

    Args:
        client: Configured generative model used for the request.
        image_bytes: The page image, sent with MIME type ``image/png``.

    Returns:
        ``(regions, raw_text)``: the parsed regions and the raw reply text.
        An empty reply yields ``([], "")``; a failed call yields
        ``([], "API Error: ...")``. Errors are shown via ``st.error`` instead
        of being raised.
    """
    try:
        request_parts = [
            UNIFIED_DETECTION_EXTRACTION_PROMPT,
            {"mime_type": "image/png", "data": image_bytes},
        ]
        # Low temperature for more deterministic output.
        config = GenerationConfig(temperature=0.1)
        # Disable blocking for every listed harm category (adjust as needed).
        unblocked = {
            category: HarmBlockThreshold.BLOCK_NONE
            for category in (
                HarmCategory.HARM_CATEGORY_HARASSMENT,
                HarmCategory.HARM_CATEGORY_HATE_SPEECH,
                HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT,
                HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT,
            )
        }
        response = client.generate_content(
            contents=request_parts,
            generation_config=config,
            safety_settings=unblocked,
        )

        if response.parts:
            raw_text_response = response.text
        elif response.candidates and response.candidates[0].content.parts:
            # Fall back to the first part of the first candidate.
            raw_text_response = response.candidates[0].content.parts[0].text
        else:
            raw_text_response = ""

        if not raw_text_response:
            st.error("Received empty response from Gemini API.")
            return [], ""

        return parse_gemini_response(raw_text_response), raw_text_response
    except Exception as e:
        st.error(f"Error calling Gemini API: {str(e)}")
        # Some API exceptions carry the underlying response; show it if present.
        if hasattr(e, 'response') and e.response:
            st.error(f"API Response Error Details: {e.response}")
        return [], f"API Error: {str(e)}"
# --- Streamlit UI ---
st.set_page_config(layout="wide")
st.title("π PDF Text Detection & Extraction with Gemini")

# --- API Key Configuration ---
# Resolve the key, then build the model client. `gemini_client` stays None
# (and `api_key` is reset to None) whenever configuration does not succeed.
st.sidebar.header("π API Configuration")
api_key = get_gemini_api_key()
gemini_client = None
if not api_key:
    st.sidebar.info("Please provide your Google API Key to use the application.")
    st.info("Please enter your Google API Key in the sidebar to proceed.")
else:
    try:
        genai.configure(api_key=api_key)
        gemini_client = genai.GenerativeModel(MODEL_NAME)
        st.sidebar.success("Gemini API Key configured.")
    except Exception as config_error:
        st.sidebar.error(f"Failed to configure Gemini: {config_error}")
        api_key = None  # Invalidate API key if configuration fails
# --- File Upload and Processing ---
st.header("π€ Upload PDF")
uploaded_file = st.file_uploader("Choose a PDF file", type=["pdf"])
selected_dpi = st.select_slider(
    "Select DPI for PDF to Image conversion:",
    options=[150, 200, 300, 400],
    value=200,  # Default DPI
)

# The button is rendered only once a file is uploaded, and is disabled until
# both the key and the model client are available.
if uploaded_file and st.button("π Analyze PDF", disabled=not api_key or not gemini_client):
    if not api_key or not gemini_client:
        st.error("API Key not configured. Please enter it in the sidebar.")
    else:
        with st.spinner(f"Processing PDF ({uploaded_file.name})... This may take a moment."):
            try:
                page_images: List[Image.Image] = convert_from_bytes(
                    uploaded_file.read(), dpi=selected_dpi
                )
                if not page_images:
                    st.warning("Could not extract any images from the PDF.")
                else:
                    total_pages = len(page_images)
                    st.success(f"PDF converted to {total_pages} image(s). Analyzing with Gemini...")
                    collected = []
                    for page_no, page_image in enumerate(page_images, start=1):
                        page_progress = st.progress(0, text=f"Processing Page {page_no}/{total_pages}...")

                        # Serialize the page as PNG bytes for the API call.
                        png_buffer = io.BytesIO()
                        page_image.save(png_buffer, format='PNG')
                        png_bytes = png_buffer.getvalue()

                        page_progress.progress(30, text=f"Page {page_no}: Sending to Gemini...")
                        regions, raw_response = process_image_with_gemini(gemini_client, png_bytes)
                        page_progress.progress(90, text=f"Page {page_no}: Received response.")

                        collected.append({
                            "original_image": page_image,
                            "regions": regions,
                            "raw_response": raw_response,
                        })
                        page_progress.progress(100, text=f"Page {page_no}: Done.")
                        page_progress.empty()

                    # Keep results in session state so later reruns (any
                    # widget interaction) can redisplay them without
                    # calling the API again.
                    st.session_state.page_results = collected
            except Exception as e:
                st.error(f"An error occurred during PDF processing: {str(e)}")
                if "page_results" in st.session_state:
                    del st.session_state.page_results  # Clear partial results
# --- Display Results ---
# One tab per analysed page: original vs. annotated image, then the text of
# each region, then the raw model reply for debugging.
if "page_results" in st.session_state and st.session_state.page_results:
    st.header("π Analysis Results")
    results = st.session_state.page_results
    tabs = st.tabs([f"Page {n}" for n in range(1, len(results) + 1)])

    for idx, (tab, page_data) in enumerate(zip(tabs, results)):
        with tab:
            st.subheader(f"Page {idx + 1} Analysis")
            original_image = page_data["original_image"]
            regions = page_data["regions"]
            raw_response = page_data["raw_response"]

            left_col, right_col = st.columns(2)
            with left_col:
                st.image(original_image, caption="Original Image", use_container_width=True)
            with right_col:
                if not regions:
                    st.image(original_image.copy(), caption="No text regions detected by model", use_container_width=True)
                    st.warning("No text regions were successfully parsed from the model's response for this page.")
                else:
                    # Draw on a copy so the stored original stays clean.
                    annotated_image = draw_bounding_boxes(original_image.copy(), regions)
                    st.image(annotated_image, caption=f"Detected {len(regions)} Text Regions", use_container_width=True)

            if not regions:
                st.info("No text regions to display for this page.")
            else:
                st.subheader("π Extracted Texts & Regions")
                for i, region_data in enumerate(regions):
                    box = region_data.get("box", "N/A")
                    text_content = region_data.get("text", "No text extracted for this region.")
                    # Expand first 3 by default
                    with st.expander(f"Region {i+1} (Box: {box})", expanded=(i < 3)):
                        st.markdown(f"**Coordinates:** `{box}`")
                        st.markdown("**Text:**")
                        # The label embeds page and region index so every
                        # text area is distinct; it is hidden from view.
                        st.text_area(
                            f"text_area_{idx}_{i}",
                            text_content,
                            height=max(100, int(len(text_content) * 0.5)),
                            disabled=True,
                            label_visibility="collapsed",
                        )

            with st.expander("π Debug: Raw Gemini API Response", expanded=False):
                st.code(raw_response, language='json')
elif uploaded_file and not api_key:
    st.warning("Please enter your API key in the sidebar and click 'Analyze PDF' again.")

st.markdown("---")
st.markdown("Developed with Gemini & Streamlit.")