Spaces:
Runtime error
Runtime error
import os | |
import re | |
import sqlite3 | |
import time | |
import hashlib | |
import base64 | |
import json | |
from datetime import datetime, timedelta | |
from io import BytesIO | |
import pandas as pd | |
import plotly.express as px | |
import plotly.graph_objects as go | |
import gradio as gr | |
from dateutil.relativedelta import relativedelta | |
# Image processing imports | |
try: | |
from PIL import Image, ImageEnhance, ImageFilter | |
import cv2 | |
import numpy as np | |
PIL_AVAILABLE = True | |
except ImportError: | |
PIL_AVAILABLE = False | |
print("β οΈ PIL/OpenCV not installed. Run: pip install Pillow opencv-python") | |
# OCR imports | |
try: | |
import pytesseract | |
TESSERACT_AVAILABLE = True | |
except ImportError: | |
TESSERACT_AVAILABLE = False | |
print("β οΈ Pytesseract not installed. Run: pip install pytesseract") | |
# Google Vision API (optional) | |
try: | |
from google.cloud import vision | |
VISION_API_AVAILABLE = True | |
except ImportError: | |
VISION_API_AVAILABLE = False | |
print("β οΈ Google Vision API not available. Install with: pip install google-cloud-vision") | |
# Twilio Integration | |
try: | |
from twilio.rest import Client | |
TWILIO_AVAILABLE = True | |
except ImportError: | |
TWILIO_AVAILABLE = False | |
print("β οΈ Twilio not installed. Run: pip install twilio") | |
# Constants | |
EXPENSE_CATEGORIES = [ | |
"Housing (Rent/Mortgage)", | |
"Utilities (Electricity/Water)", | |
"Groceries", | |
"Dining Out", | |
"Transportation", | |
"Healthcare", | |
"Entertainment", | |
"Education", | |
"Personal Care", | |
"Debt Payments", | |
"Savings", | |
"Investments", | |
"Charity", | |
"Miscellaneous" | |
] | |
INVESTMENT_TYPES = [ | |
"Stocks", | |
"Bonds", | |
"Mutual Funds", | |
"Real Estate", | |
"Cryptocurrency", | |
"Retirement Accounts", | |
"Other" | |
] | |
RECURRENCE_PATTERNS = [ | |
"Daily", | |
"Weekly", | |
"Monthly", | |
"Quarterly", | |
"Yearly" | |
] | |
# Rate limiting setup | |
MAX_ATTEMPTS = 5 | |
ATTEMPT_WINDOW = 300 # 5 minutes in seconds | |
# Receipt processing constants | |
RECEIPTS_DIR = "receipts" | |
if not os.path.exists(RECEIPTS_DIR): | |
os.makedirs(RECEIPTS_DIR) | |
# Security functions | |
def hash_password(password): | |
"""Hash password using SHA-256 with salt""" | |
salt = "fingenius_secure_salt_2024" | |
return hashlib.sha256((password + salt).encode()).hexdigest() | |
def verify_password(password, hashed): | |
"""Verify password against hash""" | |
return hash_password(password) == hashed | |
# ========== A) IMAGE PROCESSING FUNCTIONS ========== | |
class ImageProcessor: | |
"""Handles image preprocessing for better OCR results""" | |
def process_receipt_image(image_file, phone): | |
""" | |
Complete receipt processing pipeline that handles Gradio file objects | |
Returns: (success, status_message, extracted_data, image_preview) | |
""" | |
try: | |
if not phone: | |
return False, "β Please sign in first", {}, None | |
if not image_file: | |
return False, "β No image uploaded", {}, None | |
# Debug input type | |
print(f"\nπ Input type: {type(image_file)}") | |
# Handle different input types | |
if isinstance(image_file, str): | |
# Case 1: Direct file path (local testing) | |
image_path = image_file | |
elif hasattr(image_file, 'name'): | |
# Case 2: Gradio NamedString object (Hugging Face Spaces) | |
image_path = image_file.name | |
else: | |
return False, "β Unsupported file input type", {}, None | |
# Create receipts directory if needed | |
os.makedirs(RECEIPTS_DIR, exist_ok=True) | |
# Generate unique filename | |
timestamp = int(time.time()) | |
filename = f"receipt_{phone}_{timestamp}{os.path.splitext(image_path)[1]}" | |
save_path = os.path.join(RECEIPTS_DIR, filename) | |
# Copy the uploaded file (works for both Gradio and direct paths) | |
with open(image_path, 'rb') as src, open(save_path, 'wb') as dst: | |
dst.write(src.read()) | |
print(f"π Saved receipt to: {save_path}") | |
# Preprocess image | |
processed_path, preprocessing_info = ImageProcessor.preprocess_receipt_image(save_path) | |
print(f"πΌοΈ Preprocessing: {preprocessing_info}") | |
# Extract text using OCR | |
raw_text, confidence, extracted_data = ocr_service.extract_text_from_receipt(processed_path) | |
print(f"π OCR Confidence: {confidence:.1%}") | |
# Auto-categorize | |
if extracted_data.get('merchant'): | |
suggested_category = db.auto_categorize_receipt( | |
phone, | |
extracted_data['merchant'], | |
extracted_data.get('total_amount', 0) | |
) | |
extracted_data['suggested_category'] = suggested_category | |
print(f"π·οΈ Suggested category: {suggested_category}") | |
# Prepare receipt data for database | |
receipt_data = { | |
'image_path': save_path, | |
'processed_image_path': processed_path, | |
'merchant': extracted_data.get('merchant', ''), | |
'amount': extracted_data.get('total_amount', 0.0), | |
'date': extracted_data.get('date', ''), | |
'category': extracted_data.get('suggested_category', 'Miscellaneous'), | |
'confidence': confidence, | |
'raw_text': raw_text, | |
'extracted_data': extracted_data, | |
'is_validated': False | |
} | |
# Save to database | |
receipt_id = db.save_receipt(phone, receipt_data) | |
extracted_data['receipt_id'] = receipt_id | |
print(f"πΎ Saved to DB with ID: {receipt_id}") | |
# Create image preview | |
try: | |
image_preview = Image.open(save_path) | |
image_preview.thumbnail((400, 600)) # Resize for display | |
except Exception as e: | |
print(f"β οΈ Preview generation failed: {e}") | |
image_preview = None | |
status_msg = f"β Receipt processed! Confidence: {confidence:.1%}" | |
if confidence < 0.7: | |
status_msg += " β οΈ Low confidence - please verify" | |
return True, status_msg, extracted_data, image_preview | |
except Exception as e: | |
print(f"β Processing error: {traceback.format_exc()}") | |
return False, f"β Processing failed: {str(e)}", {}, None | |
def extract_text_regions(image_path): | |
"""Extract text regions from receipt image""" | |
try: | |
if not PIL_AVAILABLE: | |
return [] | |
image = Image.open(image_path) | |
# This is a simplified version - in production, you'd use more advanced techniques | |
# to detect and extract specific regions (header, items, total, etc.) | |
return ["Full image processed"] | |
except Exception as e: | |
print(f"Text region extraction error: {e}") | |
return [] | |
# ========== B) OCR SERVICE CLASS ========== | |
class OCRService: | |
"""Handles OCR processing with multiple backends""" | |
def __init__(self): | |
self.tesseract_available = TESSERACT_AVAILABLE | |
self.vision_api_available = VISION_API_AVAILABLE and os.getenv('GOOGLE_APPLICATION_CREDENTIALS') | |
# Initialize Google Vision client if available | |
if self.vision_api_available: | |
try: | |
self.vision_client = vision.ImageAnnotatorClient() | |
except Exception as e: | |
print(f"Google Vision API initialization failed: {e}") | |
self.vision_api_available = False | |
def extract_text_from_receipt(self, image_path): | |
""" | |
Extract text from receipt using available OCR service | |
Returns: (raw_text, confidence_score, extracted_data) | |
""" | |
try: | |
# Try Google Vision API first if available | |
if self.vision_api_available: | |
return self._extract_with_vision_api(image_path) | |
# Fallback to Tesseract | |
elif self.tesseract_available: | |
return self._extract_with_tesseract(image_path) | |
else: | |
return "OCR not available", 0.0, self._create_empty_data() | |
except Exception as e: | |
print(f"OCR extraction error: {e}") | |
return f"OCR failed: {str(e)}", 0.0, self._create_empty_data() | |
def _extract_with_vision_api(self, image_path): | |
"""Extract text using Google Vision API""" | |
try: | |
with open(image_path, 'rb') as image_file: | |
content = image_file.read() | |
image = vision.Image(content=content) | |
response = self.vision_client.text_detection(image=image) | |
texts = response.text_annotations | |
if texts: | |
raw_text = texts[0].description | |
confidence = min([vertex.confidence for vertex in texts if hasattr(vertex, 'confidence')] or [0.8]) | |
extracted_data = self._parse_receipt_text(raw_text) | |
return raw_text, confidence, extracted_data | |
else: | |
return "No text detected", 0.0, self._create_empty_data() | |
except Exception as e: | |
print(f"Vision API error: {e}") | |
return f"Vision API failed: {str(e)}", 0.0, self._create_empty_data() | |
def _extract_with_tesseract(self, image_path): | |
"""Extract text using Tesseract OCR""" | |
try: | |
# Preprocess image first | |
processed_path, _ = ImageProcessor.preprocess_receipt_image(image_path) | |
# Extract text with Tesseract | |
raw_text = pytesseract.image_to_string( | |
Image.open(processed_path), | |
config='--oem 3 --psm 6' # OCR Engine Mode 3, Page Segmentation Mode 6 | |
) | |
# Get confidence data | |
data = pytesseract.image_to_data(Image.open(processed_path), output_type=pytesseract.Output.DICT) | |
confidences = [int(conf) for conf in data['conf'] if int(conf) > 0] | |
avg_confidence = sum(confidences) / len(confidences) if confidences else 0.0 | |
extracted_data = self._parse_receipt_text(raw_text) | |
return raw_text, avg_confidence / 100.0, extracted_data | |
except Exception as e: | |
print(f"Tesseract error: {e}") | |
return f"Tesseract failed: {str(e)}", 0.0, self._create_empty_data() | |
def _parse_receipt_text(self, raw_text): | |
"""Parse raw OCR text to extract structured data""" | |
extracted_data = self._create_empty_data() | |
lines = raw_text.split('\n') | |
# Extract merchant name (usually first non-empty line) | |
for line in lines: | |
if line.strip() and len(line.strip()) > 2: | |
extracted_data['merchant'] = line.strip() | |
break | |
# Extract date using regex patterns | |
date_patterns = [ | |
r'\d{1,2}[/-]\d{1,2}[/-]\d{2,4}', | |
r'\d{4}[/-]\d{1,2}[/-]\d{1,2}', | |
r'\d{1,2}\s+\w+\s+\d{4}' | |
] | |
for line in lines: | |
for pattern in date_patterns: | |
match = re.search(pattern, line) | |
if match: | |
extracted_data['date'] = match.group() | |
break | |
if extracted_data['date']: | |
break | |
# Extract total amount | |
amount_patterns = [ | |
r'total[:\s]*\$?(\d+\.?\d*)', | |
r'amount[:\s]*\$?(\d+\.?\d*)', | |
r'sum[:\s]*\$?(\d+\.?\d*)', | |
r'\$(\d+\.?\d*)' | |
] | |
for line in lines: | |
line_lower = line.lower() | |
for pattern in amount_patterns: | |
match = re.search(pattern, line_lower) | |
if match: | |
try: | |
amount = float(match.group(1)) | |
if amount > 0: | |
extracted_data['total_amount'] = amount | |
break | |
except ValueError: | |
continue | |
if extracted_data['total_amount']: | |
break | |
# Extract line items (simplified approach) | |
line_items = [] | |
for line in lines: | |
# Look for lines with item and price pattern | |
item_match = re.search(r'(.+?)\s+(\d+\.?\d*)', line) | |
if item_match and len(item_match.group(1)) > 2: | |
try: | |
item_name = item_match.group(1).strip() | |
item_price = float(item_match.group(2)) | |
if item_price > 0: | |
line_items.append([item_name, item_price]) | |
except ValueError: | |
continue | |
extracted_data['line_items'] = line_items[:10] # Limit to 10 items | |
return extracted_data | |
def _create_empty_data(self): | |
"""Create empty extracted data structure""" | |
return { | |
'merchant': '', | |
'date': '', | |
'total_amount': 0.0, | |
'line_items': [] | |
} | |
# ========== C) ENHANCED DATABASE SERVICE ========== | |
class DatabaseService: | |
def __init__(self, db_name='fin_genius.db'): | |
self.conn = sqlite3.connect(db_name, check_same_thread=False) | |
self.cursor = self.conn.cursor() | |
self._initialize_db() | |
def _initialize_db(self): | |
# Existing tables... | |
self.cursor.execute('''CREATE TABLE IF NOT EXISTS users | |
(phone TEXT PRIMARY KEY, | |
name TEXT, | |
password_hash TEXT, | |
monthly_income INTEGER DEFAULT 0, | |
savings_goal INTEGER DEFAULT 0, | |
current_balance INTEGER DEFAULT 0, | |
is_verified BOOLEAN DEFAULT FALSE, | |
family_group TEXT DEFAULT NULL, | |
last_balance_alert TIMESTAMP DEFAULT NULL, | |
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP)''') | |
self.cursor.execute('''CREATE TABLE IF NOT EXISTS expenses | |
(id INTEGER PRIMARY KEY AUTOINCREMENT, | |
phone TEXT, | |
category TEXT, | |
allocated INTEGER DEFAULT 0, | |
spent INTEGER DEFAULT 0, | |
date TIMESTAMP DEFAULT CURRENT_TIMESTAMP, | |
is_recurring BOOLEAN DEFAULT FALSE, | |
recurrence_pattern TEXT DEFAULT NULL, | |
next_occurrence TIMESTAMP DEFAULT NULL, | |
FOREIGN KEY(phone) REFERENCES users(phone))''') | |
self.cursor.execute('''CREATE TABLE IF NOT EXISTS spending_log | |
(id INTEGER PRIMARY KEY AUTOINCREMENT, | |
phone TEXT, | |
category TEXT, | |
amount INTEGER, | |
description TEXT, | |
date TIMESTAMP DEFAULT CURRENT_TIMESTAMP, | |
balance_after INTEGER, | |
receipt_id TEXT DEFAULT NULL, | |
FOREIGN KEY(phone) REFERENCES users(phone))''') | |
self.cursor.execute('''CREATE TABLE IF NOT EXISTS investments | |
(id INTEGER PRIMARY KEY AUTOINCREMENT, | |
phone TEXT, | |
type TEXT, | |
name TEXT, | |
amount INTEGER, | |
date TIMESTAMP DEFAULT CURRENT_TIMESTAMP, | |
notes TEXT, | |
FOREIGN KEY(phone) REFERENCES users(phone))''') | |
self.cursor.execute('''CREATE TABLE IF NOT EXISTS auth_attempts | |
(phone TEXT PRIMARY KEY, | |
attempts INTEGER DEFAULT 1, | |
last_attempt TIMESTAMP DEFAULT CURRENT_TIMESTAMP)''') | |
self.cursor.execute('''CREATE TABLE IF NOT EXISTS family_groups | |
(group_id TEXT PRIMARY KEY, | |
name TEXT, | |
admin_phone TEXT, | |
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP)''') | |
self.cursor.execute('''CREATE TABLE IF NOT EXISTS alerts | |
(id INTEGER PRIMARY KEY AUTOINCREMENT, | |
phone TEXT, | |
alert_type TEXT, | |
message TEXT, | |
sent_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, | |
FOREIGN KEY(phone) REFERENCES users(phone))''') | |
# NEW: Receipts table | |
self.cursor.execute('''CREATE TABLE IF NOT EXISTS receipts | |
(receipt_id TEXT PRIMARY KEY, | |
user_phone TEXT, | |
image_path TEXT, | |
processed_image_path TEXT, | |
merchant TEXT, | |
amount REAL, | |
receipt_date TEXT, | |
category TEXT, | |
ocr_confidence REAL, | |
raw_text TEXT, | |
extracted_data TEXT, | |
is_validated BOOLEAN DEFAULT FALSE, | |
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, | |
FOREIGN KEY(user_phone) REFERENCES users(phone))''') | |
self.conn.commit() | |
# Existing methods remain the same... | |
def get_user(self, phone): | |
self.cursor.execute('''SELECT name, monthly_income, savings_goal, family_group, current_balance | |
FROM users WHERE phone=?''', (phone,)) | |
return self.cursor.fetchone() | |
def authenticate_user(self, phone, password): | |
self.cursor.execute('''SELECT name, password_hash FROM users WHERE phone=?''', (phone,)) | |
result = self.cursor.fetchone() | |
if result and verify_password(password, result[1]): | |
return result[0] | |
return None | |
def create_user(self, phone, name, password): | |
try: | |
password_hash = hash_password(password) | |
self.cursor.execute('''INSERT INTO users (phone, name, password_hash, current_balance) VALUES (?, ?, ?, ?)''', | |
(phone, name, password_hash, 0)) | |
self.conn.commit() | |
return True | |
except sqlite3.IntegrityError: | |
return False | |
def update_user_balance(self, phone, new_balance): | |
self.cursor.execute('''UPDATE users SET current_balance=? WHERE phone=?''', | |
(new_balance, phone)) | |
self.conn.commit() | |
def get_current_balance(self, phone): | |
self.cursor.execute('''SELECT current_balance FROM users WHERE phone=?''', (phone,)) | |
result = self.cursor.fetchone() | |
return result[0] if result else 0 | |
def add_income(self, phone, amount, description="Income added"): | |
current_balance = self.get_current_balance(phone) | |
new_balance = current_balance + amount | |
self.cursor.execute('''INSERT INTO spending_log | |
(phone, category, amount, description, balance_after) | |
VALUES (?, ?, ?, ?, ?)''', | |
(phone, "Income", -amount, description, new_balance)) | |
self.update_user_balance(phone, new_balance) | |
self.conn.commit() | |
return new_balance | |
def update_financials(self, phone, income, savings): | |
self.cursor.execute('''UPDATE users | |
SET monthly_income=?, savings_goal=? | |
WHERE phone=?''', | |
(income, savings, phone)) | |
self.conn.commit() | |
def get_expenses(self, phone, months_back=3): | |
end_date = datetime.now() | |
start_date = end_date - relativedelta(months=months_back) | |
self.cursor.execute('''SELECT category, allocated, spent, date(date) as exp_date, is_recurring | |
FROM expenses | |
WHERE phone=? AND date BETWEEN ? AND ? | |
ORDER BY allocated DESC''', | |
(phone, start_date.strftime('%Y-%m-%d'), end_date.strftime('%Y-%m-%d'))) | |
return self.cursor.fetchall() | |
def update_expense_allocations(self, phone, allocations): | |
self.cursor.execute('''DELETE FROM expenses WHERE phone=? AND allocated > 0 AND is_recurring=FALSE''', (phone,)) | |
for category, alloc in zip(EXPENSE_CATEGORIES, allocations): | |
if alloc > 0: | |
self.cursor.execute('''INSERT INTO expenses | |
(phone, category, allocated) | |
VALUES (?, ?, ?)''', | |
(phone, category, alloc)) | |
self.conn.commit() | |
def log_spending(self, phone, category, amount, description="", receipt_id=None): | |
current_balance = self.get_current_balance(phone) | |
new_balance = current_balance - amount | |
self.cursor.execute('''INSERT INTO spending_log | |
(phone, category, amount, description, balance_after, receipt_id) | |
VALUES (?, ?, ?, ?, ?, ?)''', | |
(phone, category, amount, description, new_balance, receipt_id)) | |
self.update_user_balance(phone, new_balance) | |
self.conn.commit() | |
return new_balance | |
def record_expense(self, phone, category, amount, description="", is_recurring=False, recurrence_pattern=None, receipt_id=None): | |
new_balance = self.log_spending(phone, category, amount, description, receipt_id) | |
self.cursor.execute('''SELECT allocated, spent | |
FROM expenses | |
WHERE phone=? AND category=? AND is_recurring=FALSE''', | |
(phone, category)) | |
result = self.cursor.fetchone() | |
if is_recurring: | |
next_occurrence = self._calculate_next_occurrence(datetime.now(), recurrence_pattern) | |
self.cursor.execute('''INSERT INTO expenses | |
(phone, category, spent, is_recurring, recurrence_pattern, next_occurrence) | |
VALUES (?, ?, ?, ?, ?, ?)''', | |
(phone, category, amount, True, recurrence_pattern, next_occurrence)) | |
elif result: | |
alloc, spent = result | |
new_spent = spent + amount | |
self.cursor.execute('''UPDATE expenses | |
SET spent=? | |
WHERE phone=? AND category=? AND is_recurring=FALSE''', | |
(new_spent, phone, category)) | |
else: | |
self.cursor.execute('''INSERT INTO expenses | |
(phone, category, spent) | |
VALUES (?, ?, ?)''', | |
(phone, category, amount)) | |
self.conn.commit() | |
return True, new_balance | |
def _calculate_next_occurrence(self, current_date, pattern): | |
if pattern == "Daily": | |
return current_date + timedelta(days=1) | |
elif pattern == "Weekly": | |
return current_date + timedelta(weeks=1) | |
elif pattern == "Monthly": | |
return current_date + relativedelta(months=1) | |
elif pattern == "Quarterly": | |
return current_date + relativedelta(months=3) | |
elif pattern == "Yearly": | |
return current_date + relativedelta(years=1) | |
return current_date | |
def record_investment(self, phone, inv_type, name, amount, notes): | |
self.cursor.execute('''INSERT INTO investments | |
(phone, type, name, amount, notes) | |
VALUES (?, ?, ?, ?, ?)''', | |
(phone, inv_type, name, amount, notes)) | |
self.conn.commit() | |
return True | |
def get_investments(self, phone): | |
self.cursor.execute('''SELECT type, name, amount, date(date) as inv_date, notes | |
FROM investments | |
WHERE phone=? | |
ORDER BY date DESC''', (phone,)) | |
return self.cursor.fetchall() | |
def get_spending_log(self, phone, limit=50): | |
self.cursor.execute('''SELECT category, amount, description, date, balance_after | |
FROM spending_log | |
WHERE phone=? | |
ORDER BY date DESC | |
LIMIT ?''', (phone, limit)) | |
return self.cursor.fetchall() | |
def create_family_group(self, group_name, admin_phone): | |
group_id = f"FG-{admin_phone[-4:]}-{int(time.time())}" | |
try: | |
self.cursor.execute('''INSERT INTO family_groups | |
(group_id, name, admin_phone) | |
VALUES (?, ?, ?)''', | |
(group_id, group_name, admin_phone)) | |
self.cursor.execute('''UPDATE users | |
SET family_group=? | |
WHERE phone=?''', | |
(group_id, admin_phone)) | |
self.conn.commit() | |
return group_id | |
except sqlite3.IntegrityError: | |
return None | |
def join_family_group(self, phone, group_id): | |
self.cursor.execute('''UPDATE users | |
SET family_group=? | |
WHERE phone=?''', | |
(group_id, phone)) | |
self.conn.commit() | |
return True | |
def get_family_group(self, group_id): | |
self.cursor.execute('''SELECT name, admin_phone FROM family_groups WHERE group_id=?''', (group_id,)) | |
return self.cursor.fetchone() | |
def get_family_members(self, group_id): | |
self.cursor.execute('''SELECT phone, name FROM users WHERE family_group=?''', (group_id,)) | |
return self.cursor.fetchall() | |
# NEW: Receipt-related methods | |
def save_receipt(self, phone, receipt_data): | |
"""Save receipt data to database""" | |
receipt_id = f"REC-{phone[-4:]}-{int(time.time())}" | |
try: | |
self.cursor.execute('''INSERT INTO receipts | |
(receipt_id, user_phone, image_path, processed_image_path, | |
merchant, amount, receipt_date, category, ocr_confidence, | |
raw_text, extracted_data, is_validated) | |
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)''', | |
(receipt_id, phone, receipt_data.get('image_path', ''), | |
receipt_data.get('processed_image_path', ''), | |
receipt_data.get('merchant', ''), | |
receipt_data.get('amount', 0.0), | |
receipt_data.get('date', ''), | |
receipt_data.get('category', ''), | |
receipt_data.get('confidence', 0.0), | |
receipt_data.get('raw_text', ''), | |
json.dumps(receipt_data.get('extracted_data', {})), | |
receipt_data.get('is_validated', False))) | |
self.conn.commit() | |
return receipt_id | |
except sqlite3.Error as e: | |
print(f"Database error saving receipt: {e}") | |
return None | |
def get_receipts(self, phone, limit=20): | |
"""Get user's receipts""" | |
self.cursor.execute('''SELECT receipt_id, merchant, amount, receipt_date, category, | |
ocr_confidence, is_validated, created_at | |
FROM receipts | |
WHERE user_phone=? | |
ORDER BY created_at DESC | |
LIMIT ?''', (phone, limit)) | |
return self.cursor.fetchall() | |
def update_receipt(self, receipt_id, updates): | |
"""Update receipt information""" | |
set_clause = ", ".join([f"{key}=?" for key in updates.keys()]) | |
values = list(updates.values()) + [receipt_id] | |
self.cursor.execute(f'''UPDATE receipts SET {set_clause} WHERE receipt_id=?''', values) | |
self.conn.commit() | |
def auto_categorize_receipt(self, phone, merchant, amount): | |
"""Auto-categorize based on user's spending patterns""" | |
# Get user's most common category for similar merchants | |
self.cursor.execute('''SELECT category, COUNT(*) as count | |
FROM spending_log | |
WHERE phone=? AND (description LIKE ? OR description LIKE ?) | |
GROUP BY category | |
ORDER BY count DESC | |
LIMIT 1''', | |
(phone, f'%{merchant}%', f'%{merchant.split()[0]}%')) | |
result = self.cursor.fetchone() | |
if result: | |
return result[0] | |
# Fallback categorization based on merchant keywords | |
merchant_lower = merchant.lower() | |
if any(word in merchant_lower for word in ['grocery', 'market', 'food', 'super']): | |
return "Groceries" | |
elif any(word in merchant_lower for word in ['restaurant', 'cafe', 'pizza', 'burger']): | |
return "Dining Out" | |
elif any(word in merchant_lower for word in ['gas', 'fuel', 'shell', 'bp']): | |
return "Transportation" | |
elif any(word in merchant_lower for word in ['pharmacy', 'medical', 'hospital']): | |
return "Healthcare" | |
else: | |
return "Miscellaneous" | |
# Fixed Twilio WhatsApp Service | |
class TwilioWhatsAppService: | |
def __init__(self): | |
# Set your Twilio credentials here | |
self.account_sid = os.getenv('TWILIO_ACCOUNT_SID', 'your_account_sid_here') | |
self.auth_token = os.getenv('TWILIO_AUTH_TOKEN', 'your_auth_token_here') | |
# For Twilio Sandbox - use this number | |
self.whatsapp_number = 'whatsapp:+14155238886' | |
# For production Twilio (after approval) - use your approved number | |
# self.whatsapp_number = 'whatsapp:+1234567890' # Your approved WhatsApp Business number | |
print(f"π§ Twilio Configuration:") | |
print(f" Account SID: {self.account_sid[:10]}... (masked)") | |
print(f" Auth Token: {'*' * len(self.auth_token) if self.auth_token != 'your_auth_token_here' else 'Not set'}") | |
print(f" WhatsApp Number: {self.whatsapp_number}") | |
if self.account_sid != 'your_account_sid_here' and self.auth_token != 'your_auth_token_here' and TWILIO_AVAILABLE: | |
try: | |
self.client = Client(self.account_sid, self.auth_token) | |
# Test the connection by getting account info | |
account = self.client.api.accounts(self.account_sid).fetch() | |
print(f"β Twilio WhatsApp Service initialized successfully") | |
print(f" Account Status: {account.status}") | |
print(f" Account Name: {account.friendly_name}") | |
self.enabled = True | |
except Exception as e: | |
print(f"β Failed to initialize Twilio: {e}") | |
print(f" Please check your Account SID and Auth Token") | |
self.client = None | |
self.enabled = False | |
else: | |
print("β Twilio credentials not configured or Twilio not installed") | |
print(" Please set TWILIO_ACCOUNT_SID and TWILIO_AUTH_TOKEN environment variables") | |
print(" Or modify the credentials directly in the code") | |
self.client = None | |
self.enabled = False | |
def send_whatsapp(self, phone, message): | |
"""Send WhatsApp message with proper error handling""" | |
if not self.enabled or not self.client: | |
print(f"π± [DEMO MODE] WhatsApp to {phone}: {message}") | |
return False | |
try: | |
# Ensure phone number has correct format | |
if not phone.startswith('+'): | |
phone = '+' + phone | |
to_whatsapp = f"whatsapp:{phone}" | |
print(f"π€ Attempting to send WhatsApp message:") | |
print(f" From: {self.whatsapp_number}") | |
print(f" To: {to_whatsapp}") | |
print(f" Message: {message[:50]}...") | |
twilio_message = self.client.messages.create( | |
body=message, | |
from_=self.whatsapp_number, | |
to=to_whatsapp | |
) | |
print(f"β WhatsApp sent successfully!") | |
print(f" Message SID: {twilio_message.sid}") | |
print(f" Status: {twilio_message.status}") | |
return True | |
except Exception as e: | |
print(f"β Failed to send WhatsApp to {phone}") | |
print(f" Error: {str(e)}") | |
print(f" Error Type: {type(e).__name__}") | |
# Common error messages and solutions | |
if "not a valid phone number" in str(e).lower(): | |
print(f" π‘ Solution: Check phone number format. Should be +country_code + number") | |
elif "unverified" in str(e).lower(): | |
print(f" π‘ Solution: For Twilio Sandbox, recipient must send 'join catch-manner' to +14155238886 first") | |
elif "forbidden" in str(e).lower(): | |
print(f" π‘ Solution: Check your Twilio credentials and account status") | |
elif "unauthorized" in str(e).lower(): | |
print(f" π‘ Solution: Verify your Account SID and Auth Token are correct") | |
print(f"π± [FALLBACK] Message content: {message}") | |
return False | |
# Initialize services | |
db = DatabaseService() | |
twilio = TwilioWhatsAppService() | |
ocr_service = OCRService() | |
# Helper functions (unchanged) | |
def validate_phone_number(phone): | |
pattern = r'^\+\d{1,3}\d{6,14}$' # Added missing closing quote and $ | |
return re.match(pattern, phone) is not None | |
def validate_password(password): | |
if len(password) < 6: | |
return False, "Password must be at least 6 characters long" | |
if not re.search(r'[A-Za-z]', password): | |
return False, "Password must contain at least one letter" | |
if not re.search(r'\d', password): | |
return False, "Password must contain at least one number" | |
return True, "Password is valid" | |
def format_currency(amount): | |
return f"{int(amount):,} PKR" if amount else "0 PKR" | |
def generate_spending_chart(phone, months=3): | |
expenses = db.get_expenses(phone, months) | |
if not expenses: | |
return None | |
df = pd.DataFrame(expenses, columns=['Category', 'Allocated', 'Spent', 'Date', 'IsRecurring']) | |
df['Date'] = pd.to_datetime(df['Date']) | |
df['Month'] = df['Date'].dt.strftime('%Y-%m') | |
monthly_data = df.groupby(['Month', 'Category'])['Spent'].sum().unstack().fillna(0) | |
fig = go.Figure() | |
colors = px.colors.qualitative.Set3 | |
for i, category in enumerate(monthly_data.columns): | |
fig.add_trace(go.Bar( | |
x=monthly_data.index, | |
y=monthly_data[category], | |
name=category, | |
marker_color=colors[i % len(colors)], | |
hoverinfo='y+name', | |
textposition='auto' | |
)) | |
fig.update_layout( | |
barmode='stack', | |
title=f'π Spending Trends (Last {months} Months)', | |
xaxis_title='Month', | |
yaxis_title='Amount (PKR)', | |
height=500, | |
plot_bgcolor='rgba(0,0,0,0)', | |
paper_bgcolor='rgba(0,0,0,0)' | |
) | |
return fig | |
def generate_balance_chart(phone): | |
spending_log = db.get_spending_log(phone, 100) | |
if not spending_log: | |
return None | |
df = pd.DataFrame(spending_log, columns=['Category', 'Amount', 'Description', 'Date', 'Balance']) | |
df['Date'] = pd.to_datetime(df['Date']) | |
df = df.sort_values('Date') | |
fig = go.Figure() | |
fig.add_trace(go.Scatter( | |
x=df['Date'], | |
y=df['Balance'], | |
mode='lines+markers', | |
name='Balance', | |
line=dict(color='#00CC96', width=3), | |
marker=dict(size=6), | |
hovertemplate='<b>Date:</b> %{x}<br><b>Balance:</b> %{y:,} PKR<extra></extra>' | |
)) | |
fig.update_layout( | |
title='π° Balance Trend Over Time', | |
xaxis_title='Date', | |
yaxis_title='Balance (PKR)', | |
height=400, | |
plot_bgcolor='rgba(0,0,0,0)', | |
paper_bgcolor='rgba(0,0,0,0)' | |
) | |
return fig | |
# ========== D) RECEIPT PROCESSING FUNCTIONS ========== | |
def process_receipt_image(image_file, phone): | |
""" | |
Complete receipt processing pipeline | |
Returns: (success, status_message, extracted_data, image_preview) | |
""" | |
try: | |
if not image_file: | |
return False, "β No image uploaded", {}, None | |
# Save uploaded image | |
timestamp = int(time.time()) | |
filename = f"receipt_{phone}_{timestamp}.jpg" | |
image_path = os.path.join(RECEIPTS_DIR, filename) | |
# Handle different input types | |
if hasattr(image_file, 'name'): | |
# File upload | |
with open(image_path, 'wb') as f: | |
f.write(image_file.read()) | |
else: | |
# Direct file path | |
image_path = image_file | |
# Preprocess image | |
processed_path, preprocessing_info = ImageProcessor.preprocess_receipt_image(image_path) | |
# Extract text using OCR | |
raw_text, confidence, extracted_data = ocr_service.extract_text_from_receipt(processed_path) | |
# Auto-categorize | |
if extracted_data.get('merchant'): | |
suggested_category = db.auto_categorize_receipt( | |
phone, | |
extracted_data['merchant'], | |
extracted_data.get('total_amount', 0) | |
) | |
extracted_data['suggested_category'] = suggested_category | |
# Prepare receipt data for database | |
receipt_data = { | |
'image_path': image_path, | |
'processed_image_path': processed_path, | |
'merchant': extracted_data.get('merchant', ''), | |
'amount': extracted_data.get('total_amount', 0.0), | |
'date': extracted_data.get('date', ''), | |
'category': extracted_data.get('suggested_category', 'Miscellaneous'), | |
'confidence': confidence, | |
'raw_text': raw_text, | |
'extracted_data': extracted_data, | |
'is_validated': False | |
} | |
# Save to database | |
receipt_id = db.save_receipt(phone, receipt_data) | |
extracted_data['receipt_id'] = receipt_id | |
status_msg = f"β Receipt processed successfully! Confidence: {confidence:.1%}" | |
if confidence < 0.7: | |
status_msg += " β οΈ Low confidence - please verify extracted data" | |
return True, status_msg, extracted_data, image_path | |
except Exception as e: | |
print(f"Receipt processing error: {e}") | |
return False, f"β Processing failed: {str(e)}", {}, None | |
def validate_and_save_receipt(phone, receipt_id, merchant, amount, date, category, line_items_data): | |
""" | |
Validate edited receipt data and save as expense | |
""" | |
try: | |
if not phone or not receipt_id: | |
return "β Session expired. Please sign in again.", "", [], [] | |
if not merchant.strip(): | |
return "β Merchant name is required", "", [], [] | |
if amount <= 0: | |
return "β Amount must be positive", "", [], [] | |
# Check balance | |
current_balance = db.get_current_balance(phone) | |
if current_balance < amount: | |
return "β Insufficient balance for this expense", "", [], [] | |
# Update receipt in database | |
receipt_updates = { | |
'merchant': merchant, | |
'amount': amount, | |
'receipt_date': date, | |
'category': category, | |
'is_validated': True | |
} | |
db.update_receipt(receipt_id, receipt_updates) | |
# Record as expense | |
description = f"Receipt: {merchant}" | |
if date: | |
description += f" ({date})" | |
success, new_balance = db.record_expense( | |
phone, category, amount, description, receipt_id=receipt_id | |
) | |
if not success: | |
return "β Failed to record expense", "", [], [] | |
# Send WhatsApp confirmation | |
user_data = db.get_user(phone) | |
name = user_data[0] if user_data else "User" | |
msg = f"π§Ύ Receipt Expense - Hi {name}! Merchant: {merchant}, Amount: {format_currency(amount)}, Category: {category}, Remaining Balance: {format_currency(new_balance)}" | |
twilio.send_whatsapp(phone, msg) | |
# Get updated data for UI | |
expenses = db.get_expenses(phone) | |
formatted_expenses = [] | |
if expenses: | |
for cat, alloc, spent, date, _ in expenses: | |
formatted_expenses.append([ | |
cat, alloc, spent, alloc - spent, date.split()[0] if date else "" | |
]) | |
spending_log = db.get_spending_log(phone, 10) | |
formatted_spending_log = [] | |
if spending_log: | |
for cat, amt, desc, date, balance_after in spending_log: | |
formatted_spending_log.append([ | |
cat, amt, desc[:50] + "..." if len(desc) > 50 else desc, | |
date.split()[0] if date else "", balance_after | |
]) | |
status_msg = f"β Receipt saved! Recorded {format_currency(amount)} for {category}" | |
balance_html = f"<div class='balance-amount'>π° {format_currency(new_balance)}</div>" | |
return status_msg, balance_html, formatted_expenses, formatted_spending_log | |
except Exception as e: | |
print(f"Receipt validation error: {e}") | |
return f"β Error saving receipt: {str(e)}", "", [], [] | |
# ========== PAGE NAVIGATION FUNCTIONS ========== | |
def show_signin(): | |
return [ | |
gr.update(visible=False), # landing_page | |
gr.update(visible=True), # signin_page | |
gr.update(visible=False), # signup_page | |
gr.update(visible=False), # dashboard_page | |
"", # Clear signin inputs | |
"" | |
] | |
def show_signup(): | |
return [ | |
gr.update(visible=False), # landing_page | |
gr.update(visible=False), # signin_page | |
gr.update(visible=True), # signup_page | |
gr.update(visible=False), # dashboard_page | |
"", # Clear signup inputs | |
"", | |
"", | |
"" | |
] | |
def show_dashboard(phone, name): | |
user_data = db.get_user(phone) | |
current_balance = user_data[4] if user_data else 0 | |
monthly_income = user_data[1] if user_data else 0 | |
savings_goal = user_data[2] if user_data else 0 | |
# Get expense data | |
expenses = db.get_expenses(phone) | |
formatted_expenses = [] | |
if expenses: | |
for cat, alloc, spent, date, _ in expenses: | |
formatted_expenses.append([ | |
cat, alloc, spent, alloc - spent, date.split()[0] if date else "" | |
]) | |
# Get investment data | |
investments = db.get_investments(phone) | |
formatted_investments = [] | |
if investments: | |
for inv_type, name, amount, date, notes in investments: | |
formatted_investments.append([ | |
inv_type, name, amount, date.split()[0] if date else "", notes or "" | |
]) | |
# Get spending log | |
spending_log = db.get_spending_log(phone, 10) | |
formatted_spending_log = [] | |
if spending_log: | |
for category, amount, description, date, balance_after in spending_log: | |
formatted_spending_log.append([ | |
category, amount, description[:50] + "..." if len(description) > 50 else description, | |
date.split()[0] if date else "", balance_after | |
]) | |
# Get family info | |
family_info = "No family group" | |
family_members = [] | |
if user_data and user_data[3]: | |
group_data = db.get_family_group(user_data[3]) | |
if group_data: | |
family_info = f"Family Group: {group_data[0]} (Admin: {group_data[1]})" | |
members = db.get_family_members(user_data[3]) | |
family_members = [[m[0], m[1]] for m in members] | |
# Get receipt data | |
receipts = db.get_receipts(phone) | |
formatted_receipts = [] | |
if receipts: | |
for receipt_id, merchant, amount, date, category, confidence, is_validated, created_at in receipts: | |
status = "β Validated" if is_validated else "β³ Pending" | |
formatted_receipts.append([ | |
receipt_id, merchant or "Unknown", format_currency(amount), | |
date or "N/A", category or "N/A", f"{confidence:.1%}", | |
status, created_at.split()[0] if created_at else "" | |
]) | |
# Prepare allocation inputs | |
alloc_inputs = [] | |
if expenses: | |
alloc_dict = {cat: alloc for cat, alloc, _, _, _ in expenses} | |
alloc_inputs = [alloc_dict.get(cat, 0) for cat in EXPENSE_CATEGORIES] | |
else: | |
alloc_inputs = [0] * len(EXPENSE_CATEGORIES) | |
return [ | |
gr.update(visible=False), # landing_page | |
gr.update(visible=False), # signin_page | |
gr.update(visible=False), # signup_page | |
gr.update(visible=True), # dashboard_page | |
f"Welcome back, {name}! π", # welcome message | |
f"<div class='balance-amount'>π° {format_currency(current_balance)}</div>", # balance display | |
monthly_income, # income | |
savings_goal, # savings_goal | |
*alloc_inputs, # allocation inputs | |
formatted_expenses, # expense_table | |
formatted_investments, # investments_table | |
formatted_spending_log, # spending_log_table | |
generate_spending_chart(phone), # spending_chart | |
generate_balance_chart(phone), # balance_chart | |
family_info, # family_info | |
family_members, # family_members | |
formatted_receipts # receipts_table | |
] | |
def return_to_landing(): | |
return [ | |
gr.update(visible=True), # landing_page | |
gr.update(visible=False), # signin_page | |
gr.update(visible=False), # signup_page | |
gr.update(visible=False), # dashboard_page | |
"", # Clear welcome | |
"<div class='balance-amount'>π° 0 PKR</div>" # Clear balance | |
] | |
# ========== AUTHENTICATION FUNCTIONS ========== | |
def authenticate_user(phone, password): | |
if not phone or not password: | |
return "β Please fill all fields" | |
if not validate_phone_number(phone): | |
return "β Invalid phone format. Use +92XXXXXXXXXX" | |
user_name = db.authenticate_user(phone, password) | |
if not user_name: | |
return "β Invalid phone number or password." | |
return f"β Signed in as {user_name}" | |
def register_user(name, phone, password, confirm_password): | |
if not name or not phone or not password or not confirm_password: | |
return "β Please fill all fields" | |
if not validate_phone_number(phone): | |
return "β Invalid phone format. Use +92XXXXXXXXXX" | |
if password != confirm_password: | |
return "β Passwords don't match" | |
is_valid, password_msg = validate_password(password) | |
if not is_valid: | |
return f"β {password_msg}" | |
success = db.create_user(phone, name, password) | |
if not success: | |
return "β οΈ This number is already registered" | |
msg = f"π¦ Welcome to FinGenius Pro, {name}! Your account has been created successfully. You can now track expenses, manage budgets, and receive instant financial alerts. Start by adding your first balance! π°" | |
whatsapp_sent = twilio.send_whatsapp(phone, msg) | |
if whatsapp_sent: | |
return "β Registration complete! Check WhatsApp for confirmation and sign in to continue." | |
else: | |
return "β Registration complete! WhatsApp alerts are not configured, but you can still use all features. Sign in to continue." | |
def add_balance(phone, amount_val, description=""): | |
if not phone: | |
return "β Session expired. Please sign in again.", "" | |
if amount_val <= 0: | |
return "β Amount must be positive", "" | |
new_balance = db.add_income(phone, amount_val, description or "Balance added") | |
user_data = db.get_user(phone) | |
if user_data: | |
name = user_data[0] | |
msg = f"π° Balance Added - Hi {name}! Added: {format_currency(amount_val)}. New Balance: {format_currency(new_balance)}. Description: {description or 'Balance update'}" | |
twilio.send_whatsapp(phone, msg) | |
return f"β Added {format_currency(amount_val)} to balance!", f"<div class='balance-amount'>π° {format_currency(new_balance)}</div>" | |
def update_financials(phone, income_val, savings_val): | |
if not phone: | |
return "β Session expired. Please sign in again." | |
if income_val < 0 or savings_val < 0: | |
return "β Values cannot be negative" | |
db.update_financials(phone, income_val, savings_val) | |
user_data = db.get_user(phone) | |
if user_data: | |
name = user_data[0] | |
msg = f"π Financial Goals Updated - Hi {name}! Monthly Income: {format_currency(income_val)}, Savings Goal: {format_currency(savings_val)}. Your budget planning is now ready! π―" | |
twilio.send_whatsapp(phone, msg) | |
return f"β Updated! Monthly Income: {format_currency(income_val)}, Savings Goal: {format_currency(savings_val)}" | |
def save_allocations(phone, *allocations): | |
if not phone: | |
return "β Session expired. Please sign in again.", [] | |
if any(alloc < 0 for alloc in allocations): | |
return "β Allocations cannot be negative", [] | |
total_alloc = sum(allocations) | |
user_data = db.get_user(phone) | |
if not user_data: | |
return "β User not found", [] | |
if total_alloc + user_data[2] > user_data[1]: | |
return "β Total allocations exceed available income!", [] | |
db.update_expense_allocations(phone, allocations) | |
name = user_data[0] | |
msg = f"π Budget Allocated - Hi {name}! Your monthly budget has been set. Total allocated: {format_currency(total_alloc)}. Start tracking your expenses now! π³" | |
twilio.send_whatsapp(phone, msg) | |
expenses = db.get_expenses(phone) | |
formatted_expenses = [] | |
if expenses: | |
for cat, alloc, spent, date, _ in expenses: | |
formatted_expenses.append([ | |
cat, alloc, spent, alloc - spent, date.split()[0] if date else "" | |
]) | |
return "β Budget allocations saved!", formatted_expenses | |
def record_expense(phone, category, amount, description="", is_recurring=False, recurrence_pattern=None): | |
if not phone: | |
return "β Session expired. Please sign in again.", "", [], [] | |
if amount <= 0: | |
return "β Amount must be positive", "", [], [] | |
current_balance = db.get_current_balance(phone) | |
if current_balance < amount: | |
return "β Insufficient balance for this expense", "", [], [] | |
success, new_balance = db.record_expense(phone, category, amount, description, is_recurring, recurrence_pattern) | |
if not success: | |
return "β Failed to record expense", "", [], [] | |
user_data = db.get_user(phone) | |
name = user_data[0] if user_data else "User" | |
msg = f"πΈ Expense Recorded - Hi {name}! Category: {category}, Amount: {format_currency(amount)}, Remaining Balance: {format_currency(new_balance)}" | |
if description: | |
msg += f", Note: {description}" | |
if is_recurring: | |
msg += f" (Recurring: {recurrence_pattern})" | |
twilio.send_whatsapp(phone, msg) | |
expenses = db.get_expenses(phone) | |
formatted_expenses = [] | |
if expenses: | |
for cat, alloc, spent, date, _ in expenses: | |
formatted_expenses.append([ | |
cat, alloc, spent, alloc - spent, date.split()[0] if date else "" | |
]) | |
spending_log = db.get_spending_log(phone, 10) | |
formatted_spending_log = [] | |
if spending_log: | |
for cat, amt, desc, date, balance_after in spending_log: | |
formatted_spending_log.append([ | |
cat, amt, desc[:50] + "..." if len(desc) > 50 else desc, | |
date.split()[0] if date else "", balance_after | |
]) | |
status_msg = f"β Recorded {format_currency(amount)} for {category}" | |
balance_html = f"<div class='balance-amount'>π° {format_currency(new_balance)}</div>" | |
return status_msg, balance_html, formatted_expenses, formatted_spending_log | |
def add_investment(phone, inv_type, name, amount, notes): | |
if not phone: | |
return "β Session expired. Please sign in again.", "", [] | |
if amount <= 0: | |
return "β Amount must be positive", "", [] | |
current_balance = db.get_current_balance(phone) | |
if current_balance < amount: | |
return "β Insufficient balance for investment", "", [] | |
new_balance = db.log_spending(phone, "Investment", amount, f"Investment: {name}") | |
db.record_investment(phone, inv_type, name, amount, notes) | |
user_data = db.get_user(phone) | |
if user_data: | |
user_name = user_data[0] | |
msg = f"π Investment Added - Hi {user_name}! Type: {inv_type}, Name: {name}, Amount: {format_currency(amount)}, Remaining Balance: {format_currency(new_balance)}" | |
if notes: | |
msg += f", Notes: {notes}" | |
twilio.send_whatsapp(phone, msg) | |
investments = db.get_investments(phone) | |
formatted_investments = [] | |
if investments: | |
for inv_type, name, amount, date, notes in investments: | |
formatted_investments.append([ | |
inv_type, name, amount, date.split()[0] if date else "", notes or "" | |
]) | |
balance_html = f"<div class='balance-amount'>π° {format_currency(new_balance)}</div>" | |
return f"β Added investment: {name} ({format_currency(amount)})", balance_html, formatted_investments | |
def create_family_group(phone, group_name): | |
if not phone or not group_name: | |
return "β Group name required", "", [] | |
group_id = db.create_family_group(group_name, phone) | |
if not group_id: | |
return "β Failed to create group", "", [] | |
user_data = db.get_user(phone) | |
if user_data: | |
name = user_data[0] | |
msg = f"πͺ Family Group Created - Hi {name}! You've created '{group_name}' (ID: {group_id}). Share this ID with family members to join. Manage finances together! π " | |
twilio.send_whatsapp(phone, msg) | |
return f"β Created group: {group_name} (ID: {group_id})", f"Family Group: {group_name} (Admin: {phone})", [[phone, db.get_user(phone)[0] if db.get_user(phone) else "You"]] | |
def join_family_group(phone, group_id): | |
if not phone or not group_id: | |
return "β Group ID required", "", [] | |
success = db.join_family_group(phone, group_id) | |
if not success: | |
return "β Failed to join group", "", [] | |
group_data = db.get_family_group(group_id) | |
if not group_data: | |
return "β Group not found", "", [] | |
user_data = db.get_user(phone) | |
if user_data: | |
name = user_data[0] | |
msg = f"πͺ Joined Family Group - Hi {name}! You've joined '{group_data[0]}'. Start collaborating on family finances together! π€" | |
twilio.send_whatsapp(phone, msg) | |
members = db.get_family_members(group_id) | |
member_list = [[m[0], m[1]] for m in members] | |
return f"β Joined group: {group_data[0]}", f"Family Group: {group_data[0]} (Admin: {group_data[1]})", member_list | |
# ========== E) RECEIPT PROCESSING EVENT HANDLERS ========== | |
def handle_receipt_upload(image_file, phone): | |
"""Handle receipt image upload and processing""" | |
if not phone: | |
return "β Please sign in first", {}, "", "", "", [], None, "" | |
if not image_file: | |
return "β Please upload an image", {}, "", "", "", [], None, "" | |
# Process the receipt | |
success, status, extracted_data, image_path = process_receipt_image(image_file, phone) | |
if not success: | |
return status, {}, "", "", "", [], None, "" | |
# Prepare UI updates | |
merchant = extracted_data.get('merchant', '') | |
amount = extracted_data.get('total_amount', 0.0) | |
date = extracted_data.get('date', '') | |
category = extracted_data.get('suggested_category', 'Miscellaneous') | |
line_items = extracted_data.get('line_items', []) | |
# Create image preview | |
try: | |
image_preview = Image.open(image_path) | |
# Resize for preview | |
image_preview.thumbnail((400, 600)) | |
except: | |
image_preview = None | |
return ( | |
status, | |
{"receipt_id": extracted_data.get('receipt_id', ''), "confidence": extracted_data.get('confidence', 0.0)}, | |
merchant, | |
amount, | |
date, | |
line_items, | |
image_preview, | |
category | |
) | |
def handle_receipt_save(phone, receipt_data, merchant, amount, date, category, line_items_data): | |
"""Save validated receipt as expense""" | |
if not phone or not receipt_data: | |
return "β No receipt data to save", "", [], [] | |
receipt_id = receipt_data.get('receipt_id') | |
if not receipt_id: | |
return "β Invalid receipt data", "", [], [] | |
return validate_and_save_receipt(phone, receipt_id, merchant, amount, date, category, line_items_data) | |
# ========== F) INTERFACE ========== | |
custom_css = """ | |
/* Fixed CSS for proper page transitions */ | |
.gradio-container { | |
max-width: 1200px !important; | |
margin: 0 auto !important; | |
} | |
.landing-hero { | |
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); | |
min-height: 80vh; | |
padding: 3rem 2rem; | |
color: white; | |
text-align: center; | |
border-radius: 20px; | |
margin: 2rem 0; | |
} | |
.hero-title { | |
font-size: 3.5rem; | |
font-weight: 700; | |
margin-bottom: 1rem; | |
text-shadow: 2px 2px 4px rgba(0,0,0,0.3); | |
} | |
.hero-subtitle { | |
font-size: 1.5rem; | |
margin-bottom: 2rem; | |
opacity: 0.9; | |
} | |
.features-grid { | |
display: grid; | |
grid-template-columns: repeat(auto-fit, minmax(280px, 1fr)); | |
gap: 2rem; | |
margin: 3rem 0; | |
} | |
.feature-card { | |
background: rgba(255,255,255,0.1); | |
backdrop-filter: blur(10px); | |
border-radius: 15px; | |
padding: 2rem; | |
text-align: center; | |
border: 1px solid rgba(255,255,255,0.2); | |
transition: transform 0.3s ease; | |
} | |
.feature-card:hover { | |
transform: translateY(-5px); | |
} | |
.feature-icon { | |
font-size: 3rem; | |
margin-bottom: 1rem; | |
} | |
.auth-container { | |
max-width: 450px; | |
margin: 2rem auto; | |
background: white; | |
border-radius: 20px; | |
padding: 3rem; | |
box-shadow: 0 20px 40px rgba(0,0,0,0.1); | |
border: 1px solid #e2e8f0; | |
} | |
.whatsapp-setup { | |
background: linear-gradient(135deg, #25D366 0%, #128C7E 100%); | |
color: white; | |
padding: 2rem; | |
border-radius: 15px; | |
margin: 2rem 0; | |
text-align: center; | |
box-shadow: 0 10px 25px rgba(37, 211, 102, 0.3); | |
} | |
.whatsapp-steps { | |
background: rgba(255, 255, 255, 0.1); | |
backdrop-filter: blur(10px); | |
border-radius: 10px; | |
padding: 1.5rem; | |
margin: 1rem 0; | |
border: 1px solid rgba(255, 255, 255, 0.2); | |
} | |
.phone-highlight { | |
background: rgba(255, 255, 255, 0.2); | |
padding: 0.5rem 1rem; | |
border-radius: 8px; | |
font-family: monospace; | |
font-size: 1.1rem; | |
font-weight: bold; | |
display: inline-block; | |
margin: 0.5rem 0; | |
} | |
.code-highlight { | |
background: rgba(255, 255, 255, 0.15); | |
padding: 0.5rem 1rem; | |
border-radius: 8px; | |
font-family: monospace; | |
font-size: 1rem; | |
font-weight: bold; | |
display: inline-block; | |
margin: 0.5rem 0; | |
border-left: 3px solid #fff; | |
} | |
.dashboard-header { | |
background: linear-gradient(135deg, #2d3748 0%, #4a5568 100%); | |
color: white; | |
padding: 2rem; | |
border-radius: 15px; | |
margin-bottom: 2rem; | |
text-align: center; | |
font-size: 1.5rem; | |
} | |
.balance-card { | |
background: linear-gradient(135deg, #48bb78 0%, #38a169 100%); | |
color: white; | |
padding: 2rem; | |
border-radius: 15px; | |
text-align: center; | |
margin-bottom: 2rem; | |
box-shadow: 0 10px 25px rgba(72, 187, 120, 0.3); | |
} | |
.balance-amount { | |
font-size: 2.5rem; | |
font-weight: bold; | |
margin: 1rem 0; | |
} | |
.receipt-upload-area { | |
border: 2px dashed #cbd5e0; | |
border-radius: 15px; | |
padding: 2rem; | |
text-align: center; | |
background: #f7fafc; | |
transition: all 0.3s ease; | |
} | |
.receipt-upload-area:hover { | |
border-color: #4299e1; | |
background: #ebf8ff; | |
} | |
.receipt-preview { | |
max-width: 100%; | |
max-height: 400px; | |
border-radius: 10px; | |
box-shadow: 0 4px 15px rgba(0,0,0,0.1); | |
} | |
.low-confidence { | |
background-color: #fff3cd !important; | |
border: 1px solid #ffc107 !important; | |
} | |
.high-confidence { | |
background-color: #d4edda !important; | |
border: 1px solid #28a745 !important; | |
} | |
/* Button styling */ | |
.primary-btn { | |
background: linear-gradient(45deg, #ff6b6b, #ee5a24) !important; | |
border: none !important; | |
border-radius: 25px !important; | |
padding: 1rem 2rem !important; | |
font-size: 1.1rem !important; | |
font-weight: 600 !important; | |
color: white !important; | |
transition: all 0.3s ease !important; | |
box-shadow: 0 4px 15px rgba(238, 90, 36, 0.4) !important; | |
} | |
.secondary-btn { | |
background: linear-gradient(45deg, #74b9ff, #0984e3) !important; | |
border: none !important; | |
border-radius: 25px !important; | |
padding: 1rem 2rem !important; | |
font-size: 1.1rem !important; | |
font-weight: 600 !important; | |
color: white !important; | |
transition: all 0.3s ease !important; | |
box-shadow: 0 4px 15px rgba(116, 185, 255, 0.4) !important; | |
} | |
/* Tab styling */ | |
.tab-nav { | |
background: #f8fafc; | |
border-radius: 10px; | |
padding: 0.5rem; | |
margin-bottom: 2rem; | |
} | |
/* Hide elements properly */ | |
.hide { | |
display: none !important; | |
} | |
/* Ensure proper spacing */ | |
.gradio-row { | |
margin: 1rem 0; | |
} | |
.gradio-column { | |
padding: 0 1rem; | |
} | |
/* Custom button hover effects */ | |
button:hover { | |
transform: translateY(-2px); | |
box-shadow: 0 6px 20px rgba(0,0,0,0.15); | |
} | |
/* Status message styling */ | |
.status-success { | |
color: #38a169; | |
font-weight: 600; | |
} | |
.status-error { | |
color: #e53e3e; | |
font-weight: 600; | |
} | |
/* Table styling */ | |
.dataframe { | |
border-radius: 10px; | |
overflow: hidden; | |
box-shadow: 0 4px 15px rgba(0,0,0,0.1); | |
} | |
""" | |
with gr.Blocks(title="FinGenius Pro", theme=gr.themes.Soft(), css=custom_css) as demo: | |
# State to track current user | |
current_user = gr.State() | |
receipt_data = gr.State({}) | |
# ===== LANDING PAGE ===== | |
with gr.Column(visible=True) as landing_page: | |
gr.HTML(""" | |
<div class="landing-hero"> | |
<div class="hero-title">π¦ FinGenius Pro</div> | |
<div class="hero-subtitle">Your Complete Personal Finance Manager with Smart AI Alerts</div> | |
<div class="features-grid"> | |
<div class="feature-card"> | |
<div class="feature-icon">π°</div> | |
<h3>Smart Balance Tracking</h3> | |
<p>Real-time balance monitoring with intelligent spending alerts</p> | |
</div> | |
<div class="feature-card"> | |
<div class="feature-icon">π±</div> | |
<h3>WhatsApp Integration</h3> | |
<p>Get instant notifications for every expense and budget alert</p> | |
</div> | |
<div class="feature-card"> | |
<div class="feature-icon">π</div> | |
<h3>Advanced Analytics</h3> | |
<p>Beautiful charts and insights to track your spending patterns</p> | |
</div> | |
<div class="feature-card"> | |
<div class="feature-icon">π§Ύ</div> | |
<h3>Receipt Scanning</h3> | |
<p>AI-powered OCR to automatically extract expense data from receipts</p> | |
</div> | |
<div class="feature-card"> | |
<div class="feature-icon">πͺ</div> | |
<h3>Family Finance</h3> | |
<p>Create family groups to manage household finances together</p> | |
</div> | |
<div class="feature-card"> | |
<div class="feature-icon">π</div> | |
<h3>Secure & Private</h3> | |
<p>Password-protected accounts with encrypted data storage</p> | |
</div> | |
</div> | |
</div> | |
""") | |
with gr.Row(): | |
with gr.Column(scale=1): | |
signin_btn = gr.Button("π Sign In", variant="primary", elem_classes="primary-btn", size="lg") | |
with gr.Column(scale=1): | |
signup_btn = gr.Button("β¨ Create Account", variant="secondary", elem_classes="secondary-btn", size="lg") | |
# ===== SIGN IN PAGE ===== | |
with gr.Column(visible=False) as signin_page: | |
with gr.Column(elem_classes="auth-container"): | |
gr.HTML("<h2 style='text-align: center; color: #2d3748; margin-bottom: 2rem;'>π Welcome Back</h2>") | |
signin_phone = gr.Textbox( | |
label="π± WhatsApp Number", | |
placeholder="+92XXXXXXXXXX", | |
info="Enter your registered WhatsApp number" | |
) | |
signin_password = gr.Textbox( | |
label="π Password", | |
type="password", | |
placeholder="Enter your secure password" | |
) | |
with gr.Row(): | |
submit_signin = gr.Button("Sign In", variant="primary", elem_classes="primary-btn", scale=2) | |
back_to_landing_1 = gr.Button("β Back", variant="secondary", scale=1) | |
signin_status = gr.Textbox(label="Status", interactive=False) | |
# ===== SIGN UP PAGE ===== | |
with gr.Column(visible=False) as signup_page: | |
with gr.Column(elem_classes="auth-container"): | |
gr.HTML("<h2 style='text-align: center; color: #2d3748; margin-bottom: 2rem;'>β¨ Create Your Account</h2>") | |
signup_name = gr.Textbox( | |
label="π€ Full Name", | |
placeholder="Enter your full name" | |
) | |
signup_phone = gr.Textbox( | |
label="π± WhatsApp Number", | |
placeholder="+92XXXXXXXXXX", | |
info="This will be used for notifications" | |
) | |
signup_password = gr.Textbox( | |
label="π Create Password", | |
type="password", | |
placeholder="Minimum 6 characters with letters and numbers" | |
) | |
signup_confirm_password = gr.Textbox( | |
label="π Confirm Password", | |
type="password", | |
placeholder="Re-enter your password" | |
) | |
# WhatsApp Setup Instructions | |
gr.HTML(""" | |
<div class='whatsapp-setup'> | |
<h3>π± Enable WhatsApp Alerts</h3> | |
<p style='font-size: 1.1rem; margin-bottom: 1.5rem;'>To receive instant notifications for your financial activities, follow these steps:</p> | |
<div class='whatsapp-steps'> | |
<h4>Step 1: Save the Bot Number</h4> | |
<p>Add this Twilio WhatsApp Sandbox number to your contacts:</p> | |
<div class='phone-highlight'>+1 (415) 523-8886</div> | |
</div> | |
<div class='whatsapp-steps'> | |
<h4>Step 2: Send Activation Code</h4> | |
<p>Send this exact message to the number above:</p> | |
<div class='code-highlight'>join catch-manner</div> | |
<p style='font-size: 0.9rem; opacity: 0.8; margin-top: 0.5rem;'> | |
β οΈ <strong>Important:</strong> You must send this exact code to activate the sandbox. | |
</p> | |
</div> | |
<div class='whatsapp-steps'> | |
<h4>Step 3: Confirm Registration</h4> | |
<p>After sending the code, register your FinGenius account with the <strong>same phone number</strong> you used to message the bot.</p> | |
</div> | |
<div class='whatsapp-steps'> | |
<h4>Step 4: Start Receiving Alerts</h4> | |
<p>You'll receive instant WhatsApp notifications for:</p> | |
<ul style='text-align: left; margin-left: 1rem; opacity: 0.9;'> | |
<li>β Account registration confirmation</li> | |
<li>π° Balance updates</li> | |
<li>πΈ Expense notifications</li> | |
<li>π§Ύ Receipt processing confirmations</li> | |
<li>π Investment tracking</li> | |
<li>π¨ Budget alerts</li> | |
</ul> | |
</div> | |
</div> | |
""") | |
with gr.Row(): | |
submit_signup = gr.Button("Complete Registration", variant="primary", elem_classes="primary-btn", scale=2) | |
back_to_landing_2 = gr.Button("β Back", variant="secondary", scale=1) | |
signup_status = gr.Textbox(label="Status", interactive=False) | |
# ===== DASHBOARD PAGE ===== | |
with gr.Column(visible=False) as dashboard_page: | |
# Dashboard Header | |
welcome_message = gr.HTML("", elem_classes="dashboard-header") | |
# Current Balance Display | |
with gr.Column(elem_classes="balance-card"): | |
balance_display = gr.HTML("<div class='balance-amount'>π° 0 PKR</div>") | |
with gr.Row(): | |
with gr.Column(scale=2): | |
balance_amount = gr.Number(label="π° Add to Balance (PKR)", minimum=1, step=100, value=0) | |
balance_description = gr.Textbox(label="Description", placeholder="Salary, gift, bonus, etc.") | |
with gr.Column(scale=1): | |
add_balance_btn = gr.Button("Add Balance", variant="primary", elem_classes="primary-btn") | |
balance_status = gr.Textbox(label="Balance Status", interactive=False) | |
with gr.Tabs(elem_classes="tab-nav"): | |
# Dashboard Overview Tab | |
with gr.Tab("π Dashboard Overview"): | |
gr.HTML(""" | |
<div style="text-align: center; padding: 3rem; background: linear-gradient(135deg, #f093fb 0%, #f5576c 100%); border-radius: 15px; color: white; margin: 2rem 0;"> | |
<h2>π Welcome to FinGenius Pro!</h2> | |
<p style="font-size: 1.2rem; opacity: 0.9;">Your personal finance management just got smarter. Start by adding some balance and setting up your budget allocations.</p> | |
</div> | |
""") | |
with gr.Row(): | |
gr.HTML(""" | |
<div style="background: #e6fffa; padding: 2rem; border-radius: 15px; border-left: 4px solid #38b2ac;"> | |
<h3>π Quick Start Guide:</h3> | |
<ol style="text-align: left; margin-left: 1rem;"> | |
<li><strong>Add Balance:</strong> Use the balance card above to add your initial funds</li> | |
<li><strong>Set Income & Goals:</strong> Go to Income & Goals tab to set your monthly income and savings target</li> | |
<li><strong>Plan Budget:</strong> Use Budget Planner to allocate money to different expense categories</li> | |
<li><strong>Track Expenses:</strong> Log your daily expenses in the Expense Tracker</li> | |
<li><strong>Scan Receipts:</strong> Use Receipt Scan to automatically extract expense data from photos</li> | |
<li><strong>Monitor Investments:</strong> Keep track of your investment portfolio</li> | |
</ol> | |
</div> | |
""") | |
# Income & Goals Tab | |
with gr.Tab("π₯ Income & Goals"): | |
gr.HTML("<h3>π΅ Set Your Financial Goals</h3>") | |
with gr.Row(): | |
income = gr.Number(label="π΅ Monthly Income (PKR)", minimum=0, step=1000, value=0) | |
savings_goal = gr.Number(label="π― Savings Goal (PKR)", minimum=0, step=1000, value=0) | |
update_btn = gr.Button("πΎ Update Financial Info", variant="primary", elem_classes="primary-btn") | |
income_status = gr.Textbox(label="Status", interactive=False) | |
# Budget Planner Tab | |
with gr.Tab("π Budget Planner"): | |
gr.HTML("<h3>πΌ Allocate Your Monthly Budget</h3>") | |
with gr.Column(): | |
allocation_inputs = [] | |
with gr.Row(): | |
for i, category in enumerate(EXPENSE_CATEGORIES[:7]): | |
alloc = gr.Number(label=f"π·οΈ {category}", minimum=0, step=100, value=0) | |
allocation_inputs.append(alloc) | |
with gr.Row(): | |
for i, category in enumerate(EXPENSE_CATEGORIES[7:]): | |
alloc = gr.Number(label=f"π·οΈ {category}", minimum=0, step=100, value=0) | |
allocation_inputs.append(alloc) | |
allocate_btn = gr.Button("πΎ Save Budget Allocations", variant="primary", elem_classes="primary-btn", size="lg") | |
allocation_status = gr.Textbox(label="Status", interactive=False) | |
gr.HTML("<h4>π Current Budget Allocations</h4>") | |
expense_table = gr.Dataframe( | |
headers=["Category", "Allocated", "Spent", "Remaining", "Date"], | |
interactive=False, | |
wrap=True | |
) | |
# Receipt Scan Tab - NEW! | |
with gr.Tab("π· Receipt Scan"): | |
gr.HTML(""" | |
<div style="background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); color: white; padding: 2rem; border-radius: 15px; margin-bottom: 2rem; text-align: center;"> | |
<h2>π§Ύ AI-Powered Receipt Scanner</h2> | |
<p style="font-size: 1.1rem; opacity: 0.9;">Upload receipt photos and let AI extract expense data automatically!</p> | |
</div> | |
""") | |
with gr.Row(): | |
with gr.Column(scale=1): | |
gr.HTML("<h4>π€ Upload Receipt</h4>") | |
receipt_image = gr.File( | |
label="π· Receipt Image", | |
file_types=["image"], | |
elem_classes="receipt-upload-area" | |
) | |
process_receipt_btn = gr.Button( | |
"π Process Receipt", | |
variant="primary", | |
elem_classes="primary-btn", | |
size="lg" | |
) | |
receipt_status = gr.Textbox(label="Processing Status", interactive=False) | |
# Image Preview | |
gr.HTML("<h4>πΈ Receipt Preview</h4>") | |
receipt_preview = gr.Image( | |
label="Receipt Preview", | |
type="pil", | |
elem_classes="receipt-preview" | |
) | |
with gr.Column(scale=1): | |
gr.HTML("<h4>βοΈ Verify & Edit Extracted Data</h4>") | |
extracted_merchant = gr.Textbox( | |
label="πͺ Merchant Name", | |
placeholder="Store/Restaurant name", | |
info="Edit if incorrectly detected" | |
) | |
with gr.Row(): | |
extracted_amount = gr.Number( | |
label="π° Total Amount (PKR)", | |
minimum=0, | |
step=0.01, | |
value=0 | |
) | |
extracted_date = gr.Textbox( | |
label="π Date", | |
placeholder="YYYY-MM-DD or DD/MM/YYYY" | |
) | |
extracted_category = gr.Dropdown( | |
choices=EXPENSE_CATEGORIES, | |
label="π·οΈ Category", | |
value="Miscellaneous", | |
info="AI-suggested category (you can change it)" | |
) | |
gr.HTML("<h4>π Line Items (Optional)</h4>") | |
line_items_table = gr.Dataframe( | |
headers=["Item", "Price"], | |
datatype=["str", "number"], | |
row_count=5, | |
col_count=2, | |
interactive=True, | |
label="Receipt Items" | |
) | |
save_receipt_btn = gr.Button( | |
"πΎ Save as Expense", | |
variant="primary", | |
elem_classes="primary-btn", | |
size="lg" | |
) | |
# Receipt History | |
gr.HTML("<h4>π§Ύ Recent Receipts</h4>") | |
receipts_table = gr.Dataframe( | |
headers=["Receipt ID", "Merchant", "Amount", "Date", "Category", "Confidence", "Status", "Processed"], | |
interactive=False, | |
wrap=True | |
) | |
# Expense Tracker Tab | |
with gr.Tab("πΈ Expense Tracker"): | |
with gr.Row(): | |
with gr.Column(): | |
gr.HTML("<h4>β Log New Expense</h4>") | |
expense_category = gr.Dropdown(choices=EXPENSE_CATEGORIES, label="π·οΈ Category") | |
expense_amount = gr.Number(label="π° Amount (PKR)", minimum=1, step=100, value=0) | |
expense_description = gr.Textbox(label="π Description", placeholder="What did you buy?") | |
with gr.Accordion("π Recurring Expense Settings", open=False): | |
is_recurring = gr.Checkbox(label="This is a recurring expense") | |
recurrence_pattern = gr.Dropdown(choices=RECURRENCE_PATTERNS, label="Frequency") | |
record_expense_btn = gr.Button("πΈ Record Expense", variant="primary", elem_classes="primary-btn", size="lg") | |
expense_status = gr.Textbox(label="Status", interactive=False) | |
with gr.Column(): | |
gr.HTML("<h4>π Spending Analytics</h4>") | |
spending_chart = gr.Plot(label="π Spending Analysis") | |
balance_chart = gr.Plot(label="π° Balance Trend") | |
with gr.Row(): | |
months_history = gr.Slider(1, 12, value=3, step=1, label="π Months History") | |
update_charts_btn = gr.Button("π Update Analytics", variant="secondary") | |
# Spending History Tab | |
with gr.Tab("π Spending History"): | |
gr.HTML("<h3>π³ Recent Transaction History</h3>") | |
spending_log_table = gr.Dataframe( | |
headers=["Category", "Amount", "Description", "Date", "Balance After"], | |
interactive=False, | |
wrap=True | |
) | |
# Investment Portfolio Tab | |
with gr.Tab("π Investment Portfolio"): | |
with gr.Row(): | |
with gr.Column(): | |
gr.HTML("<h4>β Add New Investment</h4>") | |
investment_type = gr.Dropdown(choices=INVESTMENT_TYPES, label="π’ Investment Type") | |
investment_name = gr.Textbox(label="π Name/Description") | |
investment_amount = gr.Number(label="π° Amount (PKR)", minimum=1, step=1000, value=0) | |
investment_notes = gr.Textbox(label="π Notes", lines=2, placeholder="Additional details...") | |
add_investment_btn = gr.Button("π Add Investment", variant="primary", elem_classes="primary-btn") | |
investment_status = gr.Textbox(label="Status", interactive=False) | |
with gr.Column(): | |
gr.HTML("<h4>πΌ Your Investment Portfolio</h4>") | |
investments_table = gr.Dataframe( | |
headers=["Type", "Name", "Amount", "Date", "Notes"], | |
interactive=False, | |
wrap=True | |
) | |
# Family Finance Tab | |
with gr.Tab("πͺ Family Finance"): | |
gr.HTML("<h3>π¨βπ©βπ§βπ¦ Family Financial Management</h3>") | |
family_info = gr.Textbox(label="π₯ Current Family Group", interactive=False) | |
with gr.Row(): | |
with gr.Column(): | |
gr.HTML("<h4>β Create New Family Group</h4>") | |
create_group_name = gr.Textbox(label="πͺ Group Name", placeholder="Smith Family Budget") | |
create_group_btn = gr.Button("Create Family Group", variant="primary", elem_classes="primary-btn") | |
with gr.Column(): | |
gr.HTML("<h4>π Join Existing Group</h4>") | |
join_group_id = gr.Textbox(label="π Group ID", placeholder="FG-XXXX-XXXXXXXX") | |
join_group_btn = gr.Button("Join Family Group", variant="secondary", elem_classes="secondary-btn") | |
family_status = gr.Textbox(label="Status", interactive=False) | |
gr.HTML("<h4>π₯ Family Members</h4>") | |
family_members = gr.Dataframe( | |
headers=["Phone", "Name"], | |
interactive=False, | |
wrap=True | |
) | |
with gr.Row(): | |
with gr.Column(scale=3): | |
pass | |
with gr.Column(scale=1): | |
sign_out_btn = gr.Button("πͺ Sign Out", variant="stop", elem_classes="secondary-btn", size="lg") | |
# ===== EVENT HANDLERS ===== | |
# Navigation | |
signin_btn.click( | |
show_signin, | |
outputs=[landing_page, signin_page, signup_page, dashboard_page, signin_phone, signin_password] | |
) | |
signup_btn.click( | |
show_signup, | |
outputs=[landing_page, signin_page, signup_page, dashboard_page, signup_name, signup_phone, signup_password, signup_confirm_password] | |
) | |
back_to_landing_1.click( | |
return_to_landing, | |
outputs=[landing_page, signin_page, signup_page, dashboard_page, welcome_message, balance_display] | |
) | |
back_to_landing_2.click( | |
return_to_landing, | |
outputs=[landing_page, signin_page, signup_page, dashboard_page, welcome_message, balance_display] | |
) | |
sign_out_btn.click( | |
return_to_landing, | |
outputs=[landing_page, signin_page, signup_page, dashboard_page, welcome_message, balance_display] | |
) | |
# Authentication | |
def handle_signin(phone, password): | |
status = authenticate_user(phone, password) | |
if "β " in status: | |
user_name = status.split("as ")[1] | |
current_user.value = phone | |
pages = show_dashboard(phone, user_name) | |
return [status] + pages | |
else: | |
empty_alloc = [0] * len(EXPENSE_CATEGORIES) | |
return [status, gr.update(), gr.update(), gr.update(), gr.update(), "", "<div class='balance-amount'>π° 0 PKR</div>", 0, 0] + empty_alloc + [[], [], [], None, None, "No family group", [], []] | |
submit_signin.click( | |
handle_signin, | |
inputs=[signin_phone, signin_password], | |
outputs=[signin_status, landing_page, signin_page, signup_page, dashboard_page, welcome_message, balance_display, income, savings_goal] + allocation_inputs + [expense_table, investments_table, spending_log_table, spending_chart, balance_chart, family_info, family_members, receipts_table] | |
) | |
def handle_signup(name, phone, password, confirm_password): | |
status = register_user(name, phone, password, confirm_password) | |
return status | |
submit_signup.click( | |
handle_signup, | |
inputs=[signup_name, signup_phone, signup_password, signup_confirm_password], | |
outputs=[signup_status] | |
) | |
# Balance Management | |
def handle_add_balance(amount_val, description): | |
if current_user.value: | |
status, balance_html = add_balance(current_user.value, amount_val, description) | |
return status, balance_html | |
else: | |
return "β Please sign in first", "<div class='balance-amount'>π° 0 PKR</div>" | |
add_balance_btn.click( | |
handle_add_balance, | |
inputs=[balance_amount, balance_description], | |
outputs=[balance_status, balance_display] | |
) | |
# Financial Operations | |
def handle_update_financials(income_val, savings_val): | |
if current_user.value: | |
return update_financials(current_user.value, income_val, savings_val) | |
else: | |
return "β Please sign in first" | |
update_btn.click( | |
handle_update_financials, | |
inputs=[income, savings_goal], | |
outputs=[income_status] | |
) | |
def handle_save_allocations(*allocations): | |
if current_user.value: | |
return save_allocations(current_user.value, *allocations) | |
else: | |
return "β Please sign in first", [] | |
allocate_btn.click( | |
handle_save_allocations, | |
inputs=allocation_inputs, | |
outputs=[allocation_status, expense_table] | |
) | |
def handle_record_expense(category, amount, description, is_recurring, recurrence_pattern): | |
if current_user.value: | |
return record_expense(current_user.value, category, amount, description, is_recurring, recurrence_pattern) | |
else: | |
return "β Please sign in first", "<div class='balance-amount'>π° 0 PKR</div>", [], [] | |
record_expense_btn.click( | |
handle_record_expense, | |
inputs=[expense_category, expense_amount, expense_description, is_recurring, recurrence_pattern], | |
outputs=[expense_status, balance_display, expense_table, spending_log_table] | |
) | |
def handle_add_investment(inv_type, name, amount, notes): | |
if current_user.value: | |
return add_investment(current_user.value, inv_type, name, amount, notes) | |
else: | |
return "β Please sign in first", "<div class='balance-amount'>π° 0 PKR</div>", [] | |
add_investment_btn.click( | |
handle_add_investment, | |
inputs=[investment_type, investment_name, investment_amount, investment_notes], | |
outputs=[investment_status, balance_display, investments_table] | |
) | |
def handle_create_family_group(group_name): | |
if current_user.value: | |
return create_family_group(current_user.value, group_name) | |
else: | |
return "β Please sign in first", "", [] | |
create_group_btn.click( | |
handle_create_family_group, | |
inputs=[create_group_name], | |
outputs=[family_status, family_info, family_members] | |
) | |
def handle_join_family_group(group_id): | |
if current_user.value: | |
return join_family_group(current_user.value, group_id) | |
else: | |
return "β Please sign in first", "", [] | |
join_group_btn.click( | |
handle_join_family_group, | |
inputs=[join_group_id], | |
outputs=[family_status, family_info, family_members] | |
) | |
def handle_update_charts(months_history): | |
if current_user.value: | |
return generate_spending_chart(current_user.value, months_history), generate_balance_chart(current_user.value) | |
else: | |
return None, None | |
update_charts_btn.click( | |
handle_update_charts, | |
inputs=[months_history], | |
outputs=[spending_chart, balance_chart] | |
) | |
# Receipt Processing Event Handlers - NEW! | |
process_receipt_btn.click( | |
handle_receipt_upload, | |
inputs=[receipt_image, current_user], | |
outputs=[receipt_status, receipt_data, extracted_merchant, extracted_amount, extracted_date, line_items_table, receipt_preview, extracted_category] | |
) | |
save_receipt_btn.click( | |
handle_receipt_save, | |
inputs=[current_user, receipt_data, extracted_merchant, extracted_amount, extracted_date, extracted_category, line_items_table], | |
outputs=[receipt_status, balance_display, expense_table, spending_log_table] | |
) | |
if __name__ == "__main__": | |
print("π Starting FinGenius Pro...") | |
print("π± WhatsApp Integration Status:") | |
print(f" Twilio Available: {TWILIO_AVAILABLE}") | |
print(f" Service Enabled: {twilio.enabled}") | |
print("π OCR Services Status:") | |
print(f" Tesseract Available: {TESSERACT_AVAILABLE}") | |
print(f" Google Vision Available: {VISION_API_AVAILABLE}") | |
print("πΌοΈ Image Processing Status:") | |
print(f" PIL/OpenCV Available: {PIL_AVAILABLE}") | |
print("") | |
print("π Setup Instructions:") | |
if not twilio.enabled: | |
print(" 1. Set TWILIO_ACCOUNT_SID and TWILIO_AUTH_TOKEN environment variables") | |
print(" 2. Or modify credentials directly in TwilioWhatsAppService class") | |
print(" 3. Users must send 'join catch-manner' to +14155238886 to activate WhatsApp") | |
print(" 4. Use the same phone number for both WhatsApp activation and app registration") | |
print("") | |
demo.launch( | |
server_name="0.0.0.0", | |
server_port=7860, | |
share=False, | |
show_error=True | |
) |