import os
import re
import sqlite3
import time
import hashlib
import base64
import json
import uuid
import threading
import traceback
import logging
from datetime import datetime, timedelta
from io import BytesIO
from contextlib import contextmanager
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import gradio as gr
from dateutil.relativedelta import relativedelta
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('fingenius.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
# Image processing imports
try:
from PIL import Image, ImageEnhance, ImageFilter
import cv2
import numpy as np
PIL_AVAILABLE = True
CV2_AVAILABLE = True
logger.info("✅ PIL and OpenCV loaded successfully")
except ImportError as e:
PIL_AVAILABLE = False
CV2_AVAILABLE = False
logger.warning(f"⚠️ PIL/OpenCV not installed: {e}")
# OCR imports
try:
import pytesseract
# Test if tesseract binary is available
pytesseract.get_tesseract_version()
TESSERACT_AVAILABLE = True
logger.info("✅ Tesseract OCR loaded successfully")
except (ImportError, pytesseract.TesseractNotFoundError) as e:
TESSERACT_AVAILABLE = False
logger.warning(f"⚠️ Tesseract not available: {e}")
# Google Vision API (optional)
try:
from google.cloud import vision
VISION_API_AVAILABLE = bool(os.getenv('GOOGLE_APPLICATION_CREDENTIALS'))
if VISION_API_AVAILABLE:
logger.info("✅ Google Vision API credentials found")
else:
logger.info("ℹ️ Google Vision API credentials not configured")
except ImportError:
VISION_API_AVAILABLE = False
logger.info("ℹ️ Google Vision API not installed")
# Twilio Integration
try:
from twilio.rest import Client
TWILIO_AVAILABLE = True
logger.info("✅ Twilio library loaded")
except ImportError:
TWILIO_AVAILABLE = False
logger.warning("⚠️ Twilio not installed")
# Constants
EXPENSE_CATEGORIES = [
"Housing (Rent/Mortgage)",
"Utilities (Electricity/Water)",
"Groceries",
"Dining Out",
"Transportation",
"Healthcare",
"Entertainment",
"Education",
"Personal Care",
"Debt Payments",
"Savings",
"Investments",
"Charity",
"Miscellaneous"
]
INVESTMENT_TYPES = [
"Stocks",
"Bonds",
"Mutual Funds",
"Real Estate",
"Cryptocurrency",
"Retirement Accounts",
"Other"
]
RECURRENCE_PATTERNS = [
"Daily",
"Weekly",
"Monthly",
"Quarterly",
"Yearly"
]
# File upload constants
ALLOWED_IMAGE_EXTENSIONS = {'.jpg', '.jpeg', '.png', '.bmp', '.tiff', '.webp'}
MAX_FILE_SIZE = 10 * 1024 * 1024 # 10MB
RECEIPTS_DIR = "receipts"
# Create receipts directory safely
try:
os.makedirs(RECEIPTS_DIR, exist_ok=True)
logger.info(f"📁 Receipts directory: {os.path.abspath(RECEIPTS_DIR)}")
except OSError as e:
logger.error(f"❌ Could not create receipts directory: {e}")
# Security functions
def generate_salt():
"""Generate a random salt for password hashing"""
return os.urandom(32).hex()
def hash_password(password, salt=None):
"""Hash password using SHA-256 with salt"""
if salt is None:
salt = generate_salt()
password_hash = hashlib.sha256((password + salt).encode('utf-8')).hexdigest()
return f"{password_hash}:{salt}"
def verify_password(password, hashed):
"""Verify password against hash"""
try:
if ':' not in hashed:
logger.warning("Legacy password hash detected - please update")
return False
hash_part, salt = hashed.split(":", 1)
computed_hash = hashlib.sha256((password + salt).encode('utf-8')).hexdigest()
return computed_hash == hash_part
except Exception as e:
logger.error(f"Password verification error: {e}")
return False
def validate_phone_number(phone):
"""Validate phone number format - Pakistani mobile numbers"""
if not phone:
return False
# Pakistan mobile numbers: +92 followed by 3 and then 9 digits
pattern = r'^\+92[3][0-9]{9}$'
return bool(re.match(pattern, phone))
def validate_password(password):
"""Validate password strength"""
if not password:
return False, "Password is required"
if len(password) < 6:
return False, "Password must be at least 6 characters long"
if not re.search(r'[A-Za-z]', password):
return False, "Password must contain at least one letter"
if not re.search(r'\d', password):
return False, "Password must contain at least one number"
return True, "Password is valid"
def format_currency(amount):
"""Format currency with proper PKR display"""
if amount is None:
return "0 PKR"
return f"{int(amount):,} PKR"
def safe_file_extension(filename):
"""Get and validate file extension"""
if not filename:
return None
ext = os.path.splitext(filename.lower())[1]
return ext if ext in ALLOWED_IMAGE_EXTENSIONS else None
# ========== A) ENHANCED IMAGE PROCESSING ==========
class ImageProcessor:
"""Enhanced image preprocessing for better OCR results"""
@staticmethod
@contextmanager
def open_image(image_path):
"""Context manager for safe image handling"""
image = None
try:
image = Image.open(image_path)
yield image
finally:
if image:
image.close()
@classmethod
def preprocess_receipt_image(cls, image_path):
"""
Preprocess receipt image for optimal OCR
Returns: (processed_image_path, preprocessing_info)
"""
try:
if not PIL_AVAILABLE or not os.path.exists(image_path):
return image_path, "No preprocessing available"
with cls.open_image(image_path) as image:
# Convert to RGB if needed
if image.mode != 'RGB':
image = image.convert('RGB')
# Enhance contrast and sharpness
enhancer = ImageEnhance.Contrast(image)
image = enhancer.enhance(1.5)
enhancer = ImageEnhance.Sharpness(image)
image = enhancer.enhance(2.0)
# Convert to grayscale for better OCR
image = image.convert('L')
# Apply slight blur to reduce noise
image = image.filter(ImageFilter.GaussianBlur(radius=0.5))
# OpenCV processing if available
if CV2_AVAILABLE:
img_array = np.array(image)
# Apply adaptive threshold
_, binary = cv2.threshold(
img_array, 0, 255,
cv2.THRESH_BINARY + cv2.THRESH_OTSU
)
# Clean up with morphological operations
kernel = np.ones((1, 1), np.uint8)
binary = cv2.morphologyEx(binary, cv2.MORPH_CLOSE, kernel)
image = Image.fromarray(binary)
# Save processed image
processed_path = image_path.replace('.', '_processed.')
image.save(processed_path, optimize=True, quality=95)
return processed_path, "Enhanced: contrast, sharpness, thresholding applied"
except Exception as e:
logger.error(f"Image preprocessing error: {e}")
return image_path, f"Preprocessing failed: {str(e)}"
@classmethod
def process_receipt_image(cls, image_file, phone):
"""
Complete receipt processing pipeline
Returns: (success, status_message, extracted_data, image_preview_path)
"""
try:
if not phone:
return False, "❌ Please sign in first", {}, None
if not image_file:
return False, "❌ No image uploaded", {}, None
# Handle different input types from Gradio
image_path = None
if isinstance(image_file, str):
image_path = image_file
elif hasattr(image_file, 'name') and image_file.name:
image_path = image_file.name
else:
return False, "❌ Invalid file format", {}, None
# Validate file existence and extension
if not os.path.exists(image_path):
return False, "❌ File not found", {}, None
file_ext = safe_file_extension(image_path)
if not file_ext:
return False, "❌ Invalid image format. Use JPG, PNG, or other supported formats", {}, None
# Check file size
file_size = os.path.getsize(image_path)
if file_size > MAX_FILE_SIZE:
return False, f"❌ File too large. Maximum size: {MAX_FILE_SIZE // (1024*1024)}MB", {}, None
# Generate secure filename
timestamp = int(time.time())
unique_id = uuid.uuid4().hex[:8]
filename = f"receipt_{phone[-4:]}_{timestamp}_{unique_id}{file_ext}"
save_path = os.path.abspath(os.path.join(RECEIPTS_DIR, filename))
# Ensure the save path is within the receipts directory (security check)
receipts_abs = os.path.abspath(RECEIPTS_DIR)
if not save_path.startswith(receipts_abs):
return False, "❌ Invalid file path", {}, None
# Copy file safely with chunked reading
try:
with open(image_path, 'rb') as src, open(save_path, 'wb') as dst:
while True:
chunk = src.read(64 * 1024) # 64KB chunks
if not chunk:
break
dst.write(chunk)
except (IOError, OSError) as e:
logger.error(f"File copy failed: {e}")
return False, f"❌ File save failed: {str(e)}", {}, None
logger.info(f"📄 Receipt saved: {save_path}")
# Preprocess image
processed_path, preprocessing_info = cls.preprocess_receipt_image(save_path)
logger.info(f"🖼️ {preprocessing_info}")
# Extract text using OCR
raw_text, confidence, extracted_data = ocr_service.extract_text_from_receipt(processed_path)
logger.info(f"🔍 OCR Confidence: {confidence:.1%}")
# Auto-categorize expense
if extracted_data.get('merchant'):
suggested_category = db.auto_categorize_receipt(
phone,
extracted_data['merchant'],
extracted_data.get('total_amount', 0)
)
extracted_data['suggested_category'] = suggested_category
logger.info(f"🏷️ Suggested category: {suggested_category}")
# Save receipt data to database
receipt_data = {
'image_path': save_path,
'processed_image_path': processed_path,
'merchant': extracted_data.get('merchant', ''),
'amount': extracted_data.get('total_amount', 0.0),
'date': extracted_data.get('date', ''),
'category': extracted_data.get('suggested_category', 'Miscellaneous'),
'confidence': confidence,
'raw_text': raw_text,
'extracted_data': extracted_data,
'is_validated': False
}
receipt_id = db.save_receipt(phone, receipt_data)
if receipt_id:
extracted_data['receipt_id'] = receipt_id
logger.info(f"💾 Receipt saved to DB: {receipt_id}")
# Generate status message
status_msg = f"✅ Receipt processed! Confidence: {confidence:.1%}"
if confidence < 0.7:
status_msg += " ⚠️ Low confidence - please verify data"
return True, status_msg, extracted_data, save_path
except Exception as e:
logger.error(f"Receipt processing error: {traceback.format_exc()}")
return False, f"❌ Processing failed: {str(e)}", {}, None
# ========== B) ENHANCED OCR SERVICE ==========
class OCRService:
"""Enhanced OCR processing with multiple backends"""
def __init__(self):
self.tesseract_available = TESSERACT_AVAILABLE
self.vision_api_available = VISION_API_AVAILABLE
if self.vision_api_available:
try:
self.vision_client = vision.ImageAnnotatorClient()
logger.info("✅ Google Vision API initialized")
except Exception as e:
logger.error(f"Google Vision API init failed: {e}")
self.vision_api_available = False
def extract_text_from_receipt(self, image_path):
"""
Extract text from receipt using best available OCR service
Returns: (raw_text, confidence_score, extracted_data)
"""
try:
if not os.path.exists(image_path):
return "Image file not found", 0.0, self._create_empty_data()
# Try Google Vision API first (more accurate)
if self.vision_api_available:
return self._extract_with_vision_api(image_path)
# Fallback to Tesseract
elif self.tesseract_available:
return self._extract_with_tesseract(image_path)
else:
logger.warning("No OCR service available")
return "OCR service not available", 0.0, self._create_empty_data()
except Exception as e:
logger.error(f"OCR extraction error: {e}")
return f"OCR failed: {str(e)}", 0.0, self._create_empty_data()
def _extract_with_vision_api(self, image_path):
"""Extract text using Google Vision API"""
try:
with open(image_path, 'rb') as image_file:
content = image_file.read()
image = vision.Image(content=content)
response = self.vision_client.text_detection(image=image)
if response.error.message:
raise Exception(f"Vision API error: {response.error.message}")
texts = response.text_annotations
if texts:
raw_text = texts[0].description
# Vision API doesn't provide confidence directly, estimate it
confidence = 0.85 # Default high confidence for Vision API
extracted_data = self._parse_receipt_text(raw_text)
return raw_text, confidence, extracted_data
else:
return "No text detected by Vision API", 0.0, self._create_empty_data()
except Exception as e:
logger.error(f"Vision API error: {e}")
return f"Vision API failed: {str(e)}", 0.0, self._create_empty_data()
def _extract_with_tesseract(self, image_path):
"""Extract text using Tesseract OCR"""
try:
with ImageProcessor.open_image(image_path) as image:
# Extract text with optimized config
custom_config = r'--oem 1 --psm 6 -c tessedit_char_whitelist=0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz.,$:/- '
raw_text = pytesseract.image_to_string(image, config=custom_config)
# Get confidence data
data = pytesseract.image_to_data(
image,
output_type=pytesseract.Output.DICT,
config=custom_config
)
# Calculate average confidence (filter out -1 values)
confidences = [int(conf) for conf in data['conf'] if int(conf) > 0]
avg_confidence = sum(confidences) / len(confidences) if confidences else 0
avg_confidence = avg_confidence / 100.0 # Convert to 0-1 scale
extracted_data = self._parse_receipt_text(raw_text)
return raw_text, avg_confidence, extracted_data
except Exception as e:
logger.error(f"Tesseract error: {e}")
return f"Tesseract failed: {str(e)}", 0.0, self._create_empty_data()
def _parse_receipt_text(self, raw_text):
"""Parse raw OCR text to extract structured data"""
extracted_data = self._create_empty_data()
if not raw_text or not raw_text.strip():
return extracted_data
lines = [line.strip() for line in raw_text.split('\n') if line.strip()]
# Extract merchant name (first meaningful line)
for line in lines:
if len(line) > 2 and not re.match(r'^\d+[./\-]\d+', line):
extracted_data['merchant'] = line[:50] # Limit length
break
# Extract date patterns
date_patterns = [
r'\b\d{1,2}[/-]\d{1,2}[/-]\d{2,4}\b', # MM/DD/YYYY or DD/MM/YYYY
r'\b\d{4}[/-]\d{1,2}[/-]\d{1,2}\b', # YYYY/MM/DD
r'\b\d{1,2}\s+\w{3,9}\s+\d{4}\b' # DD Month YYYY
]
for line in lines:
for pattern in date_patterns:
match = re.search(pattern, line)
if match:
extracted_data['date'] = match.group().strip()
break
if extracted_data['date']:
break
# Extract total amount (look for common patterns)
amount_patterns = [
r'(?:total|amount|sum|grand\s*total)[:\s]*(?:rs\.?|pkr)?\s*(\d+(?:[.,]\d{1,2})?)',
r'(?:rs\.?|pkr)\s*(\d+(?:[.,]\d{1,2})?)',
r'\b(\d+(?:[.,]\d{1,2})?)\s*(?:rs\.?|pkr)\b'
]
amounts_found = []
for line in lines:
line_lower = line.lower()
for pattern in amount_patterns:
matches = re.finditer(pattern, line_lower)
for match in matches:
try:
amount_str = match.group(1).replace(',', '.')
amount = float(amount_str)
if 1 <= amount <= 1000000: # Reasonable range
amounts_found.append(amount)
except (ValueError, IndexError):
continue
# Use the largest amount found (likely the total)
if amounts_found:
extracted_data['total_amount'] = max(amounts_found)
# Extract line items
line_items = []
for line in lines:
# Look for item-price patterns
item_patterns = [
r'(.+?)\s+(?:rs\.?|pkr)?\s*(\d+(?:[.,]\d{1,2})?)',
r'(.+?)\s+(\d+(?:[.,]\d{1,2})?)\s*(?:rs\.?|pkr)?'
]
for pattern in item_patterns:
match = re.search(pattern, line.lower())
if match and len(match.group(1).strip()) > 1:
try:
item_name = match.group(1).strip()[:30] # Limit length
price_str = match.group(2).replace(',', '.')
price = float(price_str)
if 1 <= price <= 10000: # Reasonable item price range
line_items.append([item_name, price])
if len(line_items) >= 10: # Limit items
break
except (ValueError, IndexError):
continue
extracted_data['line_items'] = line_items
return extracted_data
def _create_empty_data(self):
"""Create empty extracted data structure"""
return {
'merchant': '',
'date': '',
'total_amount': 0.0,
'line_items': []
}
# ========== C) ENHANCED DATABASE SERVICE ==========
class DatabaseService:
"""Enhanced database service with proper transaction handling"""
def __init__(self, db_name='fingenius.db'):
self.db_name = db_name
self.db_lock = threading.RLock() # Use RLock for nested calls
self._initialize_db()
logger.info(f"📊 Database initialized: {db_name}")
@contextmanager
def get_connection(self):
"""Get database connection with proper cleanup"""
conn = None
try:
conn = sqlite3.connect(self.db_name, timeout=30.0)
conn.row_factory = sqlite3.Row # Enable dict-like access
yield conn
except sqlite3.Error as e:
if conn:
conn.rollback()
logger.error(f"Database error: {e}")
raise
finally:
if conn:
conn.close()
def _initialize_db(self):
"""Initialize database with proper schema"""
with self.db_lock, self.get_connection() as conn:
cursor = conn.cursor()
# Users table with proper constraints
cursor.execute('''CREATE TABLE IF NOT EXISTS users (
phone TEXT PRIMARY KEY,
name TEXT NOT NULL CHECK(length(name) > 0),
password_hash TEXT NOT NULL,
monthly_income INTEGER DEFAULT 0 CHECK(monthly_income >= 0),
savings_goal INTEGER DEFAULT 0 CHECK(savings_goal >= 0),
current_balance INTEGER DEFAULT 0,
is_verified BOOLEAN DEFAULT FALSE,
family_group TEXT DEFAULT NULL,
last_balance_alert TIMESTAMP DEFAULT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)''')
# Expenses table
cursor.execute('''CREATE TABLE IF NOT EXISTS expenses (
id INTEGER PRIMARY KEY AUTOINCREMENT,
phone TEXT NOT NULL,
category TEXT NOT NULL CHECK(length(category) > 0),
allocated INTEGER DEFAULT 0 CHECK(allocated >= 0),
spent INTEGER DEFAULT 0 CHECK(spent >= 0),
date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
is_recurring BOOLEAN DEFAULT FALSE,
recurrence_pattern TEXT DEFAULT NULL,
next_occurrence TIMESTAMP DEFAULT NULL,
FOREIGN KEY(phone) REFERENCES users(phone) ON DELETE CASCADE
)''')
# Spending log table
cursor.execute('''CREATE TABLE IF NOT EXISTS spending_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
phone TEXT NOT NULL,
category TEXT NOT NULL,
amount INTEGER NOT NULL CHECK(amount > 0),
description TEXT DEFAULT '',
date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
balance_after INTEGER NOT NULL,
receipt_id TEXT DEFAULT NULL,
FOREIGN KEY(phone) REFERENCES users(phone) ON DELETE CASCADE
)''')
# Receipts table
cursor.execute('''CREATE TABLE IF NOT EXISTS receipts (
receipt_id TEXT PRIMARY KEY,
user_phone TEXT NOT NULL,
image_path TEXT NOT NULL,
processed_image_path TEXT,
merchant TEXT DEFAULT '',
amount REAL DEFAULT 0 CHECK(amount >= 0),
receipt_date TEXT DEFAULT '',
category TEXT DEFAULT 'Miscellaneous',
ocr_confidence REAL DEFAULT 0 CHECK(ocr_confidence >= 0 AND ocr_confidence <= 1),
raw_text TEXT DEFAULT '',
extracted_data TEXT DEFAULT '{}',
is_validated BOOLEAN DEFAULT FALSE,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY(user_phone) REFERENCES users(phone) ON DELETE CASCADE
)''')
# Other tables
cursor.execute('''CREATE TABLE IF NOT EXISTS investments (
id INTEGER PRIMARY KEY AUTOINCREMENT,
phone TEXT NOT NULL,
type TEXT NOT NULL CHECK(length(type) > 0),
name TEXT NOT NULL CHECK(length(name) > 0),
amount INTEGER NOT NULL CHECK(amount > 0),
date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
notes TEXT DEFAULT '',
FOREIGN KEY(phone) REFERENCES users(phone) ON DELETE CASCADE
)''')
cursor.execute('''CREATE TABLE IF NOT EXISTS family_groups (
group_id TEXT PRIMARY KEY,
name TEXT NOT NULL CHECK(length(name) > 0),
admin_phone TEXT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY(admin_phone) REFERENCES users(phone)
)''')
# Create indexes for better performance
indexes = [
'CREATE INDEX IF NOT EXISTS idx_expenses_phone ON expenses(phone)',
'CREATE INDEX IF NOT EXISTS idx_spending_log_phone ON spending_log(phone)',
'CREATE INDEX IF NOT EXISTS idx_receipts_phone ON receipts(user_phone)',
'CREATE INDEX IF NOT EXISTS idx_investments_phone ON investments(phone)',
'CREATE INDEX IF NOT EXISTS idx_spending_log_date ON spending_log(date)',
'CREATE INDEX IF NOT EXISTS idx_receipts_date ON receipts(created_at)'
]
for index_sql in indexes:
cursor.execute(index_sql)
conn.commit()
logger.info("✅ Database schema initialized with indexes")
def create_user(self, phone, name, password):
"""Create new user with proper validation"""
try:
if not validate_phone_number(phone):
return False, "Invalid phone number format"
is_valid, msg = validate_password(password)
if not is_valid:
return False, msg
password_hash = hash_password(password)
with self.db_lock, self.get_connection() as conn:
cursor = conn.cursor()
cursor.execute('''INSERT INTO users (phone, name, password_hash, current_balance)
VALUES (?, ?, ?, ?)''',
(phone, name.strip(), password_hash, 0))
conn.commit()
logger.info(f"👤 User created: {phone}")
return True, "User created successfully"
except sqlite3.IntegrityError:
return False, "Phone number already registered"
except Exception as e:
logger.error(f"User creation error: {e}")
return False, f"Registration failed: {str(e)}"
def authenticate_user(self, phone, password):
"""Authenticate user with proper error handling"""
try:
if not validate_phone_number(phone) or not password:
return None
with self.db_lock, self.get_connection() as conn:
cursor = conn.cursor()
cursor.execute('SELECT name, password_hash FROM users WHERE phone = ?', (phone,))
result = cursor.fetchone()
if result and verify_password(password, result['password_hash']):
logger.info(f"🔑 User authenticated: {phone}")
return result['name']
return None
except Exception as e:
logger.error(f"Authentication error: {e}")
return None
def get_user(self, phone):
"""Get user data safely"""
try:
with self.db_lock, self.get_connection() as conn:
cursor = conn.cursor()
cursor.execute('''SELECT name, monthly_income, savings_goal, family_group, current_balance
FROM users WHERE phone = ?''', (phone,))
result = cursor.fetchone()
return tuple(result) if result else None
except Exception as e:
logger.error(f"Get user error: {e}")
return None
def update_user_balance(self, phone, new_balance):
"""Update user balance with transaction"""
try:
with self.db_lock, self.get_connection() as conn:
cursor = conn.cursor()
cursor.execute('UPDATE users SET current_balance = ? WHERE phone = ?',
(new_balance, phone))
conn.commit()
return True
except Exception as e:
logger.error(f"Balance update error: {e}")
return False
def get_current_balance(self, phone):
"""Get current balance safely"""
try:
with self.db_lock, self.get_connection() as conn:
cursor = conn.cursor()
cursor.execute('SELECT current_balance FROM users WHERE phone = ?', (phone,))
result = cursor.fetchone()
return result['current_balance'] if result else 0
except Exception as e:
logger.error(f"Get balance error: {e}")
return 0
def add_income(self, phone, amount, description="Income added"):
"""Add income with proper transaction handling"""
try:
with self.db_lock, self.get_connection() as conn:
cursor = conn.cursor()
# Get current balance
cursor.execute('SELECT current_balance FROM users WHERE phone = ?', (phone,))
result = cursor.fetchone()
if not result:
return 0
current_balance = result['current_balance']
new_balance = current_balance + amount
# Update balance and log transaction
cursor.execute('UPDATE users SET current_balance = ? WHERE phone = ?',
(new_balance, phone))
cursor.execute('''INSERT INTO spending_log
(phone, category, amount, description, balance_after)
VALUES (?, ?, ?, ?, ?)''',
(phone, "Income", -amount, description, new_balance))
conn.commit()
logger.info(f"💰 Income added: {phone} - {amount}")
return new_balance
except Exception as e:
logger.error(f"Add income error: {e}")
return 0
def update_financials(self, phone, income, savings):
"""Update financial goals with validation"""
try:
if income < 0 or savings < 0:
return False
with self.db_lock, self.get_connection() as conn:
cursor = conn.cursor()
cursor.execute('''UPDATE users
SET monthly_income = ?, savings_goal = ?
WHERE phone = ?''',
(income, savings, phone))
conn.commit()
logger.info(f"📊 Financials updated: {phone}")
return True
except Exception as e:
logger.error(f"Update financials error: {e}")
return False
def get_expenses(self, phone, months_back=3):
"""Get expenses with proper date filtering"""
try:
end_date = datetime.now()
start_date = end_date - relativedelta(months=months_back)
with self.db_lock, self.get_connection() as conn:
cursor = conn.cursor()
cursor.execute('''SELECT category, allocated, spent, date(date) as exp_date, is_recurring
FROM expenses
WHERE phone = ? AND date BETWEEN ? AND ?
ORDER BY allocated DESC''',
(phone, start_date.strftime('%Y-%m-%d'), end_date.strftime('%Y-%m-%d')))
return [tuple(row) for row in cursor.fetchall()]
except Exception as e:
logger.error(f"Get expenses error: {e}")
return []
def update_expense_allocations(self, phone, allocations):
"""Update expense allocations with transaction"""
try:
with self.db_lock, self.get_connection() as conn:
cursor = conn.cursor()
# Clear existing allocations for non-recurring expenses
cursor.execute('''DELETE FROM expenses
WHERE phone = ? AND allocated > 0 AND is_recurring = FALSE''',
(phone,))
# Insert new allocations
for category, alloc in zip(EXPENSE_CATEGORIES, allocations):
if alloc > 0:
cursor.execute('''INSERT INTO expenses
(phone, category, allocated)
VALUES (?, ?, ?)''',
(phone, category, alloc))
conn.commit()
logger.info(f"💼 Allocations updated: {phone}")
return True
except Exception as e:
logger.error(f"Update allocations error: {e}")
return False
def record_expense(self, phone, category, amount, description="", is_recurring=False, recurrence_pattern=None, receipt_id=None):
"""Record expense with proper transaction handling"""
try:
with self.db_lock, self.get_connection() as conn:
cursor = conn.cursor()
# Get current balance
cursor.execute('SELECT current_balance FROM users WHERE phone = ?', (phone,))
result = cursor.fetchone()
if not result:
return False, 0
current_balance = result['current_balance']
if current_balance < amount:
return False, current_balance
new_balance = current_balance - amount
# Update user balance
cursor.execute('UPDATE users SET current_balance = ? WHERE phone = ?',
(new_balance, phone))
# Log spending
cursor.execute('''INSERT INTO spending_log
(phone, category, amount, description, balance_after, receipt_id)
VALUES (?, ?, ?, ?, ?, ?)''',
(phone, category, amount, description, new_balance, receipt_id))
# Handle recurring expenses
if is_recurring and recurrence_pattern:
next_occurrence = self._calculate_next_occurrence(datetime.now(), recurrence_pattern)
cursor.execute('''INSERT INTO expenses
(phone, category, spent, is_recurring, recurrence_pattern, next_occurrence)
VALUES (?, ?, ?, ?, ?, ?)''',
(phone, category, amount, True, recurrence_pattern, next_occurrence))
else:
# Update existing expense allocation
cursor.execute('''SELECT allocated, spent FROM expenses
WHERE phone = ? AND category = ? AND is_recurring = FALSE''',
(phone, category))
expense_result = cursor.fetchone()
if expense_result:
new_spent = expense_result['spent'] + amount
cursor.execute('''UPDATE expenses
SET spent = ? WHERE phone = ? AND category = ? AND is_recurring = FALSE''',
(new_spent, phone, category))
else:
cursor.execute('''INSERT INTO expenses (phone, category, spent)
VALUES (?, ?, ?)''',
(phone, category, amount))
conn.commit()
logger.info(f"💸 Expense recorded: {phone} - {category} - {amount}")
return True, new_balance
except Exception as e:
logger.error(f"Record expense error: {e}")
return False, 0
def _calculate_next_occurrence(self, current_date, pattern):
"""Calculate next occurrence for recurring expenses"""
if pattern == "Daily":
return current_date + timedelta(days=1)
elif pattern == "Weekly":
return current_date + timedelta(weeks=1)
elif pattern == "Monthly":
return current_date + relativedelta(months=1)
elif pattern == "Quarterly":
return current_date + relativedelta(months=3)
elif pattern == "Yearly":
return current_date + relativedelta(years=1)
return current_date
def record_investment(self, phone, inv_type, name, amount, notes):
"""Record investment with validation"""
try:
with self.db_lock, self.get_connection() as conn:
cursor = conn.cursor()
cursor.execute('''INSERT INTO investments
(phone, type, name, amount, notes)
VALUES (?, ?, ?, ?, ?)''',
(phone, inv_type, name, amount, notes))
conn.commit()
logger.info(f"📈 Investment recorded: {phone} - {name}")
return True
except Exception as e:
logger.error(f"Record investment error: {e}")
return False
def get_investments(self, phone):
"""Get user investments"""
try:
with self.db_lock, self.get_connection() as conn:
cursor = conn.cursor()
cursor.execute('''SELECT type, name, amount, date(date) as inv_date, notes
FROM investments
WHERE phone = ?
ORDER BY date DESC''', (phone,))
return [tuple(row) for row in cursor.fetchall()]
except Exception as e:
logger.error(f"Get investments error: {e}")
return []
def get_spending_log(self, phone, limit=50):
"""Get spending history"""
try:
with self.db_lock, self.get_connection() as conn:
cursor = conn.cursor()
cursor.execute('''SELECT category, amount, description, date, balance_after
FROM spending_log
WHERE phone = ?
ORDER BY date DESC
LIMIT ?''', (phone, limit))
return [tuple(row) for row in cursor.fetchall()]
except Exception as e:
logger.error(f"Get spending log error: {e}")
return []
def save_receipt(self, phone, receipt_data):
"""Save receipt data to database"""
try:
receipt_id = f"REC-{phone[-4:]}-{uuid.uuid4().hex[:8]}"
with self.db_lock, self.get_connection() as conn:
cursor = conn.cursor()
# Safe JSON serialization
try:
extracted_data_json = json.dumps(receipt_data.get('extracted_data', {}))
except (TypeError, ValueError) as e:
logger.warning(f"JSON serialization warning: {e}")
extracted_data_json = "{}"
cursor.execute('''INSERT INTO receipts
(receipt_id, user_phone, image_path, processed_image_path,
merchant, amount, receipt_date, category, ocr_confidence,
raw_text, extracted_data, is_validated)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)''',
(receipt_id, phone,
receipt_data.get('image_path', ''),
receipt_data.get('processed_image_path', ''),
receipt_data.get('merchant', ''),
receipt_data.get('amount', 0.0),
receipt_data.get('date', ''),
receipt_data.get('category', ''),
receipt_data.get('confidence', 0.0),
receipt_data.get('raw_text', ''),
extracted_data_json,
receipt_data.get('is_validated', False)))
conn.commit()
logger.info(f"🧾 Receipt saved: {receipt_id}")
return receipt_id
except Exception as e:
logger.error(f"Save receipt error: {e}")
return None
def get_receipts(self, phone, limit=20):
"""Get user receipts"""
try:
with self.db_lock, self.get_connection() as conn:
cursor = conn.cursor()
cursor.execute('''SELECT receipt_id, merchant, amount, receipt_date, category,
ocr_confidence, is_validated, created_at
FROM receipts
WHERE user_phone = ?
ORDER BY created_at DESC
LIMIT ?''', (phone, limit))
return [tuple(row) for row in cursor.fetchall()]
except Exception as e:
logger.error(f"Get receipts error: {e}")
return []
def update_receipt(self, receipt_id, updates):
"""Update receipt information safely"""
if not updates:
return False
try:
# Whitelist allowed columns for security
allowed_columns = {
'merchant': str,
'amount': (int, float),
'receipt_date': str,
'category': str,
'is_validated': bool,
'raw_text': str
}
with self.db_lock, self.get_connection() as conn:
cursor = conn.cursor()
set_clauses = []
values = []
for key, value in updates.items():
if key in allowed_columns:
# Type validation
expected_type = allowed_columns[key]
if isinstance(expected_type, tuple):
if not isinstance(value, expected_type):
continue
elif not isinstance(value, expected_type):
continue
set_clauses.append(f"{key} = ?")
values.append(value)
if set_clauses:
set_clause = ", ".join(set_clauses)
values.append(receipt_id)
cursor.execute(f'UPDATE receipts SET {set_clause} WHERE receipt_id = ?', values)
conn.commit()
logger.info(f"📝 Receipt updated: {receipt_id}")
return True
return False
except Exception as e:
logger.error(f"Update receipt error: {e}")
return False
def auto_categorize_receipt(self, phone, merchant, amount):
"""Auto-categorize based on patterns and history"""
try:
with self.db_lock, self.get_connection() as conn:
cursor = conn.cursor()
# Check user's spending history for similar merchants
cursor.execute('''SELECT category, COUNT(*) as count
FROM spending_log
WHERE phone = ? AND (description LIKE ? OR description LIKE ?)
GROUP BY category
ORDER BY count DESC
LIMIT 1''',
(phone, f'%{merchant}%', f'%{merchant.split()[0] if merchant.split() else merchant}%'))
result = cursor.fetchone()
if result:
return result['category']
# Fallback to keyword-based categorization
return self._categorize_by_keywords(merchant)
except Exception as e:
logger.error(f"Auto categorize error: {e}")
return "Miscellaneous"
def _categorize_by_keywords(self, merchant):
"""Categorize based on merchant name keywords"""
if not merchant:
return "Miscellaneous"
merchant_lower = merchant.lower()
# Define keyword categories
categories = {
"Groceries": ['grocery', 'market', 'food', 'super', 'mart', 'store'],
"Dining Out": ['restaurant', 'cafe', 'pizza', 'burger', 'hotel', 'dining'],
"Transportation": ['gas', 'fuel', 'shell', 'bp', 'petrol', 'uber', 'taxi'],
"Healthcare": ['pharmacy', 'medical', 'hospital', 'clinic', 'doctor'],
"Utilities (Electricity/Water)": ['electric', 'water', 'utility', 'bill'],
"Entertainment": ['cinema', 'movie', 'game', 'entertainment']
}
for category, keywords in categories.items():
if any(keyword in merchant_lower for keyword in keywords):
return category
return "Miscellaneous"
# Family group methods
def create_family_group(self, group_name, admin_phone):
"""Create family group"""
try:
group_id = f"FG-{admin_phone[-4:]}-{uuid.uuid4().hex[:8]}"
with self.db_lock, self.get_connection() as conn:
cursor = conn.cursor()
cursor.execute('''INSERT INTO family_groups
(group_id, name, admin_phone)
VALUES (?, ?, ?)''',
(group_id, group_name, admin_phone))
cursor.execute('''UPDATE users
SET family_group = ?
WHERE phone = ?''',
(group_id, admin_phone))
conn.commit()
logger.info(f"👪 Family group created: {group_id}")
return group_id
except Exception as e:
logger.error(f"Create family group error: {e}")
return None
def join_family_group(self, phone, group_id):
"""Join existing family group"""
try:
with self.db_lock, self.get_connection() as conn:
cursor = conn.cursor()
# Verify group exists
cursor.execute('SELECT name FROM family_groups WHERE group_id = ?', (group_id,))
if not cursor.fetchone():
return False
cursor.execute('UPDATE users SET family_group = ? WHERE phone = ?',
(group_id, phone))
conn.commit()
logger.info(f"👪 User joined family group: {phone} -> {group_id}")
return True
except Exception as e:
logger.error(f"Join family group error: {e}")
return False
def get_family_group(self, group_id):
"""Get family group info"""
try:
with self.db_lock, self.get_connection() as conn:
cursor = conn.cursor()
cursor.execute('SELECT name, admin_phone FROM family_groups WHERE group_id = ?',
(group_id,))
result = cursor.fetchone()
return tuple(result) if result else None
except Exception as e:
logger.error(f"Get family group error: {e}")
return None
def get_family_members(self, group_id):
"""Get family group members"""
try:
with self.db_lock, self.get_connection() as conn:
cursor = conn.cursor()
cursor.execute('SELECT phone, name FROM users WHERE family_group = ?',
(group_id,))
return [tuple(row) for row in cursor.fetchall()]
except Exception as e:
logger.error(f"Get family members error: {e}")
return []
# ========== D) ENHANCED TWILIO SERVICE ==========
class TwilioWhatsAppService:
"""Enhanced Twilio WhatsApp service with better error handling"""
def __init__(self):
self.account_sid = os.getenv('TWILIO_ACCOUNT_SID', 'your_account_sid_here')
self.auth_token = os.getenv('TWILIO_AUTH_TOKEN', 'your_auth_token_here')
self.whatsapp_number = 'whatsapp:+14155238886' # Twilio Sandbox
self.enabled = False
self.client = None
if (self.account_sid != 'your_account_sid_here' and
self.auth_token != 'your_auth_token_here' and
TWILIO_AVAILABLE):
try:
self.client = Client(self.account_sid, self.auth_token)
# Test connection
account = self.client.api.accounts(self.account_sid).fetch()
self.enabled = True
logger.info(f"✅ Twilio initialized: {account.friendly_name}")
except Exception as e:
logger.error(f"❌ Twilio initialization failed: {e}")
self.enabled = False
else:
logger.warning("⚠️ Twilio credentials not configured")
def send_whatsapp(self, phone, message):
"""Send WhatsApp message with comprehensive error handling"""
if not self.enabled or not self.client:
logger.info(f"📱 [DEMO MODE] WhatsApp to {phone}: {message[:50]}...")
return False
try:
# Format phone number
if not phone.startswith('+'):
phone = '+' + phone
to_whatsapp = f"whatsapp:{phone}"
# Send message
twilio_message = self.client.messages.create(
body=message[:1600], # WhatsApp message limit
from_=self.whatsapp_number,
to=to_whatsapp
)
logger.info(f"✅ WhatsApp sent: {twilio_message.sid}")
return True
except Exception as e:
error_msg = str(e).lower()
# Provide helpful error messages
if "not a valid phone number" in error_msg:
logger.error(f"❌ Invalid phone number format: {phone}")
elif "unverified" in error_msg or "sandbox" in error_msg:
logger.error(f"❌ WhatsApp not activated. User must send 'join catch-manner' to +14155238886")
elif "forbidden" in error_msg:
logger.error(f"❌ Twilio account issue. Check credentials and account status")
else:
logger.error(f"❌ WhatsApp send failed: {e}")
return False
# ========== E) HELPER FUNCTIONS ==========
def generate_spending_chart(phone, months=3):
"""Generate spending chart with error handling"""
try:
expenses = db.get_expenses(phone, months)
if not expenses:
return None
df = pd.DataFrame(expenses, columns=['Category', 'Allocated', 'Spent', 'Date', 'IsRecurring'])
df['Date'] = pd.to_datetime(df['Date'])
df['Month'] = df['Date'].dt.strftime('%Y-%m')
# Group by month and category
monthly_data = df.groupby(['Month', 'Category'])['Spent'].sum().unstack(fill_value=0)
# Create stacked bar chart
fig = go.Figure()
colors = px.colors.qualitative.Set3
for i, category in enumerate(monthly_data.columns):
fig.add_trace(go.Bar(
x=monthly_data.index,
y=monthly_data[category],
name=category,
marker_color=colors[i % len(colors)],
hovertemplate=f'{category}
Month: %{{x}}
Amount: %{{y:,}} PKR
Balance: %{y:,} PKR
Real-time balance monitoring with intelligent spending alerts and comprehensive financial insights
Get instant notifications for every expense, budget alert, and financial milestone directly on WhatsApp
Beautiful interactive charts and detailed insights to track spending patterns and financial trends
Revolutionary OCR technology to automatically extract expense data from receipt photos with high accuracy
Create family groups to collaboratively manage household finances, budgets, and shared expenses
Military-grade encryption, secure authentication, and privacy-first design to protect your financial data
Get instant notifications for all your financial activities. Follow these simple steps:
Add this Twilio WhatsApp Sandbox number to your contacts:
Save as "FinGenius Bot" in your phone
Send this exact message to the number above:
⚠️ Critical: You must send this exact code to activate the sandbox.
After sending the code, register your FinGenius account with the same phone number you used to message the bot.
The phone numbers must match exactly for notifications to work.
You'll receive instant WhatsApp notifications for:
Your comprehensive financial management solution is ready. Let's build your financial future together!
Set your monthly income and savings goals to create a personalized budget plan
50% needs, 30% wants, 20% savings & debt repayment
Aim for 3-6 months of expenses in emergency savings
Consider investing 10-15% of income for long-term growth
Distribute your monthly income across different expense categories for optimal financial management
Transform your receipt photos into digital expense records with advanced OCR technology!
Take or upload receipt image
Extract data automatically
Review and confirm details
Log your daily expenses with intelligent categorization and recurring expense management
Review all your financial transactions with detailed information and balance tracking
Identify spending habits and trends over time
See how each transaction affected your balance
Understand where your money goes each month
Track your investments and build long-term wealth with comprehensive portfolio monitoring
Create family groups to manage household budgets, shared expenses, and financial goals together
All family members can contribute to expense tracking and budget management
Everyone can see family spending patterns and financial goals
Work together towards shared financial objectives and savings targets