import os
import re
import sqlite3
import time
import hashlib
import base64
import json
from datetime import datetime, timedelta
from io import BytesIO
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import gradio as gr
from dateutil.relativedelta import relativedelta
# Image processing imports
try:
from PIL import Image, ImageEnhance, ImageFilter
import cv2
import numpy as np
PIL_AVAILABLE = True
except ImportError:
PIL_AVAILABLE = False
print("⚠️ PIL/OpenCV not installed. Run: pip install Pillow opencv-python")
# OCR imports
try:
import pytesseract
TESSERACT_AVAILABLE = True
except ImportError:
TESSERACT_AVAILABLE = False
print("⚠️ Pytesseract not installed. Run: pip install pytesseract")
# Google Vision API (optional)
try:
from google.cloud import vision
VISION_API_AVAILABLE = True
except ImportError:
VISION_API_AVAILABLE = False
print("⚠️ Google Vision API not available. Install with: pip install google-cloud-vision")
# Twilio Integration
try:
from twilio.rest import Client
TWILIO_AVAILABLE = True
except ImportError:
TWILIO_AVAILABLE = False
print("⚠️ Twilio not installed. Run: pip install twilio")
# Constants
EXPENSE_CATEGORIES = [
"Housing (Rent/Mortgage)",
"Utilities (Electricity/Water)",
"Groceries",
"Dining Out",
"Transportation",
"Healthcare",
"Entertainment",
"Education",
"Personal Care",
"Debt Payments",
"Savings",
"Investments",
"Charity",
"Miscellaneous"
]
INVESTMENT_TYPES = [
"Stocks",
"Bonds",
"Mutual Funds",
"Real Estate",
"Cryptocurrency",
"Retirement Accounts",
"Other"
]
RECURRENCE_PATTERNS = [
"Daily",
"Weekly",
"Monthly",
"Quarterly",
"Yearly"
]
# Rate limiting setup
MAX_ATTEMPTS = 5
ATTEMPT_WINDOW = 300 # 5 minutes in seconds
# Receipt processing constants
RECEIPTS_DIR = "receipts"
if not os.path.exists(RECEIPTS_DIR):
os.makedirs(RECEIPTS_DIR)
# Security functions
def hash_password(password):
"""Hash password using SHA-256 with salt"""
salt = "fingenius_secure_salt_2024"
return hashlib.sha256((password + salt).encode()).hexdigest()
def verify_password(password, hashed):
"""Verify password against hash"""
return hash_password(password) == hashed
# ========== A) IMAGE PROCESSING FUNCTIONS ==========
class ImageProcessor:
"""Handles image preprocessing for better OCR results"""
@staticmethod
def process_receipt_image(image_file, phone):
"""
Complete receipt processing pipeline that handles Gradio file objects
Returns: (success, status_message, extracted_data, image_preview)
"""
try:
if not phone:
return False, "❌ Please sign in first", {}, None
if not image_file:
return False, "❌ No image uploaded", {}, None
# Debug input type
print(f"\n📁 Input type: {type(image_file)}")
# Handle different input types
if isinstance(image_file, str):
# Case 1: Direct file path (local testing)
image_path = image_file
elif hasattr(image_file, 'name'):
# Case 2: Gradio NamedString object (Hugging Face Spaces)
image_path = image_file.name
else:
return False, "❌ Unsupported file input type", {}, None
# Create receipts directory if needed
os.makedirs(RECEIPTS_DIR, exist_ok=True)
# Generate unique filename
timestamp = int(time.time())
filename = f"receipt_{phone}_{timestamp}{os.path.splitext(image_path)[1]}"
save_path = os.path.join(RECEIPTS_DIR, filename)
# Copy the uploaded file (works for both Gradio and direct paths)
with open(image_path, 'rb') as src, open(save_path, 'wb') as dst:
dst.write(src.read())
print(f"📄 Saved receipt to: {save_path}")
# Preprocess image
processed_path, preprocessing_info = ImageProcessor.preprocess_receipt_image(save_path)
print(f"🖼️ Preprocessing: {preprocessing_info}")
# Extract text using OCR
raw_text, confidence, extracted_data = ocr_service.extract_text_from_receipt(processed_path)
print(f"🔍 OCR Confidence: {confidence:.1%}")
# Auto-categorize
if extracted_data.get('merchant'):
suggested_category = db.auto_categorize_receipt(
phone,
extracted_data['merchant'],
extracted_data.get('total_amount', 0)
)
extracted_data['suggested_category'] = suggested_category
print(f"🏷️ Suggested category: {suggested_category}")
# Prepare receipt data for database
receipt_data = {
'image_path': save_path,
'processed_image_path': processed_path,
'merchant': extracted_data.get('merchant', ''),
'amount': extracted_data.get('total_amount', 0.0),
'date': extracted_data.get('date', ''),
'category': extracted_data.get('suggested_category', 'Miscellaneous'),
'confidence': confidence,
'raw_text': raw_text,
'extracted_data': extracted_data,
'is_validated': False
}
# Save to database
receipt_id = db.save_receipt(phone, receipt_data)
extracted_data['receipt_id'] = receipt_id
print(f"💾 Saved to DB with ID: {receipt_id}")
# Create image preview
try:
image_preview = Image.open(save_path)
image_preview.thumbnail((400, 600)) # Resize for display
except Exception as e:
print(f"⚠️ Preview generation failed: {e}")
image_preview = None
status_msg = f"✅ Receipt processed! Confidence: {confidence:.1%}"
if confidence < 0.7:
status_msg += " ⚠️ Low confidence - please verify"
return True, status_msg, extracted_data, image_preview
except Exception as e:
print(f"❌ Processing error: {traceback.format_exc()}")
return False, f"❌ Processing failed: {str(e)}", {}, None
@staticmethod
def extract_text_regions(image_path):
"""Extract text regions from receipt image"""
try:
if not PIL_AVAILABLE:
return []
image = Image.open(image_path)
# This is a simplified version - in production, you'd use more advanced techniques
# to detect and extract specific regions (header, items, total, etc.)
return ["Full image processed"]
except Exception as e:
print(f"Text region extraction error: {e}")
return []
# ========== B) OCR SERVICE CLASS ==========
class OCRService:
"""Handles OCR processing with multiple backends"""
def __init__(self):
self.tesseract_available = TESSERACT_AVAILABLE
self.vision_api_available = VISION_API_AVAILABLE and os.getenv('GOOGLE_APPLICATION_CREDENTIALS')
# Initialize Google Vision client if available
if self.vision_api_available:
try:
self.vision_client = vision.ImageAnnotatorClient()
except Exception as e:
print(f"Google Vision API initialization failed: {e}")
self.vision_api_available = False
def extract_text_from_receipt(self, image_path):
"""
Extract text from receipt using available OCR service
Returns: (raw_text, confidence_score, extracted_data)
"""
try:
# Try Google Vision API first if available
if self.vision_api_available:
return self._extract_with_vision_api(image_path)
# Fallback to Tesseract
elif self.tesseract_available:
return self._extract_with_tesseract(image_path)
else:
return "OCR not available", 0.0, self._create_empty_data()
except Exception as e:
print(f"OCR extraction error: {e}")
return f"OCR failed: {str(e)}", 0.0, self._create_empty_data()
def _extract_with_vision_api(self, image_path):
"""Extract text using Google Vision API"""
try:
with open(image_path, 'rb') as image_file:
content = image_file.read()
image = vision.Image(content=content)
response = self.vision_client.text_detection(image=image)
texts = response.text_annotations
if texts:
raw_text = texts[0].description
confidence = min([vertex.confidence for vertex in texts if hasattr(vertex, 'confidence')] or [0.8])
extracted_data = self._parse_receipt_text(raw_text)
return raw_text, confidence, extracted_data
else:
return "No text detected", 0.0, self._create_empty_data()
except Exception as e:
print(f"Vision API error: {e}")
return f"Vision API failed: {str(e)}", 0.0, self._create_empty_data()
def _extract_with_tesseract(self, image_path):
"""Extract text using Tesseract OCR"""
try:
# Preprocess image first
processed_path, _ = ImageProcessor.preprocess_receipt_image(image_path)
# Extract text with Tesseract
raw_text = pytesseract.image_to_string(
Image.open(processed_path),
config='--oem 3 --psm 6' # OCR Engine Mode 3, Page Segmentation Mode 6
)
# Get confidence data
data = pytesseract.image_to_data(Image.open(processed_path), output_type=pytesseract.Output.DICT)
confidences = [int(conf) for conf in data['conf'] if int(conf) > 0]
avg_confidence = sum(confidences) / len(confidences) if confidences else 0.0
extracted_data = self._parse_receipt_text(raw_text)
return raw_text, avg_confidence / 100.0, extracted_data
except Exception as e:
print(f"Tesseract error: {e}")
return f"Tesseract failed: {str(e)}", 0.0, self._create_empty_data()
def _parse_receipt_text(self, raw_text):
"""Parse raw OCR text to extract structured data"""
extracted_data = self._create_empty_data()
lines = raw_text.split('\n')
# Extract merchant name (usually first non-empty line)
for line in lines:
if line.strip() and len(line.strip()) > 2:
extracted_data['merchant'] = line.strip()
break
# Extract date using regex patterns
date_patterns = [
r'\d{1,2}[/-]\d{1,2}[/-]\d{2,4}',
r'\d{4}[/-]\d{1,2}[/-]\d{1,2}',
r'\d{1,2}\s+\w+\s+\d{4}'
]
for line in lines:
for pattern in date_patterns:
match = re.search(pattern, line)
if match:
extracted_data['date'] = match.group()
break
if extracted_data['date']:
break
# Extract total amount
amount_patterns = [
r'total[:\s]*\$?(\d+\.?\d*)',
r'amount[:\s]*\$?(\d+\.?\d*)',
r'sum[:\s]*\$?(\d+\.?\d*)',
r'\$(\d+\.?\d*)'
]
for line in lines:
line_lower = line.lower()
for pattern in amount_patterns:
match = re.search(pattern, line_lower)
if match:
try:
amount = float(match.group(1))
if amount > 0:
extracted_data['total_amount'] = amount
break
except ValueError:
continue
if extracted_data['total_amount']:
break
# Extract line items (simplified approach)
line_items = []
for line in lines:
# Look for lines with item and price pattern
item_match = re.search(r'(.+?)\s+(\d+\.?\d*)', line)
if item_match and len(item_match.group(1)) > 2:
try:
item_name = item_match.group(1).strip()
item_price = float(item_match.group(2))
if item_price > 0:
line_items.append([item_name, item_price])
except ValueError:
continue
extracted_data['line_items'] = line_items[:10] # Limit to 10 items
return extracted_data
def _create_empty_data(self):
"""Create empty extracted data structure"""
return {
'merchant': '',
'date': '',
'total_amount': 0.0,
'line_items': []
}
# ========== C) ENHANCED DATABASE SERVICE ==========
class DatabaseService:
def __init__(self, db_name='fin_genius.db'):
self.conn = sqlite3.connect(db_name, check_same_thread=False)
self.cursor = self.conn.cursor()
self._initialize_db()
def _initialize_db(self):
# Existing tables...
self.cursor.execute('''CREATE TABLE IF NOT EXISTS users
(phone TEXT PRIMARY KEY,
name TEXT,
password_hash TEXT,
monthly_income INTEGER DEFAULT 0,
savings_goal INTEGER DEFAULT 0,
current_balance INTEGER DEFAULT 0,
is_verified BOOLEAN DEFAULT FALSE,
family_group TEXT DEFAULT NULL,
last_balance_alert TIMESTAMP DEFAULT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP)''')
self.cursor.execute('''CREATE TABLE IF NOT EXISTS expenses
(id INTEGER PRIMARY KEY AUTOINCREMENT,
phone TEXT,
category TEXT,
allocated INTEGER DEFAULT 0,
spent INTEGER DEFAULT 0,
date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
is_recurring BOOLEAN DEFAULT FALSE,
recurrence_pattern TEXT DEFAULT NULL,
next_occurrence TIMESTAMP DEFAULT NULL,
FOREIGN KEY(phone) REFERENCES users(phone))''')
self.cursor.execute('''CREATE TABLE IF NOT EXISTS spending_log
(id INTEGER PRIMARY KEY AUTOINCREMENT,
phone TEXT,
category TEXT,
amount INTEGER,
description TEXT,
date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
balance_after INTEGER,
receipt_id TEXT DEFAULT NULL,
FOREIGN KEY(phone) REFERENCES users(phone))''')
self.cursor.execute('''CREATE TABLE IF NOT EXISTS investments
(id INTEGER PRIMARY KEY AUTOINCREMENT,
phone TEXT,
type TEXT,
name TEXT,
amount INTEGER,
date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
notes TEXT,
FOREIGN KEY(phone) REFERENCES users(phone))''')
self.cursor.execute('''CREATE TABLE IF NOT EXISTS auth_attempts
(phone TEXT PRIMARY KEY,
attempts INTEGER DEFAULT 1,
last_attempt TIMESTAMP DEFAULT CURRENT_TIMESTAMP)''')
self.cursor.execute('''CREATE TABLE IF NOT EXISTS family_groups
(group_id TEXT PRIMARY KEY,
name TEXT,
admin_phone TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP)''')
self.cursor.execute('''CREATE TABLE IF NOT EXISTS alerts
(id INTEGER PRIMARY KEY AUTOINCREMENT,
phone TEXT,
alert_type TEXT,
message TEXT,
sent_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY(phone) REFERENCES users(phone))''')
# NEW: Receipts table
self.cursor.execute('''CREATE TABLE IF NOT EXISTS receipts
(receipt_id TEXT PRIMARY KEY,
user_phone TEXT,
image_path TEXT,
processed_image_path TEXT,
merchant TEXT,
amount REAL,
receipt_date TEXT,
category TEXT,
ocr_confidence REAL,
raw_text TEXT,
extracted_data TEXT,
is_validated BOOLEAN DEFAULT FALSE,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY(user_phone) REFERENCES users(phone))''')
self.conn.commit()
# Existing methods remain the same...
def get_user(self, phone):
self.cursor.execute('''SELECT name, monthly_income, savings_goal, family_group, current_balance
FROM users WHERE phone=?''', (phone,))
return self.cursor.fetchone()
def authenticate_user(self, phone, password):
self.cursor.execute('''SELECT name, password_hash FROM users WHERE phone=?''', (phone,))
result = self.cursor.fetchone()
if result and verify_password(password, result[1]):
return result[0]
return None
def create_user(self, phone, name, password):
try:
password_hash = hash_password(password)
self.cursor.execute('''INSERT INTO users (phone, name, password_hash, current_balance) VALUES (?, ?, ?, ?)''',
(phone, name, password_hash, 0))
self.conn.commit()
return True
except sqlite3.IntegrityError:
return False
def update_user_balance(self, phone, new_balance):
self.cursor.execute('''UPDATE users SET current_balance=? WHERE phone=?''',
(new_balance, phone))
self.conn.commit()
def get_current_balance(self, phone):
self.cursor.execute('''SELECT current_balance FROM users WHERE phone=?''', (phone,))
result = self.cursor.fetchone()
return result[0] if result else 0
def add_income(self, phone, amount, description="Income added"):
current_balance = self.get_current_balance(phone)
new_balance = current_balance + amount
self.cursor.execute('''INSERT INTO spending_log
(phone, category, amount, description, balance_after)
VALUES (?, ?, ?, ?, ?)''',
(phone, "Income", -amount, description, new_balance))
self.update_user_balance(phone, new_balance)
self.conn.commit()
return new_balance
def update_financials(self, phone, income, savings):
self.cursor.execute('''UPDATE users
SET monthly_income=?, savings_goal=?
WHERE phone=?''',
(income, savings, phone))
self.conn.commit()
def get_expenses(self, phone, months_back=3):
end_date = datetime.now()
start_date = end_date - relativedelta(months=months_back)
self.cursor.execute('''SELECT category, allocated, spent, date(date) as exp_date, is_recurring
FROM expenses
WHERE phone=? AND date BETWEEN ? AND ?
ORDER BY allocated DESC''',
(phone, start_date.strftime('%Y-%m-%d'), end_date.strftime('%Y-%m-%d')))
return self.cursor.fetchall()
def update_expense_allocations(self, phone, allocations):
self.cursor.execute('''DELETE FROM expenses WHERE phone=? AND allocated > 0 AND is_recurring=FALSE''', (phone,))
for category, alloc in zip(EXPENSE_CATEGORIES, allocations):
if alloc > 0:
self.cursor.execute('''INSERT INTO expenses
(phone, category, allocated)
VALUES (?, ?, ?)''',
(phone, category, alloc))
self.conn.commit()
def log_spending(self, phone, category, amount, description="", receipt_id=None):
current_balance = self.get_current_balance(phone)
new_balance = current_balance - amount
self.cursor.execute('''INSERT INTO spending_log
(phone, category, amount, description, balance_after, receipt_id)
VALUES (?, ?, ?, ?, ?, ?)''',
(phone, category, amount, description, new_balance, receipt_id))
self.update_user_balance(phone, new_balance)
self.conn.commit()
return new_balance
def record_expense(self, phone, category, amount, description="", is_recurring=False, recurrence_pattern=None, receipt_id=None):
new_balance = self.log_spending(phone, category, amount, description, receipt_id)
self.cursor.execute('''SELECT allocated, spent
FROM expenses
WHERE phone=? AND category=? AND is_recurring=FALSE''',
(phone, category))
result = self.cursor.fetchone()
if is_recurring:
next_occurrence = self._calculate_next_occurrence(datetime.now(), recurrence_pattern)
self.cursor.execute('''INSERT INTO expenses
(phone, category, spent, is_recurring, recurrence_pattern, next_occurrence)
VALUES (?, ?, ?, ?, ?, ?)''',
(phone, category, amount, True, recurrence_pattern, next_occurrence))
elif result:
alloc, spent = result
new_spent = spent + amount
self.cursor.execute('''UPDATE expenses
SET spent=?
WHERE phone=? AND category=? AND is_recurring=FALSE''',
(new_spent, phone, category))
else:
self.cursor.execute('''INSERT INTO expenses
(phone, category, spent)
VALUES (?, ?, ?)''',
(phone, category, amount))
self.conn.commit()
return True, new_balance
def _calculate_next_occurrence(self, current_date, pattern):
if pattern == "Daily":
return current_date + timedelta(days=1)
elif pattern == "Weekly":
return current_date + timedelta(weeks=1)
elif pattern == "Monthly":
return current_date + relativedelta(months=1)
elif pattern == "Quarterly":
return current_date + relativedelta(months=3)
elif pattern == "Yearly":
return current_date + relativedelta(years=1)
return current_date
def record_investment(self, phone, inv_type, name, amount, notes):
self.cursor.execute('''INSERT INTO investments
(phone, type, name, amount, notes)
VALUES (?, ?, ?, ?, ?)''',
(phone, inv_type, name, amount, notes))
self.conn.commit()
return True
def get_investments(self, phone):
self.cursor.execute('''SELECT type, name, amount, date(date) as inv_date, notes
FROM investments
WHERE phone=?
ORDER BY date DESC''', (phone,))
return self.cursor.fetchall()
def get_spending_log(self, phone, limit=50):
self.cursor.execute('''SELECT category, amount, description, date, balance_after
FROM spending_log
WHERE phone=?
ORDER BY date DESC
LIMIT ?''', (phone, limit))
return self.cursor.fetchall()
def create_family_group(self, group_name, admin_phone):
group_id = f"FG-{admin_phone[-4:]}-{int(time.time())}"
try:
self.cursor.execute('''INSERT INTO family_groups
(group_id, name, admin_phone)
VALUES (?, ?, ?)''',
(group_id, group_name, admin_phone))
self.cursor.execute('''UPDATE users
SET family_group=?
WHERE phone=?''',
(group_id, admin_phone))
self.conn.commit()
return group_id
except sqlite3.IntegrityError:
return None
def join_family_group(self, phone, group_id):
self.cursor.execute('''UPDATE users
SET family_group=?
WHERE phone=?''',
(group_id, phone))
self.conn.commit()
return True
def get_family_group(self, group_id):
self.cursor.execute('''SELECT name, admin_phone FROM family_groups WHERE group_id=?''', (group_id,))
return self.cursor.fetchone()
def get_family_members(self, group_id):
self.cursor.execute('''SELECT phone, name FROM users WHERE family_group=?''', (group_id,))
return self.cursor.fetchall()
# NEW: Receipt-related methods
def save_receipt(self, phone, receipt_data):
"""Save receipt data to database"""
receipt_id = f"REC-{phone[-4:]}-{int(time.time())}"
try:
self.cursor.execute('''INSERT INTO receipts
(receipt_id, user_phone, image_path, processed_image_path,
merchant, amount, receipt_date, category, ocr_confidence,
raw_text, extracted_data, is_validated)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)''',
(receipt_id, phone, receipt_data.get('image_path', ''),
receipt_data.get('processed_image_path', ''),
receipt_data.get('merchant', ''),
receipt_data.get('amount', 0.0),
receipt_data.get('date', ''),
receipt_data.get('category', ''),
receipt_data.get('confidence', 0.0),
receipt_data.get('raw_text', ''),
json.dumps(receipt_data.get('extracted_data', {})),
receipt_data.get('is_validated', False)))
self.conn.commit()
return receipt_id
except sqlite3.Error as e:
print(f"Database error saving receipt: {e}")
return None
def get_receipts(self, phone, limit=20):
"""Get user's receipts"""
self.cursor.execute('''SELECT receipt_id, merchant, amount, receipt_date, category,
ocr_confidence, is_validated, created_at
FROM receipts
WHERE user_phone=?
ORDER BY created_at DESC
LIMIT ?''', (phone, limit))
return self.cursor.fetchall()
def update_receipt(self, receipt_id, updates):
"""Update receipt information"""
set_clause = ", ".join([f"{key}=?" for key in updates.keys()])
values = list(updates.values()) + [receipt_id]
self.cursor.execute(f'''UPDATE receipts SET {set_clause} WHERE receipt_id=?''', values)
self.conn.commit()
def auto_categorize_receipt(self, phone, merchant, amount):
"""Auto-categorize based on user's spending patterns"""
# Get user's most common category for similar merchants
self.cursor.execute('''SELECT category, COUNT(*) as count
FROM spending_log
WHERE phone=? AND (description LIKE ? OR description LIKE ?)
GROUP BY category
ORDER BY count DESC
LIMIT 1''',
(phone, f'%{merchant}%', f'%{merchant.split()[0]}%'))
result = self.cursor.fetchone()
if result:
return result[0]
# Fallback categorization based on merchant keywords
merchant_lower = merchant.lower()
if any(word in merchant_lower for word in ['grocery', 'market', 'food', 'super']):
return "Groceries"
elif any(word in merchant_lower for word in ['restaurant', 'cafe', 'pizza', 'burger']):
return "Dining Out"
elif any(word in merchant_lower for word in ['gas', 'fuel', 'shell', 'bp']):
return "Transportation"
elif any(word in merchant_lower for word in ['pharmacy', 'medical', 'hospital']):
return "Healthcare"
else:
return "Miscellaneous"
# Fixed Twilio WhatsApp Service
class TwilioWhatsAppService:
def __init__(self):
# Set your Twilio credentials here
self.account_sid = os.getenv('TWILIO_ACCOUNT_SID', 'your_account_sid_here')
self.auth_token = os.getenv('TWILIO_AUTH_TOKEN', 'your_auth_token_here')
# For Twilio Sandbox - use this number
self.whatsapp_number = 'whatsapp:+14155238886'
# For production Twilio (after approval) - use your approved number
# self.whatsapp_number = 'whatsapp:+1234567890' # Your approved WhatsApp Business number
print(f"🔧 Twilio Configuration:")
print(f" Account SID: {self.account_sid[:10]}... (masked)")
print(f" Auth Token: {'*' * len(self.auth_token) if self.auth_token != 'your_auth_token_here' else 'Not set'}")
print(f" WhatsApp Number: {self.whatsapp_number}")
if self.account_sid != 'your_account_sid_here' and self.auth_token != 'your_auth_token_here' and TWILIO_AVAILABLE:
try:
self.client = Client(self.account_sid, self.auth_token)
# Test the connection by getting account info
account = self.client.api.accounts(self.account_sid).fetch()
print(f"✅ Twilio WhatsApp Service initialized successfully")
print(f" Account Status: {account.status}")
print(f" Account Name: {account.friendly_name}")
self.enabled = True
except Exception as e:
print(f"❌ Failed to initialize Twilio: {e}")
print(f" Please check your Account SID and Auth Token")
self.client = None
self.enabled = False
else:
print("❌ Twilio credentials not configured or Twilio not installed")
print(" Please set TWILIO_ACCOUNT_SID and TWILIO_AUTH_TOKEN environment variables")
print(" Or modify the credentials directly in the code")
self.client = None
self.enabled = False
def send_whatsapp(self, phone, message):
"""Send WhatsApp message with proper error handling"""
if not self.enabled or not self.client:
print(f"📱 [DEMO MODE] WhatsApp to {phone}: {message}")
return False
try:
# Ensure phone number has correct format
if not phone.startswith('+'):
phone = '+' + phone
to_whatsapp = f"whatsapp:{phone}"
print(f"📤 Attempting to send WhatsApp message:")
print(f" From: {self.whatsapp_number}")
print(f" To: {to_whatsapp}")
print(f" Message: {message[:50]}...")
twilio_message = self.client.messages.create(
body=message,
from_=self.whatsapp_number,
to=to_whatsapp
)
print(f"✅ WhatsApp sent successfully!")
print(f" Message SID: {twilio_message.sid}")
print(f" Status: {twilio_message.status}")
return True
except Exception as e:
print(f"❌ Failed to send WhatsApp to {phone}")
print(f" Error: {str(e)}")
print(f" Error Type: {type(e).__name__}")
# Common error messages and solutions
if "not a valid phone number" in str(e).lower():
print(f" 💡 Solution: Check phone number format. Should be +country_code + number")
elif "unverified" in str(e).lower():
print(f" 💡 Solution: For Twilio Sandbox, recipient must send 'join catch-manner' to +14155238886 first")
elif "forbidden" in str(e).lower():
print(f" 💡 Solution: Check your Twilio credentials and account status")
elif "unauthorized" in str(e).lower():
print(f" 💡 Solution: Verify your Account SID and Auth Token are correct")
print(f"📱 [FALLBACK] Message content: {message}")
return False
# Initialize services
db = DatabaseService()
twilio = TwilioWhatsAppService()
ocr_service = OCRService()
# Helper functions (unchanged)
def validate_phone_number(phone):
pattern = r'^\+\d{1,3}\d{6,14}$' # Added missing closing quote and $
return re.match(pattern, phone) is not None
def validate_password(password):
if len(password) < 6:
return False, "Password must be at least 6 characters long"
if not re.search(r'[A-Za-z]', password):
return False, "Password must contain at least one letter"
if not re.search(r'\d', password):
return False, "Password must contain at least one number"
return True, "Password is valid"
def format_currency(amount):
return f"{int(amount):,} PKR" if amount else "0 PKR"
def generate_spending_chart(phone, months=3):
expenses = db.get_expenses(phone, months)
if not expenses:
return None
df = pd.DataFrame(expenses, columns=['Category', 'Allocated', 'Spent', 'Date', 'IsRecurring'])
df['Date'] = pd.to_datetime(df['Date'])
df['Month'] = df['Date'].dt.strftime('%Y-%m')
monthly_data = df.groupby(['Month', 'Category'])['Spent'].sum().unstack().fillna(0)
fig = go.Figure()
colors = px.colors.qualitative.Set3
for i, category in enumerate(monthly_data.columns):
fig.add_trace(go.Bar(
x=monthly_data.index,
y=monthly_data[category],
name=category,
marker_color=colors[i % len(colors)],
hoverinfo='y+name',
textposition='auto'
))
fig.update_layout(
barmode='stack',
title=f'📊 Spending Trends (Last {months} Months)',
xaxis_title='Month',
yaxis_title='Amount (PKR)',
height=500,
plot_bgcolor='rgba(0,0,0,0)',
paper_bgcolor='rgba(0,0,0,0)'
)
return fig
def generate_balance_chart(phone):
spending_log = db.get_spending_log(phone, 100)
if not spending_log:
return None
df = pd.DataFrame(spending_log, columns=['Category', 'Amount', 'Description', 'Date', 'Balance'])
df['Date'] = pd.to_datetime(df['Date'])
df = df.sort_values('Date')
fig = go.Figure()
fig.add_trace(go.Scatter(
x=df['Date'],
y=df['Balance'],
mode='lines+markers',
name='Balance',
line=dict(color='#00CC96', width=3),
marker=dict(size=6),
hovertemplate='Date: %{x}
Balance: %{y:,} PKR
Real-time balance monitoring with intelligent spending alerts
Get instant notifications for every expense and budget alert
Beautiful charts and insights to track your spending patterns
AI-powered OCR to automatically extract expense data from receipts
Create family groups to manage household finances together
Password-protected accounts with encrypted data storage
To receive instant notifications for your financial activities, follow these steps:
Add this Twilio WhatsApp Sandbox number to your contacts:
Send this exact message to the number above:
⚠️ Important: You must send this exact code to activate the sandbox.
After sending the code, register your FinGenius account with the same phone number you used to message the bot.
You'll receive instant WhatsApp notifications for:
Your personal finance management just got smarter. Start by adding some balance and setting up your budget allocations.
Upload receipt photos and let AI extract expense data automatically!