import gradio as gr
import google.generativeai as genai
import json
import pandas as pd
from datetime import datetime
import os
from pathlib import Path
from PIL import Image
import io
import base64
import logging
import sys
import tempfile

# Needed for the Excel column-width auto-fit below (handles columns past 'Z').
from openpyxl.utils import get_column_letter

# Optional HEIF/HEIC (iPhone photo) support; the app still runs without it.
try:
    from pillow_heif import register_heif_opener
    register_heif_opener()
    HEIF_SUPPORTED = True
except ImportError:
    HEIF_SUPPORTED = False
    logging.warning("pillow-heif not installed. HEIF/HEIC support disabled.")

from google_funcs import (
    get_drive_service,
    upload_excel_to_exports_folder,
    upload_image_to_images_folder,
    list_files_in_folder,
    download_file_from_drive,
    get_existing_cumulative_file,
    cleanup_duplicate_cumulative_files,
    delete_file_from_drive,
)

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(funcName)s:%(lineno)d - %(message)s',
    handlers=[logging.StreamHandler(sys.stdout)]
)
logger = logging.getLogger(__name__)

logger.info("Configuring AI API") |
|
gemini_api_key = os.getenv("Gemini_API") |
|
if not gemini_api_key: |
|
logger.error("Gemini_API environment variable not found!") |
|
logger.error("Please set the Gemini_API environment variable with your AI API key") |
|
raise ValueError("β Gemini_API environment variable is required. Please set it in your environment.") |
|
|
|
genai.configure(api_key=gemini_api_key) |
|
logger.info("AI API configured successfully") |
|
|
|
|
|
logger.info("Initializing Google Drive service") |
|
try: |
|
drive_service = get_drive_service() |
|
logger.info("Google Drive service initialized successfully") |
|
except Exception as e: |
|
logger.error(f"Failed to initialize Google Drive service: {e}") |
|
logger.error("Please ensure GOOGLE_CLIENT_ID and GOOGLE_CLIENT_SECRET environment variables are set") |
|
raise ValueError("β Google Drive credentials are required. Please set GOOGLE_CLIENT_ID and GOOGLE_CLIENT_SECRET environment variables.") |
|
|
|
|
|
logger.info("Business Card Data Extractor starting up with Google Drive storage") |
|
|
|
def upload_to_google_drive(file_path, is_excel=False, filename=None):
    """Upload a file from a local path to Google Drive."""
    try:
        if is_excel:
            logger.info(f"Uploading Excel file to Google Drive: {filename or file_path}")
            result = upload_excel_to_exports_folder(drive_service, file_path=file_path, filename=filename)
        else:
            logger.info(f"Uploading image file to Google Drive: {filename or file_path}")
            result = upload_image_to_images_folder(drive_service, file_path=file_path, filename=filename)

        if result:
            logger.info(f"Successfully uploaded to Google Drive: {result['webViewLink']}")
            return result
        else:
            logger.error("Failed to upload to Google Drive")
            return None
    except Exception as e:
        logger.error(f"Failed to upload to Google Drive: {e}")
        return None

def upload_bytes_to_google_drive(file_data, filename, is_excel=False):
    """Upload in-memory file data (bytes) to Google Drive."""
    try:
        if is_excel:
            logger.info(f"Uploading Excel data to Google Drive: {filename}")
            result = upload_excel_to_exports_folder(drive_service, file_data=file_data, filename=filename)
        else:
            logger.info(f"Uploading image data to Google Drive: {filename}")
            result = upload_image_to_images_folder(drive_service, file_data=file_data, filename=filename)

        if result:
            logger.info(f"Successfully uploaded to Google Drive: {result['webViewLink']}")
            return result
        else:
            logger.error("Failed to upload to Google Drive")
            return None
    except Exception as e:
        logger.error(f"Failed to upload to Google Drive: {e}")
        return None

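# Illustrative usage of the two upload helpers; the file names here are
# hypothetical, and both calls assume the module-level drive_service above
# initialized successfully. Each helper returns the Drive file metadata dict
# (at least 'webViewLink') on success, or None on failure:
#
#   result = upload_to_google_drive("/tmp/run.xlsx", is_excel=True, filename="run.xlsx")
#   with open("card.png", "rb") as f:
#       result = upload_bytes_to_google_drive(f.read(), "card.png")
#   if result:
#       print(result["webViewLink"])
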

def extract_business_card_data_batch(images, filenames, model_name="gemini-2.5-flash"):
    """Extract data from multiple business card images in a single API call."""
    logger.info(f"Starting batch extraction for {len(images)} images using model: {model_name}")
    logger.debug(f"Filenames in batch: {filenames}")

    logger.debug("Loading prompt templates")
    try:
        with open("prompts/prompt.txt", "r", encoding="utf-8") as f:
            prompt_template = f.read()
        logger.debug(f"Loaded prompt template ({len(prompt_template)} characters)")

        with open("prompts/system_prompt.txt", "r", encoding="utf-8") as f:
            system_prompt = f.read()
        logger.debug(f"Loaded system prompt ({len(system_prompt)} characters)")
    except FileNotFoundError as e:
        logger.error(f"Failed to load prompt files: {e}")
        raise

    logger.debug(f"Configuring AI model: {model_name}")
    generation_config = {
        "temperature": 0.1,
        "response_mime_type": "application/json"
    }

    try:
        model = genai.GenerativeModel(
            model_name=model_name,
            generation_config=generation_config,
            system_instruction=system_prompt
        )
        logger.debug("AI model configured successfully")
    except Exception as e:
        logger.error(f"Failed to configure AI model: {e}")
        raise

logger.debug("Preparing content parts for API request") |
|
content_parts = [] |
|
|
|
|
|
batch_prompt = f""" |
|
{prompt_template} |
|
|
|
I'm sending you {len(images)} business card images. Please extract the data from each card and return a JSON array with {len(images)} objects. Each object should contain the extracted data for one business card in the same order as the images. |
|
|
|
Return format: [card1_data, card2_data, card3_data, ...] |
|
""" |
|
content_parts.append(batch_prompt) |
|
logger.debug(f"Added batch prompt ({len(batch_prompt)} characters)") |

    logger.debug("Converting and adding images to request")
    for i, image in enumerate(images):
        try:
            buffered = io.BytesIO()
            image.save(buffered, format="PNG")
            img_base64 = base64.b64encode(buffered.getvalue()).decode()

            image_part = {
                "mime_type": "image/png",
                "data": img_base64
            }
            content_parts.append(f"Business Card {i+1}:")
            content_parts.append(image_part)
            logger.debug(f"Added image {i+1} ({len(img_base64)} base64 characters)")
        except Exception as e:
            logger.error(f"Failed to process image {i+1} ({filenames[i] if i < len(filenames) else 'unknown'}): {e}")
            raise

    logger.info(f"Making API call to {model_name} with {len(content_parts)} content parts")
    try:
        response = model.generate_content(content_parts)
        # Guard against an empty response so the length checks below cannot fail.
        raw_text = response.text or ""
        logger.info(f"API call successful. Response length: {len(raw_text)} characters")
        logger.debug(f"Raw response: {raw_text[:500]}..." if len(raw_text) > 500 else f"Raw response: {raw_text}")
    except Exception as e:
        logger.error(f"API call failed: {e}")
        raise

    logger.debug("Parsing JSON response")
    try:
        response_data = json.loads(raw_text)
        logger.info("Successfully parsed JSON response")
    except json.JSONDecodeError as e:
        # The model occasionally wraps its JSON in markdown code fences;
        # strip them and retry before giving up.
        logger.warning(f"Initial JSON parsing failed: {e}. Attempting to clean response.")
        text = raw_text.strip()
        if text.startswith("```json"):
            text = text[7:]
            logger.debug("Removed ```json prefix")
        elif text.startswith("```"):
            text = text[3:]
            logger.debug("Removed ``` prefix")
        if text.endswith("```"):
            text = text[:-3]
            logger.debug("Removed ``` suffix")

        try:
            response_data = json.loads(text.strip())
            logger.info("Successfully parsed cleaned JSON response")
        except json.JSONDecodeError as e2:
            logger.error(f"Failed to parse even cleaned JSON response: {e2}")
            logger.error(f"Cleaned text: {text[:1000]}...")
            raise

    # A single card sometimes comes back as a bare object rather than an array.
    if not isinstance(response_data, list):
        logger.debug("Response is not an array, converting to array")
        response_data = [response_data]

    logger.info(f"Response contains {len(response_data)} extracted card data objects")

    logger.debug("Adding metadata to extracted data")
    for i, data in enumerate(response_data):
        data['method'] = "Speed-Optimized model" if "flash" in model_name else "Accuracy-Optimized model"
        if i < len(filenames):
            data['filename'] = filenames[i]
            logger.debug(f"Added metadata to card {i+1}: {filenames[i]}")

    logger.info(f"Batch extraction completed successfully for {len(response_data)} cards")
    return response_data

def extract_business_card_data(image, model_name="gemini-2.5-flash"):
    """Extract data from a single business card image - legacy wrapper around the batch function."""
    logger.debug(f"Single card extraction called with model: {model_name}")
    result = extract_business_card_data_batch([image], ["single_card"], model_name)
    if result:
        logger.debug("Single card extraction successful")
        return result[0]
    else:
        logger.warning("Single card extraction returned no results")
        return None
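
# Illustrative single-card usage (the path is hypothetical):
#
#   card = Image.open("cards/jane_doe.png")
#   data = extract_business_card_data(card)
#   if data:
#       print(data["method"], data["filename"])  # metadata added by the batch extractor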

def convert_image_for_processing(image, filename):
    """Convert image to RGB JPEG format for better compatibility."""
    try:
        if image.mode != 'RGB':
            image = image.convert('RGB')

        # Re-encode as JPEG to normalize exotic source formats (e.g. HEIC).
        buffer = io.BytesIO()
        image.save(buffer, format='JPEG', quality=95)
        buffer.seek(0)

        return Image.open(buffer)
    except Exception as e:
        logger.warning(f"Could not convert {filename}: {str(e)}. Using original image.")
        return image

def process_business_cards(images, model_name="gemini-2.5-flash", save_images=True):
    """Process multiple business card images and create both current run and cumulative Excel files."""
    logger.info("Starting business card processing session")
    logger.info(f"Number of images received: {len(images) if images else 0}")
    logger.info(f"Model selected: {model_name}")
    logger.info(f"Save images option: {save_images}")

    if not images:
        logger.warning("No images provided for processing")
        return None, None, "Please upload at least one business card image.", None

    all_data = []
    errors = []

    logger.info("Preparing images for batch processing")
    image_batches = []
    filename_batches = []
    batch_size = 5
    logger.debug(f"Using batch size: {batch_size}")

    loaded_images = []
    filenames = []
    uploaded_image_links = []

    logger.info(f"Loading {len(images)} images")
    for idx, image_path in enumerate(images):
        try:
            if isinstance(image_path, str):
                logger.debug(f"Loading image {idx+1}: {image_path}")
                image = Image.open(image_path)
                filename = os.path.basename(image_path)
            else:
                logger.debug(f"Using direct image object {idx+1}")
                image = image_path
                filename = f"image_{idx+1}.png"

            converted_image = convert_image_for_processing(image, filename)

            loaded_images.append(converted_image)
            filenames.append(filename)
            logger.debug(f"Successfully loaded image {idx+1}: {filename} (size: {converted_image.size})")

        except Exception as e:
            error_msg = f"Error loading {image_path}: {str(e)}"
            logger.error(error_msg)
            errors.append(error_msg)

    logger.info(f"Successfully loaded {len(loaded_images)} out of {len(images)} images")

    if save_images and loaded_images:
        logger.info(f"Saving {len(loaded_images)} images to Google Drive")
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

        for i, (image, filename) in enumerate(zip(loaded_images, filenames)):
            try:
                name, ext = os.path.splitext(filename)
                if not ext:
                    ext = '.png'
                unique_filename = f"{timestamp}_{i+1:03d}_{name}{ext}"

                img_buffer = io.BytesIO()
                image.save(img_buffer, format='PNG')
                img_bytes = img_buffer.getvalue()

                result = upload_bytes_to_google_drive(img_bytes, unique_filename, is_excel=False)

                if result:
                    uploaded_image_links.append(result['webViewLink'])
                    logger.debug(f"Saved image {i+1}: {unique_filename}")
                else:
                    uploaded_image_links.append(None)
                    logger.error(f"Failed to upload image {unique_filename}")

            except Exception as e:
                logger.error(f"Failed to save image {filename}: {e}")
                uploaded_image_links.append(None)

        logger.info(f"Successfully uploaded {sum(1 for link in uploaded_image_links if link)} images to Google Drive")
logger.info(f"Grouping {len(loaded_images)} images into batches of {batch_size}") |
|
for i in range(0, len(loaded_images), batch_size): |
|
batch_images = loaded_images[i:i + batch_size] |
|
batch_filenames = filenames[i:i + batch_size] |
|
image_batches.append(batch_images) |
|
filename_batches.append(batch_filenames) |
|
logger.debug(f"Created batch {len(image_batches)} with {len(batch_images)} images: {batch_filenames}") |
|
|
|
logger.info(f"Created {len(image_batches)} batches for processing") |

    logger.info(f"Starting processing of {len(image_batches)} batches")
    for batch_idx, (batch_images, batch_filenames) in enumerate(zip(image_batches, filename_batches)):
        try:
            logger.info(f"Processing batch {batch_idx + 1}/{len(image_batches)} ({len(batch_images)} cards)")
            print(f"Processing batch {batch_idx + 1}/{len(image_batches)} ({len(batch_images)} cards)")

            logger.debug(f"Calling batch extraction for batch {batch_idx + 1}")
            batch_data = extract_business_card_data_batch(batch_images, batch_filenames, model_name)
            logger.info(f"Batch {batch_idx + 1} extraction completed, got {len(batch_data)} results")

            logger.debug(f"Processing individual card data for batch {batch_idx + 1}")
            for i, data in enumerate(batch_data):
                card_filename = batch_filenames[i] if i < len(batch_filenames) else f"card_{i+1}"
                logger.debug(f"Processing card data for: {card_filename}")

                timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                data['processed_date'] = timestamp
                logger.debug(f"Added timestamp {timestamp} to {card_filename}")

                global_index = batch_idx * batch_size + i
                if save_images and global_index < len(uploaded_image_links) and uploaded_image_links[global_index]:
                    data['google_drive_image_link'] = uploaded_image_links[global_index]
                    logger.debug(f"Added Google Drive image link for {card_filename}: {uploaded_image_links[global_index]}")
                else:
                    data['google_drive_image_link'] = None

                # Flatten any list-valued fields into comma-separated strings.
                list_fields_processed = []
                for key, value in data.items():
                    if isinstance(value, list):
                        original_count = len(value)
                        data[key] = ', '.join(str(v) for v in value)
                        list_fields_processed.append(f"{key}({original_count})")
                        logger.debug(f"Combined {original_count} {key} values for {card_filename}")

                if list_fields_processed:
                    logger.debug(f"List fields processed for {card_filename}: {list_fields_processed}")

                # Merge mobile numbers into the main 'phones' column.
                if 'mobile_phones' in data and data['mobile_phones']:
                    logger.debug(f"Combining phone fields for {card_filename}")
                    if data.get('phones'):
                        existing_phones = str(data['phones']) if data['phones'] else ""
                        mobile_phones = str(data['mobile_phones']) if data['mobile_phones'] else ""
                        combined = [p for p in [existing_phones, mobile_phones] if p and p != 'null']
                        data['phones'] = ', '.join(combined)
                        logger.debug(f"Combined phones for {card_filename}: {data['phones']}")
                    else:
                        data['phones'] = data['mobile_phones']
                        logger.debug(f"Used mobile phones as phones for {card_filename}: {data['phones']}")
                    del data['mobile_phones']

                # Merge the street into the main 'address' column.
                if 'street' in data and data['street']:
                    logger.debug(f"Combining address fields for {card_filename}")
                    if data.get('address'):
                        if str(data['street']) != str(data['address']) and data['street'] != 'null':
                            original_address = data['address']
                            data['address'] = f"{data['street']}, {data['address']}"
                            logger.debug(f"Combined address for {card_filename}: '{data['street']}' + '{original_address}' = '{data['address']}'")
                    else:
                        data['address'] = data['street']
                        logger.debug(f"Used street as address for {card_filename}: {data['address']}")
                    del data['street']
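
                # Hypothetical before/after for the two merges above:
                #   phones="+1 555 0100", mobile_phones="+1 555 0101"
                #     -> phones="+1 555 0100, +1 555 0101" (mobile_phones removed)
                #   street="12 Main St", address="Springfield"
                #     -> address="12 Main St, Springfield" (street removed)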

                all_data.append(data)
                logger.debug(f"Added processed data for {card_filename} to results (total: {len(all_data)})")

            logger.info(f"Completed processing batch {batch_idx + 1}, total cards processed so far: {len(all_data)}")

        except Exception as e:
            batch_filenames_str = ', '.join(batch_filenames)
            error_msg = f"Error processing batch {batch_idx + 1} ({batch_filenames_str}): {str(e)}"
            logger.error(error_msg)
            errors.append(error_msg)

    if not all_data:
        logger.warning("No data could be extracted from any images")
        error_summary = "No data could be extracted from the images.\n" + "\n".join(errors)
        return None, None, error_summary, None

    logger.info(f"Successfully extracted data from {len(all_data)} business cards")
logger.info("Creating DataFrame for current run") |
|
current_df = pd.DataFrame(all_data) |
|
logger.debug(f"Current run DataFrame created with {len(current_df)} rows and {len(current_df.columns)} columns") |
|
logger.debug(f"Columns: {list(current_df.columns)}") |
|
|
|
|
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") |
|
logger.debug(f"Generated timestamp: {timestamp}") |
|
|
|
|
|
with tempfile.NamedTemporaryFile(suffix='.xlsx', delete=False) as current_temp: |
|
current_temp_path = current_temp.name |
|
with tempfile.NamedTemporaryFile(suffix='.xlsx', delete=False) as cumulative_temp: |
|
cumulative_temp_path = cumulative_temp.name |
|
|
|
current_filename = f"current_run_{timestamp}.xlsx" |
|
cumulative_filename = "all_business_cards_total.xlsx" |
|
|
|
|
|
logger.info("Checking for existing cumulative file in Google Drive") |
|
cumulative_df = current_df |
|
|
|
try: |
|
|
|
duplicates_removed = cleanup_duplicate_cumulative_files(drive_service) |
|
if duplicates_removed > 0: |
|
logger.info(f"Cleaned up {duplicates_removed} duplicate cumulative files") |
|
|
|
|
|
existing_file = get_existing_cumulative_file(drive_service) |
|
|
|
if existing_file: |
|
logger.info(f"Existing cumulative file found: {existing_file['name']} (ID: {existing_file['id']})") |
|
|
|
|
|
with tempfile.NamedTemporaryFile(suffix='.xlsx', delete=False) as existing_temp: |
|
existing_temp_path = existing_temp.name |
|
|
|
|
|
if download_file_from_drive(drive_service, existing_file['id'], existing_temp_path): |
|
logger.info("Successfully downloaded existing cumulative file") |
|
|
|
try: |
|
|
|
existing_df = pd.read_excel(existing_temp_path) |
|
logger.info(f"Loaded existing data: {len(existing_df)} rows") |
|
|
|
|
|
cumulative_df = pd.concat([existing_df, current_df], ignore_index=True) |
|
logger.info(f"Merged data: {len(existing_df)} existing + {len(current_df)} new = {len(cumulative_df)} total rows") |
|
|
|
|
|
delete_file_from_drive(drive_service, existing_file['id']) |
|
logger.info("Deleted old cumulative file from Google Drive") |
|
|
|
except Exception as e: |
|
logger.error(f"Failed to read existing Excel file: {e}") |
|
logger.info("Using current data only") |
|
cumulative_df = current_df |
|
finally: |
|
|
|
try: |
|
os.unlink(existing_temp_path) |
|
except: |
|
pass |
|
else: |
|
logger.warning("Failed to download existing cumulative file, using current data only") |
|
cumulative_df = current_df |
|
else: |
|
logger.info("No existing cumulative file found, using current data only") |
|
cumulative_df = current_df |
|
|
|
except Exception as e: |
|
logger.warning(f"Error handling existing cumulative data: {e}") |
|
logger.info("Using current data only") |
|
cumulative_df = current_df |
|
|
|
|
|
logger.info(f"Creating current run Excel file: {current_filename}") |
|
try: |
|
with pd.ExcelWriter(current_temp_path, engine='openpyxl') as writer: |
|
current_df.to_excel(writer, index=False, sheet_name='Current Run') |
|
logger.debug(f"Written {len(current_df)} rows to 'Current Run' sheet") |
|
|
|
|
|
logger.debug("Auto-adjusting column widths for current run file") |
|
worksheet = writer.sheets['Current Run'] |
|
for column in current_df: |
|
column_length = max(current_df[column].astype(str).map(len).max(), len(column)) |
|
col_idx = current_df.columns.get_loc(column) |
|
final_width = min(column_length + 2, 50) |
|
worksheet.column_dimensions[chr(65 + col_idx)].width = final_width |
|
|
|
logger.info(f"Current run Excel file created locally") |
|
|
|
|
|
current_result = upload_to_google_drive(current_temp_path, is_excel=True, filename=current_filename) |
|
if current_result: |
|
logger.info(f"Current run file uploaded to Google Drive: {current_result['webViewLink']}") |
|
|
|
except Exception as e: |
|
logger.error(f"Failed to create current run Excel file: {e}") |
|
raise |
|
|
|
|
|
logger.info(f"Creating cumulative Excel file: {cumulative_filename}") |
|
try: |
|
with pd.ExcelWriter(cumulative_temp_path, engine='openpyxl') as writer: |
|
cumulative_df.to_excel(writer, index=False, sheet_name='All Business Cards') |
|
logger.debug(f"Written {len(cumulative_df)} rows to 'All Business Cards' sheet") |
|
|
|
|
|
logger.debug("Auto-adjusting column widths for cumulative file") |
|
worksheet = writer.sheets['All Business Cards'] |
|
for column in cumulative_df: |
|
column_length = max(cumulative_df[column].astype(str).map(len).max(), len(column)) |
|
col_idx = cumulative_df.columns.get_loc(column) |
|
final_width = min(column_length + 2, 50) |
|
worksheet.column_dimensions[chr(65 + col_idx)].width = final_width |
|
|
|
logger.info(f"Cumulative Excel file created locally") |
|
|
|
|
|
cumulative_result = upload_to_google_drive(cumulative_temp_path, is_excel=True, filename=cumulative_filename) |
|
if cumulative_result: |
|
logger.info(f"Cumulative file uploaded to Google Drive: {cumulative_result['webViewLink']}") |
|
|
|
except Exception as e: |
|
logger.error(f"Failed to create cumulative Excel file: {e}") |
|
raise |

    logger.info("Creating summary message")
    num_batches = len(image_batches)
    summary = f"Successfully processed {len(all_data)} business card(s) in {num_batches} batch(es) of up to {batch_size} cards.\n"
    model_display = "Speed-Optimized model" if "flash" in model_name else "Accuracy-Optimized model"
    summary += f"🤖 AI Model used: {model_display}\n"
    summary += f"⚡ API calls made: {num_batches} (instead of {len(all_data)})\n"

    if save_images:
        num_uploaded = sum(1 for link in uploaded_image_links if link)
        summary += f"💾 Images uploaded to Google Drive: {num_uploaded} cards\n\n"
    else:
        summary += "💾 Images uploaded to Google Drive: No (save option was disabled)\n\n"

    summary += f"📄 Current run file: {current_filename} (uploaded to Google Drive)\n"
    summary += f"📊 Total cumulative file: {cumulative_filename} (uploaded to Google Drive)\n"
    summary += f"📈 Total cards in database: {len(cumulative_df)}\n"

    if duplicates_removed > 0:
        summary += f"🧹 Cleaned up {duplicates_removed} duplicate cumulative files\n"
    summary += "\n"

    summary += "🔗 Google Drive Links:\n"
    if current_result:
        summary += f"   📄 Current Run: {current_result['webViewLink']}\n"
    if cumulative_result:
        summary += f"   📊 Total Database: {cumulative_result['webViewLink']}\n"
    summary += "   📁 Exports Folder: https://drive.google.com/drive/folders/1k5iP4egzLrGJwnHkMhxt9bAkaCiieojO\n"
    summary += "   🖼️ Images Folder: https://drive.google.com/drive/folders/1gd280IqcAzpAFTPeYsZjoBUOU9S7Zx3c\n\n"

    if errors:
        logger.warning(f"Encountered {len(errors)} errors during processing")
        summary += "Errors encountered:\n" + "\n".join(errors)
        for error in errors:
            logger.warning(f"Processing error: {error}")
    else:
        logger.info("No errors encountered during processing")

    logger.debug("Creating preview DataFrame")
    preview_df = current_df.head(10)
    logger.debug(f"Preview contains {len(preview_df)} rows")

    logger.info("Business card processing session completed successfully")
    logger.info(f"Session summary - Cards: {len(all_data)}, Batches: {num_batches}, API calls: {num_batches}, Total DB size: {len(cumulative_df)}")

    return current_temp_path, cumulative_temp_path, summary, preview_df


logger.info("Creating Gradio interface")
with gr.Blocks(title="Business Card Data Extractor") as demo:
    gr.Markdown(
        """
        # Business Card Data Extractor

        Upload business card images to extract contact information and export to Excel.
        Cards are processed in batches of 5 for efficiency (fewer API calls, lower cost).

        **Two files are generated:**
        - 📄 **Current Run**: Contains only the cards you just processed
        - 📊 **Total Database**: Contains ALL cards ever processed (cumulative)

        **☁️ Google Drive Storage:**
        - 📊 Excel files: Automatically uploaded to Google Drive exports folder
        - 🖼️ Images: Uploaded to Google Drive images folder (if save option enabled)
        - 🔗 **Direct Links**: Access files directly through provided Google Drive links
        - 📁 **Organized Folders**: Separate folders for exports and images

        **📁 File Access:**
        - ⬇️ Download directly from interface buttons (temporary copies)
        - 🔗 Access permanent files via Google Drive links in results
        - 📁 **Exports Folder**: https://drive.google.com/drive/folders/1k5iP4egzLrGJwnHkMhxt9bAkaCiieojO
        - 🖼️ **Images Folder**: https://drive.google.com/drive/folders/1gd280IqcAzpAFTPeYsZjoBUOU9S7Zx3c

        **☁️ Google Drive Integration:**
        - Requires `GOOGLE_CLIENT_ID` and `GOOGLE_CLIENT_SECRET` environment variables
        - Files are automatically uploaded and organized in predefined folders
        """
    )

    with gr.Row():
        with gr.Column():
            supported_types = [".jpg", ".jpeg", ".png", ".webp", ".bmp"]
            if HEIF_SUPPORTED:
                supported_types.extend([".heif", ".heic"])

            image_input = gr.File(
                label="Upload Business Cards",
                file_count="multiple",
                file_types=supported_types
            )

            # Dropdown choices are (label, value) pairs; the value is the model name.
            model_selector = gr.Dropdown(
                choices=[("Accuracy-Optimized model", "gemini-2.5-pro"), ("Speed-Optimized model", "gemini-2.5-flash")],
                value="gemini-2.5-pro",
                label="AI Model Selection"
            )

            save_images_checkbox = gr.Checkbox(
                value=True,
                label="Save Business Card Images"
            )

            process_btn = gr.Button("Process Business Cards", variant="primary")

        with gr.Column():
            current_file = gr.File(label="📄 Download Current Run")
            total_file = gr.File(label="📊 Download Total Database")
            status_output = gr.Textbox(label="Processing Status", lines=5)

    preview_output = gr.Dataframe(label="Data Preview (Current Run)")

    def process_with_logging(images, model_name, save_images):
        """Wrapper that adds error handling and logging around the main processing function."""
        try:
            logger.info("Gradio interface initiated processing request")
            logger.debug(f"Request parameters - Images: {len(images) if images else 0}, Model: {model_name}, Save Images: {save_images}")
            return process_business_cards(images, model_name, save_images)
        except Exception as e:
            logger.error(f"Unexpected error in Gradio processing: {e}")
            error_msg = f"An unexpected error occurred: {str(e)}\nPlease check the logs for more details."
            return None, None, error_msg, None

    process_btn.click(
        fn=process_with_logging,
        inputs=[image_input, model_selector, save_images_checkbox],
        outputs=[current_file, total_file, status_output, preview_output]
    )

    gr.Markdown(
        """
        ## Features:
        - 🤖 **Model Selection**: Choose between the Speed-Optimized model (fast) and the Accuracy-Optimized model (accurate)
        - ⚡ **Batch Processing**: Processes 5 cards per API call for efficiency
        - 📋 **Data Extraction**: Names, emails, phone numbers, addresses, and more
        - 🔗 **Smart Combination**: Multiple emails/phones combined with commas
        - 🏠 **Field Merging**: Mobile/landline phones and street/address fields merged into single columns
        - ☁️ **Google Drive Storage**: Automatic upload to organized Drive folders
        - 🔗 **Direct Links**: Instant access to files via Google Drive URLs
        - 📊 **Dual Output**: Current run + cumulative database files
        - 📝 **Full Tracking**: Processing date, filename, Google Drive links, and AI model used
        - 🎯 **One Row Per Card**: Each business card becomes one spreadsheet row
        """
    )


logger.info("Starting Gradio demo")

hf_space_password = os.getenv("SPACE_PASSWORD")

if hf_space_password:
    logger.info("Launching with password protection enabled")
    demo.launch(auth=("user", hf_space_password))
else:
    logger.warning("SPACE_PASSWORD not set - launching without password protection")
    demo.launch()
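
# Illustrative local invocation (the script name is hypothetical; the env var
# names are the ones this module reads above):
#
#   Gemini_API=... GOOGLE_CLIENT_ID=... GOOGLE_CLIENT_SECRET=... \
#   SPACE_PASSWORD=... python app.py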