import gradio as gr
import google.generativeai as genai
import json
import pandas as pd
from datetime import datetime
import os
from pathlib import Path
from PIL import Image
import io
import base64
import logging
import sys
from openpyxl.utils import get_column_letter

# Simplified logging configuration for cloud deployment (stdout only)
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(funcName)s:%(lineno)d - %(message)s',
handlers=[
logging.StreamHandler(sys.stdout)
]
)
logger = logging.getLogger(__name__)
# Configure Gemini API
logger.info("Configuring Gemini API")
gemini_api_key = os.getenv("Gemini_API")
if not gemini_api_key:
logger.error("Gemini_API environment variable not found!")
logger.error("Please set the Gemini_API environment variable with your Google Gemini API key")
logger.error("For Hugging Face Spaces: Add it as a Repository Secret in Space Settings")
    raise ValueError("❌ Gemini_API environment variable is required. Please set it in your environment or Hugging Face Space secrets.")
genai.configure(api_key=gemini_api_key)
logger.info("Gemini API configured successfully")
# Create output directories
logger.info("Setting up output directories")
output_dir = Path("business_card_exports")
images_dir = Path("business_cards")
output_dir.mkdir(exist_ok=True)
images_dir.mkdir(exist_ok=True)
logger.info(f"Export directory created/verified: {output_dir}")
logger.info(f"Images directory created/verified: {images_dir}")
# Log startup
logger.info("Business Card Data Extractor starting up")
logger.info(f"Working directory: {os.getcwd()}")
logger.info(f"Export directory: {output_dir.absolute()}")
logger.info(f"Images directory: {images_dir.absolute()}")
def extract_business_card_data_batch(images, filenames, model_name="gemini-2.5-flash"):
"""Extract data from multiple business card images in a single API call"""
logger.info(f"Starting batch extraction for {len(images)} images using model: {model_name}")
logger.debug(f"Filenames in batch: {filenames}")
# Load prompts
logger.debug("Loading prompt templates")
try:
with open("prompts/prompt.txt", "r", encoding="utf-8") as f:
prompt_template = f.read()
logger.debug(f"Loaded prompt template ({len(prompt_template)} characters)")
with open("prompts/system_prompt.txt", "r", encoding="utf-8") as f:
system_prompt = f.read()
logger.debug(f"Loaded system prompt ({len(system_prompt)} characters)")
except FileNotFoundError as e:
logger.error(f"Failed to load prompt files: {e}")
raise
# Configure model
logger.debug(f"Configuring Gemini model: {model_name}")
generation_config = {
"temperature": 0.1,
"response_mime_type": "application/json"
}
try:
model = genai.GenerativeModel(
model_name=model_name,
generation_config=generation_config,
system_instruction=system_prompt
)
logger.debug("Gemini model configured successfully")
except Exception as e:
logger.error(f"Failed to configure Gemini model: {e}")
raise
# Prepare multiple images for the model
logger.debug("Preparing content parts for API request")
content_parts = []
# Add the prompt first
batch_prompt = f"""
{prompt_template}
I'm sending you {len(images)} business card images. Please extract the data from each card and return a JSON array with {len(images)} objects. Each object should contain the extracted data for one business card in the same order as the images.
Return format: [card1_data, card2_data, card3_data, ...]
"""
content_parts.append(batch_prompt)
logger.debug(f"Added batch prompt ({len(batch_prompt)} characters)")
# Add each image
logger.debug("Converting and adding images to request")
for i, image in enumerate(images):
try:
buffered = io.BytesIO()
image.save(buffered, format="PNG")
img_base64 = base64.b64encode(buffered.getvalue()).decode()
image_part = {
"mime_type": "image/png",
"data": img_base64
}
content_parts.append(f"Business Card {i+1}:")
content_parts.append(image_part)
logger.debug(f"Added image {i+1} ({len(img_base64)} base64 characters)")
except Exception as e:
logger.error(f"Failed to process image {i+1} ({filenames[i] if i < len(filenames) else 'unknown'}): {e}")
raise
# Generate content
logger.info(f"Making API call to {model_name} with {len(content_parts)} content parts")
try:
response = model.generate_content(content_parts)
logger.info(f"API call successful. Response length: {len(response.text) if response.text else 0} characters")
logger.debug(f"Raw response: {response.text[:500]}..." if len(response.text) > 500 else f"Raw response: {response.text}")
except Exception as e:
logger.error(f"API call failed: {e}")
raise
# Parse response
logger.debug("Parsing JSON response")
try:
# Parse JSON response
response_data = json.loads(response.text)
logger.info(f"Successfully parsed JSON response")
# Ensure we got an array
if not isinstance(response_data, list):
logger.debug("Response is not an array, converting to array")
response_data = [response_data]
logger.info(f"Response contains {len(response_data)} extracted card data objects")
# Add metadata to each card's data
logger.debug("Adding metadata to extracted data")
for i, data in enumerate(response_data):
data['method'] = model_name
if i < len(filenames):
data['filename'] = filenames[i]
logger.debug(f"Added metadata to card {i+1}: {filenames[i]}")
logger.info(f"Batch extraction completed successfully for {len(response_data)} cards")
return response_data
except json.JSONDecodeError as e:
logger.warning(f"Initial JSON parsing failed: {e}. Attempting to clean response.")
# Try to clean the response
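        # Even with a JSON response type requested, replies can occasionally arrive
        # wrapped in markdown code fences, so strip them before re-parsing.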
text = response.text.strip()
        if text.startswith("```json"):
            text = text[7:]
            logger.debug("Removed ```json prefix")
        elif text.startswith("```"):
            text = text[3:]
            logger.debug("Removed ``` prefix")
        if text.endswith("```"):
            text = text[:-3]
            logger.debug("Removed ``` suffix")
try:
response_data = json.loads(text.strip())
logger.info("Successfully parsed cleaned JSON response")
# Ensure we got an array
if not isinstance(response_data, list):
logger.debug("Cleaned response is not an array, converting to array")
response_data = [response_data]
logger.info(f"Cleaned response contains {len(response_data)} extracted card data objects")
# Add metadata to each card's data
logger.debug("Adding metadata to cleaned extracted data")
for i, data in enumerate(response_data):
data['method'] = model_name
if i < len(filenames):
data['filename'] = filenames[i]
logger.debug(f"Added metadata to cleaned card {i+1}: {filenames[i]}")
logger.info(f"Batch extraction completed successfully after cleaning for {len(response_data)} cards")
return response_data
except json.JSONDecodeError as e2:
logger.error(f"Failed to parse even cleaned JSON response: {e2}")
logger.error(f"Cleaned text: {text[:1000]}...")
raise
def extract_business_card_data(image, model_name="gemini-2.5-flash"):
"""Extract data from single business card image - legacy function"""
logger.debug(f"Single card extraction called with model: {model_name}")
result = extract_business_card_data_batch([image], ["single_card"], model_name)
if result:
logger.debug("Single card extraction successful")
return result[0]
else:
logger.warning("Single card extraction returned no results")
return None
def process_business_cards(images, model_name="gemini-2.5-flash", save_images=True):
"""Process multiple business card images and create both current run and cumulative Excel files"""
logger.info(f"Starting business card processing session")
logger.info(f"Number of images received: {len(images) if images else 0}")
logger.info(f"Model selected: {model_name}")
logger.info(f"Save images option: {save_images}")
if not images:
logger.warning("No images provided for processing")
return None, None, "Please upload at least one business card image.", None
all_data = []
errors = []
# Prepare images for batch processing
logger.info("Preparing images for batch processing")
image_batches = []
filename_batches = []
batch_size = 5
logger.debug(f"Using batch size: {batch_size}")
# Load and group images into batches of 5
loaded_images = []
filenames = []
logger.info(f"Loading {len(images)} images")
for idx, image_path in enumerate(images):
try:
# Load image
if isinstance(image_path, str):
logger.debug(f"Loading image {idx+1}: {image_path}")
image = Image.open(image_path)
filename = os.path.basename(image_path)
else:
logger.debug(f"Using direct image object {idx+1}")
image = image_path
filename = f"image_{idx+1}.png"
loaded_images.append(image)
filenames.append(filename)
logger.debug(f"Successfully loaded image {idx+1}: {filename} (size: {image.size})")
except Exception as e:
error_msg = f"Error loading {image_path}: {str(e)}"
logger.error(error_msg)
errors.append(error_msg)
logger.info(f"Successfully loaded {len(loaded_images)} out of {len(images)} images")
# Save images if requested
saved_image_paths = []
if save_images and loaded_images:
logger.info(f"Saving {len(loaded_images)} images to business_cards directory")
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
for i, (image, filename) in enumerate(zip(loaded_images, filenames)):
try:
# Create unique filename with timestamp
name, ext = os.path.splitext(filename)
if not ext:
ext = '.png'
unique_filename = f"{timestamp}_{i+1:03d}_{name}{ext}"
image_path = images_dir / unique_filename
# Save the image
image.save(image_path)
saved_image_paths.append(str(image_path))
logger.debug(f"Saved image {i+1}: {unique_filename}")
except Exception as e:
logger.error(f"Failed to save image {filename}: {e}")
logger.info(f"Successfully saved {len(saved_image_paths)} images")
# Group into batches
logger.info(f"Grouping {len(loaded_images)} images into batches of {batch_size}")
for i in range(0, len(loaded_images), batch_size):
batch_images = loaded_images[i:i + batch_size]
batch_filenames = filenames[i:i + batch_size]
image_batches.append(batch_images)
filename_batches.append(batch_filenames)
logger.debug(f"Created batch {len(image_batches)} with {len(batch_images)} images: {batch_filenames}")
logger.info(f"Created {len(image_batches)} batches for processing")
# Process each batch
logger.info(f"Starting processing of {len(image_batches)} batches")
for batch_idx, (batch_images, batch_filenames) in enumerate(zip(image_batches, filename_batches)):
try:
logger.info(f"Processing batch {batch_idx + 1}/{len(image_batches)} ({len(batch_images)} cards)")
print(f"Processing batch {batch_idx + 1}/{len(image_batches)} ({len(batch_images)} cards)")
# Extract data for the entire batch
logger.debug(f"Calling batch extraction for batch {batch_idx + 1}")
batch_data = extract_business_card_data_batch(batch_images, batch_filenames, model_name)
logger.info(f"Batch {batch_idx + 1} extraction completed, got {len(batch_data)} results")
# Process each card's data in the batch
logger.debug(f"Processing individual card data for batch {batch_idx + 1}")
for i, data in enumerate(batch_data):
card_filename = batch_filenames[i] if i < len(batch_filenames) else f"card_{i+1}"
logger.debug(f"Processing card data for: {card_filename}")
# Add timestamp to data
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
data['processed_date'] = timestamp
logger.debug(f"Added timestamp {timestamp} to {card_filename}")
# Add saved image path if images were saved
global_index = batch_idx * batch_size + i
if save_images and global_index < len(saved_image_paths):
data['saved_image_path'] = saved_image_paths[global_index]
logger.debug(f"Added saved image path for {card_filename}: {saved_image_paths[global_index]}")
else:
data['saved_image_path'] = None
# Handle multiple values (emails, phones) by joining with commas
list_fields_processed = []
for key, value in data.items():
if isinstance(value, list):
original_count = len(value)
data[key] = ', '.join(str(v) for v in value)
list_fields_processed.append(f"{key}({original_count})")
logger.debug(f"Combined {original_count} {key} values for {card_filename}")
if list_fields_processed:
logger.debug(f"List fields processed for {card_filename}: {list_fields_processed}")
# Combine phone fields if they exist separately
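                # e.g. (hypothetical values) phones="+1 555 0100", mobile_phones="+1 555 0199"
                # becomes phones="+1 555 0100, +1 555 0199"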
if 'mobile_phones' in data and data['mobile_phones']:
logger.debug(f"Combining phone fields for {card_filename}")
if data.get('phones'):
# Combine mobile and regular phones
existing_phones = str(data['phones']) if data['phones'] else ""
mobile_phones = str(data['mobile_phones']) if data['mobile_phones'] else ""
combined = [p for p in [existing_phones, mobile_phones] if p and p != 'null']
data['phones'] = ', '.join(combined)
logger.debug(f"Combined phones for {card_filename}: {data['phones']}")
else:
data['phones'] = data['mobile_phones']
logger.debug(f"Used mobile phones as phones for {card_filename}: {data['phones']}")
del data['mobile_phones'] # Remove separate mobile field
# Combine address fields if they exist separately
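                # e.g. (hypothetical values) street="12 Main St", address="Springfield"
                # becomes address="12 Main St, Springfield"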
if 'street' in data and data['street']:
logger.debug(f"Combining address fields for {card_filename}")
if data.get('address'):
# If both exist, combine them
if str(data['street']) != str(data['address']) and data['street'] != 'null':
original_address = data['address']
data['address'] = f"{data['street']}, {data['address']}"
logger.debug(f"Combined address for {card_filename}: '{data['street']}' + '{original_address}' = '{data['address']}'")
else:
data['address'] = data['street']
logger.debug(f"Used street as address for {card_filename}: {data['address']}")
del data['street'] # Remove separate street field
all_data.append(data)
logger.debug(f"Added processed data for {card_filename} to results (total: {len(all_data)})")
logger.info(f"Completed processing batch {batch_idx + 1}, total cards processed so far: {len(all_data)}")
except Exception as e:
batch_filenames_str = ', '.join(batch_filenames)
error_msg = f"Error processing batch {batch_idx + 1} ({batch_filenames_str}): {str(e)}"
logger.error(error_msg)
errors.append(error_msg)
if not all_data:
logger.warning("No data could be extracted from any images")
error_summary = "No data could be extracted from the images.\n" + "\n".join(errors)
return None, None, error_summary, None
logger.info(f"Successfully extracted data from {len(all_data)} business cards")
# Create DataFrame for current run
logger.info("Creating DataFrame for current run")
current_df = pd.DataFrame(all_data)
logger.debug(f"Current run DataFrame created with {len(current_df)} rows and {len(current_df.columns)} columns")
logger.debug(f"Columns: {list(current_df.columns)}")
# Generate timestamp
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
logger.debug(f"Generated timestamp: {timestamp}")
# Create current run file
current_filename = output_dir / f"current_run_{timestamp}.xlsx"
logger.info(f"Current run file will be saved as: {current_filename}")
# Load existing cumulative data if it exists
cumulative_filename = output_dir / "all_business_cards_total.xlsx"
logger.info(f"Checking for existing cumulative file: {cumulative_filename}")
if cumulative_filename.exists():
logger.info("Existing cumulative file found, loading and merging data")
try:
existing_df = pd.read_excel(cumulative_filename)
logger.info(f"Loaded existing data: {len(existing_df)} rows")
# Append new data to existing
cumulative_df = pd.concat([existing_df, current_df], ignore_index=True)
logger.info(f"Merged data: {len(cumulative_df)} total rows ({len(existing_df)} existing + {len(current_df)} new)")
except Exception as e:
error_msg = f"Warning: Could not load existing data: {e}"
logger.warning(error_msg)
print(error_msg)
cumulative_df = current_df
logger.info("Using current data only for cumulative file")
else:
logger.info("No existing cumulative file found, using current data only")
cumulative_df = current_df
# Write current run Excel file
logger.info(f"Writing current run Excel file: {current_filename}")
try:
with pd.ExcelWriter(current_filename, engine='openpyxl') as writer:
current_df.to_excel(writer, index=False, sheet_name='Current Run')
logger.debug(f"Written {len(current_df)} rows to 'Current Run' sheet")
# Auto-adjust column widths
logger.debug("Auto-adjusting column widths for current run file")
worksheet = writer.sheets['Current Run']
adjusted_columns = []
            for column in current_df:
                # Width = longest cell or header, capped at 50 characters
                column_length = max(current_df[column].astype(str).map(len).max(), len(column))
                col_idx = current_df.columns.get_loc(column)
                final_width = min(column_length + 2, 50)
                # get_column_letter handles sheets with more than 26 columns (AA, AB, ...)
                worksheet.column_dimensions[get_column_letter(col_idx + 1)].width = final_width
                adjusted_columns.append(f"{column}:{final_width}")
logger.debug(f"Adjusted column widths: {adjusted_columns}")
logger.info(f"Current run Excel file saved successfully: {current_filename}")
except Exception as e:
logger.error(f"Failed to write current run Excel file: {e}")
raise
# Write cumulative Excel file
logger.info(f"Writing cumulative Excel file: {cumulative_filename}")
try:
with pd.ExcelWriter(cumulative_filename, engine='openpyxl') as writer:
cumulative_df.to_excel(writer, index=False, sheet_name='All Business Cards')
logger.debug(f"Written {len(cumulative_df)} rows to 'All Business Cards' sheet")
# Auto-adjust column widths
logger.debug("Auto-adjusting column widths for cumulative file")
worksheet = writer.sheets['All Business Cards']
adjusted_columns = []
            for column in cumulative_df:
                column_length = max(cumulative_df[column].astype(str).map(len).max(), len(column))
                col_idx = cumulative_df.columns.get_loc(column)
                final_width = min(column_length + 2, 50)
                # get_column_letter handles sheets with more than 26 columns (AA, AB, ...)
                worksheet.column_dimensions[get_column_letter(col_idx + 1)].width = final_width
                adjusted_columns.append(f"{column}:{final_width}")
logger.debug(f"Adjusted column widths: {adjusted_columns}")
logger.info(f"Cumulative Excel file saved successfully: {cumulative_filename}")
except Exception as e:
logger.error(f"Failed to write cumulative Excel file: {e}")
raise
# Create summary message
logger.info("Creating summary message")
    num_batches = len(image_batches)
    summary = f"Successfully processed {len(all_data)} business card(s) in {num_batches} batch(es) of up to {batch_size} cards.\n"
    summary += f"🤖 AI Model used: {model_name}\n"
    summary += f"⚡ API calls made: {num_batches} (instead of {len(all_data)})\n"
    if save_images:
        summary += f"💾 Images saved: {len(saved_image_paths)} cards saved to business_cards folder\n\n"
    else:
        summary += "💾 Images saved: No (save option was disabled)\n\n"
    summary += f"📁 Current run file: {current_filename.name}\n"
    summary += f"📁 Total cumulative file: {cumulative_filename.name}\n"
    summary += f"📊 Total cards in database: {len(cumulative_df)}\n\n"
if errors:
logger.warning(f"Encountered {len(errors)} errors during processing")
summary += "Errors encountered:\n" + "\n".join(errors)
for error in errors:
logger.warning(f"Processing error: {error}")
else:
logger.info("No errors encountered during processing")
# Display preview of current run
logger.debug("Creating preview DataFrame")
preview_df = current_df.head(10)
logger.debug(f"Preview contains {len(preview_df)} rows")
logger.info("Business card processing session completed successfully")
logger.info(f"Session summary - Cards: {len(all_data)}, Batches: {num_batches}, API calls: {num_batches}, Total DB size: {len(cumulative_df)}")
return str(current_filename), str(cumulative_filename), summary, preview_df
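# Illustrative direct call, bypassing the Gradio UI (paths below are hypothetical):
#   current_path, total_path, summary, preview = process_business_cards(
#       ["cards/card1.png", "cards/card2.jpg"], model_name="gemini-2.5-flash", save_images=False)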
# Create Gradio interface
logger.info("Creating Gradio interface")
with gr.Blocks(title="Business Card Data Extractor") as demo:
gr.Markdown(
"""
# Business Card Data Extractor
Upload business card images to extract contact information and export to Excel.
Cards are processed in batches of 5 for efficiency (fewer API calls, lower cost).
**Two files are generated:**
- 📁 **Current Run**: Contains only the cards you just processed
- 📊 **Total Database**: Contains ALL cards ever processed (cumulative)
**Image Storage:**
- 💾 **Optional**: Save uploaded images to business_cards folder
- 📝 **Tracking**: Image file paths included in Excel database
"""
)
with gr.Row():
with gr.Column():
image_input = gr.File(
label="Upload Business Cards",
file_count="multiple",
file_types=[".jpg", ".jpeg", ".png", ".webp", ".bmp"]
)
model_selector = gr.Dropdown(
choices=["gemini-2.5-flash", "gemini-2.5-pro"],
value="gemini-2.5-flash",
label="AI Model Selection"
)
save_images_checkbox = gr.Checkbox(
value=True,
label="Save Business Card Images"
)
process_btn = gr.Button("Process Business Cards", variant="primary")
with gr.Column():
            current_file = gr.File(label="📁 Download Current Run")
            total_file = gr.File(label="📊 Download Total Database")
status_output = gr.Textbox(label="Processing Status", lines=5)
preview_output = gr.Dataframe(label="Data Preview (Current Run)")
# Wrapper function for better error handling and logging
def process_with_logging(images, model_name, save_images):
"""Wrapper function to add error handling and logging to the main process"""
try:
logger.info(f"Gradio interface initiated processing request")
logger.debug(f"Request parameters - Images: {len(images) if images else 0}, Model: {model_name}, Save Images: {save_images}")
return process_business_cards(images, model_name, save_images)
except Exception as e:
logger.error(f"Unexpected error in Gradio processing: {e}")
error_msg = f"An unexpected error occurred: {str(e)}\nPlease check the logs for more details."
return None, None, error_msg, None
# Handle processing
process_btn.click(
fn=process_with_logging,
inputs=[image_input, model_selector, save_images_checkbox],
outputs=[current_file, total_file, status_output, preview_output]
)
gr.Markdown(
"""
## Features:
- 🤖 **Model Selection**: Choose between Gemini 2.5 Flash (fast) or Pro (accurate)
- ⚡ **Batch Processing**: Processes 5 cards per API call for efficiency
- 📄 **Data Extraction**: Names, emails, phone numbers, addresses, and more
- 📞 **Smart Combination**: Multiple emails/phones combined with commas
- 🏠 **Address Merging**: Separate street and address fields merged into one column
- 💾 **Image Storage**: Optionally save images to business_cards folder
- 📊 **Dual Output**: Current run + cumulative database files
- 📝 **Full Tracking**: Processing date, filename, image path, and AI model used
- 🎯 **One Row Per Card**: Each business card becomes one spreadsheet row
"""
)
# Launch for Hugging Face Spaces deployment
logger.info("Starting Gradio demo")
demo.launch()