import os import re import sqlite3 import time import hashlib import base64 import json from datetime import datetime, timedelta from io import BytesIO import pandas as pd import plotly.express as px import plotly.graph_objects as go import gradio as gr from dateutil.relativedelta import relativedelta # Image processing imports try: from PIL import Image, ImageEnhance, ImageFilter import cv2 import numpy as np PIL_AVAILABLE = True except ImportError: PIL_AVAILABLE = False print("⚠️ PIL/OpenCV not installed. Run: pip install Pillow opencv-python") # OCR imports try: import pytesseract TESSERACT_AVAILABLE = True except ImportError: TESSERACT_AVAILABLE = False print("⚠️ Pytesseract not installed. Run: pip install pytesseract") # Google Vision API (optional) try: from google.cloud import vision VISION_API_AVAILABLE = True except ImportError: VISION_API_AVAILABLE = False print("⚠️ Google Vision API not available. Install with: pip install google-cloud-vision") # Twilio Integration try: from twilio.rest import Client TWILIO_AVAILABLE = True except ImportError: TWILIO_AVAILABLE = False print("⚠️ Twilio not installed. Run: pip install twilio") # Constants EXPENSE_CATEGORIES = [ "Housing (Rent/Mortgage)", "Utilities (Electricity/Water)", "Groceries", "Dining Out", "Transportation", "Healthcare", "Entertainment", "Education", "Personal Care", "Debt Payments", "Savings", "Investments", "Charity", "Miscellaneous" ] INVESTMENT_TYPES = [ "Stocks", "Bonds", "Mutual Funds", "Real Estate", "Cryptocurrency", "Retirement Accounts", "Other" ] RECURRENCE_PATTERNS = [ "Daily", "Weekly", "Monthly", "Quarterly", "Yearly" ] # Rate limiting setup MAX_ATTEMPTS = 5 ATTEMPT_WINDOW = 300 # 5 minutes in seconds # Receipt processing constants RECEIPTS_DIR = "receipts" if not os.path.exists(RECEIPTS_DIR): os.makedirs(RECEIPTS_DIR) # Security functions def hash_password(password): """Hash password using SHA-256 with salt""" salt = "fingenius_secure_salt_2024" return hashlib.sha256((password + salt).encode()).hexdigest() def verify_password(password, hashed): """Verify password against hash""" return hash_password(password) == hashed # ========== A) IMAGE PROCESSING FUNCTIONS ========== class ImageProcessor: """Handles image preprocessing for better OCR results""" @staticmethod def process_receipt_image(image_file, phone): """ Complete receipt processing pipeline that handles Gradio file objects Returns: (success, status_message, extracted_data, image_preview) """ try: if not phone: return False, "❌ Please sign in first", {}, None if not image_file: return False, "❌ No image uploaded", {}, None # Debug input type print(f"\n📁 Input type: {type(image_file)}") # Handle different input types if isinstance(image_file, str): # Case 1: Direct file path (local testing) image_path = image_file elif hasattr(image_file, 'name'): # Case 2: Gradio NamedString object (Hugging Face Spaces) image_path = image_file.name else: return False, "❌ Unsupported file input type", {}, None # Create receipts directory if needed os.makedirs(RECEIPTS_DIR, exist_ok=True) # Generate unique filename timestamp = int(time.time()) filename = f"receipt_{phone}_{timestamp}{os.path.splitext(image_path)[1]}" save_path = os.path.join(RECEIPTS_DIR, filename) # Copy the uploaded file (works for both Gradio and direct paths) with open(image_path, 'rb') as src, open(save_path, 'wb') as dst: dst.write(src.read()) print(f"📄 Saved receipt to: {save_path}") # Preprocess image processed_path, preprocessing_info = ImageProcessor.preprocess_receipt_image(save_path) print(f"🖼️ Preprocessing: {preprocessing_info}") # Extract text using OCR raw_text, confidence, extracted_data = ocr_service.extract_text_from_receipt(processed_path) print(f"🔍 OCR Confidence: {confidence:.1%}") # Auto-categorize if extracted_data.get('merchant'): suggested_category = db.auto_categorize_receipt( phone, extracted_data['merchant'], extracted_data.get('total_amount', 0) ) extracted_data['suggested_category'] = suggested_category print(f"🏷️ Suggested category: {suggested_category}") # Prepare receipt data for database receipt_data = { 'image_path': save_path, 'processed_image_path': processed_path, 'merchant': extracted_data.get('merchant', ''), 'amount': extracted_data.get('total_amount', 0.0), 'date': extracted_data.get('date', ''), 'category': extracted_data.get('suggested_category', 'Miscellaneous'), 'confidence': confidence, 'raw_text': raw_text, 'extracted_data': extracted_data, 'is_validated': False } # Save to database receipt_id = db.save_receipt(phone, receipt_data) extracted_data['receipt_id'] = receipt_id print(f"💾 Saved to DB with ID: {receipt_id}") # Create image preview try: image_preview = Image.open(save_path) image_preview.thumbnail((400, 600)) # Resize for display except Exception as e: print(f"⚠️ Preview generation failed: {e}") image_preview = None status_msg = f"✅ Receipt processed! Confidence: {confidence:.1%}" if confidence < 0.7: status_msg += " ⚠️ Low confidence - please verify" return True, status_msg, extracted_data, image_preview except Exception as e: print(f"❌ Processing error: {traceback.format_exc()}") return False, f"❌ Processing failed: {str(e)}", {}, None @staticmethod def extract_text_regions(image_path): """Extract text regions from receipt image""" try: if not PIL_AVAILABLE: return [] image = Image.open(image_path) # This is a simplified version - in production, you'd use more advanced techniques # to detect and extract specific regions (header, items, total, etc.) return ["Full image processed"] except Exception as e: print(f"Text region extraction error: {e}") return [] # ========== B) OCR SERVICE CLASS ========== class OCRService: """Handles OCR processing with multiple backends""" def __init__(self): self.tesseract_available = TESSERACT_AVAILABLE self.vision_api_available = VISION_API_AVAILABLE and os.getenv('GOOGLE_APPLICATION_CREDENTIALS') # Initialize Google Vision client if available if self.vision_api_available: try: self.vision_client = vision.ImageAnnotatorClient() except Exception as e: print(f"Google Vision API initialization failed: {e}") self.vision_api_available = False def extract_text_from_receipt(self, image_path): """ Extract text from receipt using available OCR service Returns: (raw_text, confidence_score, extracted_data) """ try: # Try Google Vision API first if available if self.vision_api_available: return self._extract_with_vision_api(image_path) # Fallback to Tesseract elif self.tesseract_available: return self._extract_with_tesseract(image_path) else: return "OCR not available", 0.0, self._create_empty_data() except Exception as e: print(f"OCR extraction error: {e}") return f"OCR failed: {str(e)}", 0.0, self._create_empty_data() def _extract_with_vision_api(self, image_path): """Extract text using Google Vision API""" try: with open(image_path, 'rb') as image_file: content = image_file.read() image = vision.Image(content=content) response = self.vision_client.text_detection(image=image) texts = response.text_annotations if texts: raw_text = texts[0].description confidence = min([vertex.confidence for vertex in texts if hasattr(vertex, 'confidence')] or [0.8]) extracted_data = self._parse_receipt_text(raw_text) return raw_text, confidence, extracted_data else: return "No text detected", 0.0, self._create_empty_data() except Exception as e: print(f"Vision API error: {e}") return f"Vision API failed: {str(e)}", 0.0, self._create_empty_data() def _extract_with_tesseract(self, image_path): """Extract text using Tesseract OCR""" try: # Preprocess image first processed_path, _ = ImageProcessor.preprocess_receipt_image(image_path) # Extract text with Tesseract raw_text = pytesseract.image_to_string( Image.open(processed_path), config='--oem 3 --psm 6' # OCR Engine Mode 3, Page Segmentation Mode 6 ) # Get confidence data data = pytesseract.image_to_data(Image.open(processed_path), output_type=pytesseract.Output.DICT) confidences = [int(conf) for conf in data['conf'] if int(conf) > 0] avg_confidence = sum(confidences) / len(confidences) if confidences else 0.0 extracted_data = self._parse_receipt_text(raw_text) return raw_text, avg_confidence / 100.0, extracted_data except Exception as e: print(f"Tesseract error: {e}") return f"Tesseract failed: {str(e)}", 0.0, self._create_empty_data() def _parse_receipt_text(self, raw_text): """Parse raw OCR text to extract structured data""" extracted_data = self._create_empty_data() lines = raw_text.split('\n') # Extract merchant name (usually first non-empty line) for line in lines: if line.strip() and len(line.strip()) > 2: extracted_data['merchant'] = line.strip() break # Extract date using regex patterns date_patterns = [ r'\d{1,2}[/-]\d{1,2}[/-]\d{2,4}', r'\d{4}[/-]\d{1,2}[/-]\d{1,2}', r'\d{1,2}\s+\w+\s+\d{4}' ] for line in lines: for pattern in date_patterns: match = re.search(pattern, line) if match: extracted_data['date'] = match.group() break if extracted_data['date']: break # Extract total amount amount_patterns = [ r'total[:\s]*\$?(\d+\.?\d*)', r'amount[:\s]*\$?(\d+\.?\d*)', r'sum[:\s]*\$?(\d+\.?\d*)', r'\$(\d+\.?\d*)' ] for line in lines: line_lower = line.lower() for pattern in amount_patterns: match = re.search(pattern, line_lower) if match: try: amount = float(match.group(1)) if amount > 0: extracted_data['total_amount'] = amount break except ValueError: continue if extracted_data['total_amount']: break # Extract line items (simplified approach) line_items = [] for line in lines: # Look for lines with item and price pattern item_match = re.search(r'(.+?)\s+(\d+\.?\d*)', line) if item_match and len(item_match.group(1)) > 2: try: item_name = item_match.group(1).strip() item_price = float(item_match.group(2)) if item_price > 0: line_items.append([item_name, item_price]) except ValueError: continue extracted_data['line_items'] = line_items[:10] # Limit to 10 items return extracted_data def _create_empty_data(self): """Create empty extracted data structure""" return { 'merchant': '', 'date': '', 'total_amount': 0.0, 'line_items': [] } # ========== C) ENHANCED DATABASE SERVICE ========== class DatabaseService: def __init__(self, db_name='fin_genius.db'): self.conn = sqlite3.connect(db_name, check_same_thread=False) self.cursor = self.conn.cursor() self._initialize_db() def _initialize_db(self): # Existing tables... self.cursor.execute('''CREATE TABLE IF NOT EXISTS users (phone TEXT PRIMARY KEY, name TEXT, password_hash TEXT, monthly_income INTEGER DEFAULT 0, savings_goal INTEGER DEFAULT 0, current_balance INTEGER DEFAULT 0, is_verified BOOLEAN DEFAULT FALSE, family_group TEXT DEFAULT NULL, last_balance_alert TIMESTAMP DEFAULT NULL, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP)''') self.cursor.execute('''CREATE TABLE IF NOT EXISTS expenses (id INTEGER PRIMARY KEY AUTOINCREMENT, phone TEXT, category TEXT, allocated INTEGER DEFAULT 0, spent INTEGER DEFAULT 0, date TIMESTAMP DEFAULT CURRENT_TIMESTAMP, is_recurring BOOLEAN DEFAULT FALSE, recurrence_pattern TEXT DEFAULT NULL, next_occurrence TIMESTAMP DEFAULT NULL, FOREIGN KEY(phone) REFERENCES users(phone))''') self.cursor.execute('''CREATE TABLE IF NOT EXISTS spending_log (id INTEGER PRIMARY KEY AUTOINCREMENT, phone TEXT, category TEXT, amount INTEGER, description TEXT, date TIMESTAMP DEFAULT CURRENT_TIMESTAMP, balance_after INTEGER, receipt_id TEXT DEFAULT NULL, FOREIGN KEY(phone) REFERENCES users(phone))''') self.cursor.execute('''CREATE TABLE IF NOT EXISTS investments (id INTEGER PRIMARY KEY AUTOINCREMENT, phone TEXT, type TEXT, name TEXT, amount INTEGER, date TIMESTAMP DEFAULT CURRENT_TIMESTAMP, notes TEXT, FOREIGN KEY(phone) REFERENCES users(phone))''') self.cursor.execute('''CREATE TABLE IF NOT EXISTS auth_attempts (phone TEXT PRIMARY KEY, attempts INTEGER DEFAULT 1, last_attempt TIMESTAMP DEFAULT CURRENT_TIMESTAMP)''') self.cursor.execute('''CREATE TABLE IF NOT EXISTS family_groups (group_id TEXT PRIMARY KEY, name TEXT, admin_phone TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP)''') self.cursor.execute('''CREATE TABLE IF NOT EXISTS alerts (id INTEGER PRIMARY KEY AUTOINCREMENT, phone TEXT, alert_type TEXT, message TEXT, sent_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY(phone) REFERENCES users(phone))''') # NEW: Receipts table self.cursor.execute('''CREATE TABLE IF NOT EXISTS receipts (receipt_id TEXT PRIMARY KEY, user_phone TEXT, image_path TEXT, processed_image_path TEXT, merchant TEXT, amount REAL, receipt_date TEXT, category TEXT, ocr_confidence REAL, raw_text TEXT, extracted_data TEXT, is_validated BOOLEAN DEFAULT FALSE, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY(user_phone) REFERENCES users(phone))''') self.conn.commit() # Existing methods remain the same... def get_user(self, phone): self.cursor.execute('''SELECT name, monthly_income, savings_goal, family_group, current_balance FROM users WHERE phone=?''', (phone,)) return self.cursor.fetchone() def authenticate_user(self, phone, password): self.cursor.execute('''SELECT name, password_hash FROM users WHERE phone=?''', (phone,)) result = self.cursor.fetchone() if result and verify_password(password, result[1]): return result[0] return None def create_user(self, phone, name, password): try: password_hash = hash_password(password) self.cursor.execute('''INSERT INTO users (phone, name, password_hash, current_balance) VALUES (?, ?, ?, ?)''', (phone, name, password_hash, 0)) self.conn.commit() return True except sqlite3.IntegrityError: return False def update_user_balance(self, phone, new_balance): self.cursor.execute('''UPDATE users SET current_balance=? WHERE phone=?''', (new_balance, phone)) self.conn.commit() def get_current_balance(self, phone): self.cursor.execute('''SELECT current_balance FROM users WHERE phone=?''', (phone,)) result = self.cursor.fetchone() return result[0] if result else 0 def add_income(self, phone, amount, description="Income added"): current_balance = self.get_current_balance(phone) new_balance = current_balance + amount self.cursor.execute('''INSERT INTO spending_log (phone, category, amount, description, balance_after) VALUES (?, ?, ?, ?, ?)''', (phone, "Income", -amount, description, new_balance)) self.update_user_balance(phone, new_balance) self.conn.commit() return new_balance def update_financials(self, phone, income, savings): self.cursor.execute('''UPDATE users SET monthly_income=?, savings_goal=? WHERE phone=?''', (income, savings, phone)) self.conn.commit() def get_expenses(self, phone, months_back=3): end_date = datetime.now() start_date = end_date - relativedelta(months=months_back) self.cursor.execute('''SELECT category, allocated, spent, date(date) as exp_date, is_recurring FROM expenses WHERE phone=? AND date BETWEEN ? AND ? ORDER BY allocated DESC''', (phone, start_date.strftime('%Y-%m-%d'), end_date.strftime('%Y-%m-%d'))) return self.cursor.fetchall() def update_expense_allocations(self, phone, allocations): self.cursor.execute('''DELETE FROM expenses WHERE phone=? AND allocated > 0 AND is_recurring=FALSE''', (phone,)) for category, alloc in zip(EXPENSE_CATEGORIES, allocations): if alloc > 0: self.cursor.execute('''INSERT INTO expenses (phone, category, allocated) VALUES (?, ?, ?)''', (phone, category, alloc)) self.conn.commit() def log_spending(self, phone, category, amount, description="", receipt_id=None): current_balance = self.get_current_balance(phone) new_balance = current_balance - amount self.cursor.execute('''INSERT INTO spending_log (phone, category, amount, description, balance_after, receipt_id) VALUES (?, ?, ?, ?, ?, ?)''', (phone, category, amount, description, new_balance, receipt_id)) self.update_user_balance(phone, new_balance) self.conn.commit() return new_balance def record_expense(self, phone, category, amount, description="", is_recurring=False, recurrence_pattern=None, receipt_id=None): new_balance = self.log_spending(phone, category, amount, description, receipt_id) self.cursor.execute('''SELECT allocated, spent FROM expenses WHERE phone=? AND category=? AND is_recurring=FALSE''', (phone, category)) result = self.cursor.fetchone() if is_recurring: next_occurrence = self._calculate_next_occurrence(datetime.now(), recurrence_pattern) self.cursor.execute('''INSERT INTO expenses (phone, category, spent, is_recurring, recurrence_pattern, next_occurrence) VALUES (?, ?, ?, ?, ?, ?)''', (phone, category, amount, True, recurrence_pattern, next_occurrence)) elif result: alloc, spent = result new_spent = spent + amount self.cursor.execute('''UPDATE expenses SET spent=? WHERE phone=? AND category=? AND is_recurring=FALSE''', (new_spent, phone, category)) else: self.cursor.execute('''INSERT INTO expenses (phone, category, spent) VALUES (?, ?, ?)''', (phone, category, amount)) self.conn.commit() return True, new_balance def _calculate_next_occurrence(self, current_date, pattern): if pattern == "Daily": return current_date + timedelta(days=1) elif pattern == "Weekly": return current_date + timedelta(weeks=1) elif pattern == "Monthly": return current_date + relativedelta(months=1) elif pattern == "Quarterly": return current_date + relativedelta(months=3) elif pattern == "Yearly": return current_date + relativedelta(years=1) return current_date def record_investment(self, phone, inv_type, name, amount, notes): self.cursor.execute('''INSERT INTO investments (phone, type, name, amount, notes) VALUES (?, ?, ?, ?, ?)''', (phone, inv_type, name, amount, notes)) self.conn.commit() return True def get_investments(self, phone): self.cursor.execute('''SELECT type, name, amount, date(date) as inv_date, notes FROM investments WHERE phone=? ORDER BY date DESC''', (phone,)) return self.cursor.fetchall() def get_spending_log(self, phone, limit=50): self.cursor.execute('''SELECT category, amount, description, date, balance_after FROM spending_log WHERE phone=? ORDER BY date DESC LIMIT ?''', (phone, limit)) return self.cursor.fetchall() def create_family_group(self, group_name, admin_phone): group_id = f"FG-{admin_phone[-4:]}-{int(time.time())}" try: self.cursor.execute('''INSERT INTO family_groups (group_id, name, admin_phone) VALUES (?, ?, ?)''', (group_id, group_name, admin_phone)) self.cursor.execute('''UPDATE users SET family_group=? WHERE phone=?''', (group_id, admin_phone)) self.conn.commit() return group_id except sqlite3.IntegrityError: return None def join_family_group(self, phone, group_id): self.cursor.execute('''UPDATE users SET family_group=? WHERE phone=?''', (group_id, phone)) self.conn.commit() return True def get_family_group(self, group_id): self.cursor.execute('''SELECT name, admin_phone FROM family_groups WHERE group_id=?''', (group_id,)) return self.cursor.fetchone() def get_family_members(self, group_id): self.cursor.execute('''SELECT phone, name FROM users WHERE family_group=?''', (group_id,)) return self.cursor.fetchall() # NEW: Receipt-related methods def save_receipt(self, phone, receipt_data): """Save receipt data to database""" receipt_id = f"REC-{phone[-4:]}-{int(time.time())}" try: self.cursor.execute('''INSERT INTO receipts (receipt_id, user_phone, image_path, processed_image_path, merchant, amount, receipt_date, category, ocr_confidence, raw_text, extracted_data, is_validated) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)''', (receipt_id, phone, receipt_data.get('image_path', ''), receipt_data.get('processed_image_path', ''), receipt_data.get('merchant', ''), receipt_data.get('amount', 0.0), receipt_data.get('date', ''), receipt_data.get('category', ''), receipt_data.get('confidence', 0.0), receipt_data.get('raw_text', ''), json.dumps(receipt_data.get('extracted_data', {})), receipt_data.get('is_validated', False))) self.conn.commit() return receipt_id except sqlite3.Error as e: print(f"Database error saving receipt: {e}") return None def get_receipts(self, phone, limit=20): """Get user's receipts""" self.cursor.execute('''SELECT receipt_id, merchant, amount, receipt_date, category, ocr_confidence, is_validated, created_at FROM receipts WHERE user_phone=? ORDER BY created_at DESC LIMIT ?''', (phone, limit)) return self.cursor.fetchall() def update_receipt(self, receipt_id, updates): """Update receipt information""" set_clause = ", ".join([f"{key}=?" for key in updates.keys()]) values = list(updates.values()) + [receipt_id] self.cursor.execute(f'''UPDATE receipts SET {set_clause} WHERE receipt_id=?''', values) self.conn.commit() def auto_categorize_receipt(self, phone, merchant, amount): """Auto-categorize based on user's spending patterns""" # Get user's most common category for similar merchants self.cursor.execute('''SELECT category, COUNT(*) as count FROM spending_log WHERE phone=? AND (description LIKE ? OR description LIKE ?) GROUP BY category ORDER BY count DESC LIMIT 1''', (phone, f'%{merchant}%', f'%{merchant.split()[0]}%')) result = self.cursor.fetchone() if result: return result[0] # Fallback categorization based on merchant keywords merchant_lower = merchant.lower() if any(word in merchant_lower for word in ['grocery', 'market', 'food', 'super']): return "Groceries" elif any(word in merchant_lower for word in ['restaurant', 'cafe', 'pizza', 'burger']): return "Dining Out" elif any(word in merchant_lower for word in ['gas', 'fuel', 'shell', 'bp']): return "Transportation" elif any(word in merchant_lower for word in ['pharmacy', 'medical', 'hospital']): return "Healthcare" else: return "Miscellaneous" # Fixed Twilio WhatsApp Service class TwilioWhatsAppService: def __init__(self): # Set your Twilio credentials here self.account_sid = os.getenv('TWILIO_ACCOUNT_SID', 'your_account_sid_here') self.auth_token = os.getenv('TWILIO_AUTH_TOKEN', 'your_auth_token_here') # For Twilio Sandbox - use this number self.whatsapp_number = 'whatsapp:+14155238886' # For production Twilio (after approval) - use your approved number # self.whatsapp_number = 'whatsapp:+1234567890' # Your approved WhatsApp Business number print(f"🔧 Twilio Configuration:") print(f" Account SID: {self.account_sid[:10]}... (masked)") print(f" Auth Token: {'*' * len(self.auth_token) if self.auth_token != 'your_auth_token_here' else 'Not set'}") print(f" WhatsApp Number: {self.whatsapp_number}") if self.account_sid != 'your_account_sid_here' and self.auth_token != 'your_auth_token_here' and TWILIO_AVAILABLE: try: self.client = Client(self.account_sid, self.auth_token) # Test the connection by getting account info account = self.client.api.accounts(self.account_sid).fetch() print(f"✅ Twilio WhatsApp Service initialized successfully") print(f" Account Status: {account.status}") print(f" Account Name: {account.friendly_name}") self.enabled = True except Exception as e: print(f"❌ Failed to initialize Twilio: {e}") print(f" Please check your Account SID and Auth Token") self.client = None self.enabled = False else: print("❌ Twilio credentials not configured or Twilio not installed") print(" Please set TWILIO_ACCOUNT_SID and TWILIO_AUTH_TOKEN environment variables") print(" Or modify the credentials directly in the code") self.client = None self.enabled = False def send_whatsapp(self, phone, message): """Send WhatsApp message with proper error handling""" if not self.enabled or not self.client: print(f"📱 [DEMO MODE] WhatsApp to {phone}: {message}") return False try: # Ensure phone number has correct format if not phone.startswith('+'): phone = '+' + phone to_whatsapp = f"whatsapp:{phone}" print(f"📤 Attempting to send WhatsApp message:") print(f" From: {self.whatsapp_number}") print(f" To: {to_whatsapp}") print(f" Message: {message[:50]}...") twilio_message = self.client.messages.create( body=message, from_=self.whatsapp_number, to=to_whatsapp ) print(f"✅ WhatsApp sent successfully!") print(f" Message SID: {twilio_message.sid}") print(f" Status: {twilio_message.status}") return True except Exception as e: print(f"❌ Failed to send WhatsApp to {phone}") print(f" Error: {str(e)}") print(f" Error Type: {type(e).__name__}") # Common error messages and solutions if "not a valid phone number" in str(e).lower(): print(f" 💡 Solution: Check phone number format. Should be +country_code + number") elif "unverified" in str(e).lower(): print(f" 💡 Solution: For Twilio Sandbox, recipient must send 'join catch-manner' to +14155238886 first") elif "forbidden" in str(e).lower(): print(f" 💡 Solution: Check your Twilio credentials and account status") elif "unauthorized" in str(e).lower(): print(f" 💡 Solution: Verify your Account SID and Auth Token are correct") print(f"📱 [FALLBACK] Message content: {message}") return False # Initialize services db = DatabaseService() twilio = TwilioWhatsAppService() ocr_service = OCRService() # Helper functions (unchanged) def validate_phone_number(phone): pattern = r'^\+\d{1,3}\d{6,14}$' # Added missing closing quote and $ return re.match(pattern, phone) is not None def validate_password(password): if len(password) < 6: return False, "Password must be at least 6 characters long" if not re.search(r'[A-Za-z]', password): return False, "Password must contain at least one letter" if not re.search(r'\d', password): return False, "Password must contain at least one number" return True, "Password is valid" def format_currency(amount): return f"{int(amount):,} PKR" if amount else "0 PKR" def generate_spending_chart(phone, months=3): expenses = db.get_expenses(phone, months) if not expenses: return None df = pd.DataFrame(expenses, columns=['Category', 'Allocated', 'Spent', 'Date', 'IsRecurring']) df['Date'] = pd.to_datetime(df['Date']) df['Month'] = df['Date'].dt.strftime('%Y-%m') monthly_data = df.groupby(['Month', 'Category'])['Spent'].sum().unstack().fillna(0) fig = go.Figure() colors = px.colors.qualitative.Set3 for i, category in enumerate(monthly_data.columns): fig.add_trace(go.Bar( x=monthly_data.index, y=monthly_data[category], name=category, marker_color=colors[i % len(colors)], hoverinfo='y+name', textposition='auto' )) fig.update_layout( barmode='stack', title=f'📊 Spending Trends (Last {months} Months)', xaxis_title='Month', yaxis_title='Amount (PKR)', height=500, plot_bgcolor='rgba(0,0,0,0)', paper_bgcolor='rgba(0,0,0,0)' ) return fig def generate_balance_chart(phone): spending_log = db.get_spending_log(phone, 100) if not spending_log: return None df = pd.DataFrame(spending_log, columns=['Category', 'Amount', 'Description', 'Date', 'Balance']) df['Date'] = pd.to_datetime(df['Date']) df = df.sort_values('Date') fig = go.Figure() fig.add_trace(go.Scatter( x=df['Date'], y=df['Balance'], mode='lines+markers', name='Balance', line=dict(color='#00CC96', width=3), marker=dict(size=6), hovertemplate='Date: %{x}
Balance: %{y:,} PKR' )) fig.update_layout( title='💰 Balance Trend Over Time', xaxis_title='Date', yaxis_title='Balance (PKR)', height=400, plot_bgcolor='rgba(0,0,0,0)', paper_bgcolor='rgba(0,0,0,0)' ) return fig # ========== D) RECEIPT PROCESSING FUNCTIONS ========== def process_receipt_image(image_file, phone): """ Complete receipt processing pipeline Returns: (success, status_message, extracted_data, image_preview) """ try: if not image_file: return False, "❌ No image uploaded", {}, None # Save uploaded image timestamp = int(time.time()) filename = f"receipt_{phone}_{timestamp}.jpg" image_path = os.path.join(RECEIPTS_DIR, filename) # Handle different input types if hasattr(image_file, 'name'): # File upload with open(image_path, 'wb') as f: f.write(image_file.read()) else: # Direct file path image_path = image_file # Preprocess image processed_path, preprocessing_info = ImageProcessor.preprocess_receipt_image(image_path) # Extract text using OCR raw_text, confidence, extracted_data = ocr_service.extract_text_from_receipt(processed_path) # Auto-categorize if extracted_data.get('merchant'): suggested_category = db.auto_categorize_receipt( phone, extracted_data['merchant'], extracted_data.get('total_amount', 0) ) extracted_data['suggested_category'] = suggested_category # Prepare receipt data for database receipt_data = { 'image_path': image_path, 'processed_image_path': processed_path, 'merchant': extracted_data.get('merchant', ''), 'amount': extracted_data.get('total_amount', 0.0), 'date': extracted_data.get('date', ''), 'category': extracted_data.get('suggested_category', 'Miscellaneous'), 'confidence': confidence, 'raw_text': raw_text, 'extracted_data': extracted_data, 'is_validated': False } # Save to database receipt_id = db.save_receipt(phone, receipt_data) extracted_data['receipt_id'] = receipt_id status_msg = f"✅ Receipt processed successfully! Confidence: {confidence:.1%}" if confidence < 0.7: status_msg += " ⚠️ Low confidence - please verify extracted data" return True, status_msg, extracted_data, image_path except Exception as e: print(f"Receipt processing error: {e}") return False, f"❌ Processing failed: {str(e)}", {}, None def validate_and_save_receipt(phone, receipt_id, merchant, amount, date, category, line_items_data): """ Validate edited receipt data and save as expense """ try: if not phone or not receipt_id: return "❌ Session expired. Please sign in again.", "", [], [] if not merchant.strip(): return "❌ Merchant name is required", "", [], [] if amount <= 0: return "❌ Amount must be positive", "", [], [] # Check balance current_balance = db.get_current_balance(phone) if current_balance < amount: return "❌ Insufficient balance for this expense", "", [], [] # Update receipt in database receipt_updates = { 'merchant': merchant, 'amount': amount, 'receipt_date': date, 'category': category, 'is_validated': True } db.update_receipt(receipt_id, receipt_updates) # Record as expense description = f"Receipt: {merchant}" if date: description += f" ({date})" success, new_balance = db.record_expense( phone, category, amount, description, receipt_id=receipt_id ) if not success: return "❌ Failed to record expense", "", [], [] # Send WhatsApp confirmation user_data = db.get_user(phone) name = user_data[0] if user_data else "User" msg = f"🧾 Receipt Expense - Hi {name}! Merchant: {merchant}, Amount: {format_currency(amount)}, Category: {category}, Remaining Balance: {format_currency(new_balance)}" twilio.send_whatsapp(phone, msg) # Get updated data for UI expenses = db.get_expenses(phone) formatted_expenses = [] if expenses: for cat, alloc, spent, date, _ in expenses: formatted_expenses.append([ cat, alloc, spent, alloc - spent, date.split()[0] if date else "" ]) spending_log = db.get_spending_log(phone, 10) formatted_spending_log = [] if spending_log: for cat, amt, desc, date, balance_after in spending_log: formatted_spending_log.append([ cat, amt, desc[:50] + "..." if len(desc) > 50 else desc, date.split()[0] if date else "", balance_after ]) status_msg = f"✅ Receipt saved! Recorded {format_currency(amount)} for {category}" balance_html = f"
💰 {format_currency(new_balance)}
" return status_msg, balance_html, formatted_expenses, formatted_spending_log except Exception as e: print(f"Receipt validation error: {e}") return f"❌ Error saving receipt: {str(e)}", "", [], [] # ========== PAGE NAVIGATION FUNCTIONS ========== def show_signin(): return [ gr.update(visible=False), # landing_page gr.update(visible=True), # signin_page gr.update(visible=False), # signup_page gr.update(visible=False), # dashboard_page "", # Clear signin inputs "" ] def show_signup(): return [ gr.update(visible=False), # landing_page gr.update(visible=False), # signin_page gr.update(visible=True), # signup_page gr.update(visible=False), # dashboard_page "", # Clear signup inputs "", "", "" ] def show_dashboard(phone, name): user_data = db.get_user(phone) current_balance = user_data[4] if user_data else 0 monthly_income = user_data[1] if user_data else 0 savings_goal = user_data[2] if user_data else 0 # Get expense data expenses = db.get_expenses(phone) formatted_expenses = [] if expenses: for cat, alloc, spent, date, _ in expenses: formatted_expenses.append([ cat, alloc, spent, alloc - spent, date.split()[0] if date else "" ]) # Get investment data investments = db.get_investments(phone) formatted_investments = [] if investments: for inv_type, name, amount, date, notes in investments: formatted_investments.append([ inv_type, name, amount, date.split()[0] if date else "", notes or "" ]) # Get spending log spending_log = db.get_spending_log(phone, 10) formatted_spending_log = [] if spending_log: for category, amount, description, date, balance_after in spending_log: formatted_spending_log.append([ category, amount, description[:50] + "..." if len(description) > 50 else description, date.split()[0] if date else "", balance_after ]) # Get family info family_info = "No family group" family_members = [] if user_data and user_data[3]: group_data = db.get_family_group(user_data[3]) if group_data: family_info = f"Family Group: {group_data[0]} (Admin: {group_data[1]})" members = db.get_family_members(user_data[3]) family_members = [[m[0], m[1]] for m in members] # Get receipt data receipts = db.get_receipts(phone) formatted_receipts = [] if receipts: for receipt_id, merchant, amount, date, category, confidence, is_validated, created_at in receipts: status = "✅ Validated" if is_validated else "⏳ Pending" formatted_receipts.append([ receipt_id, merchant or "Unknown", format_currency(amount), date or "N/A", category or "N/A", f"{confidence:.1%}", status, created_at.split()[0] if created_at else "" ]) # Prepare allocation inputs alloc_inputs = [] if expenses: alloc_dict = {cat: alloc for cat, alloc, _, _, _ in expenses} alloc_inputs = [alloc_dict.get(cat, 0) for cat in EXPENSE_CATEGORIES] else: alloc_inputs = [0] * len(EXPENSE_CATEGORIES) return [ gr.update(visible=False), # landing_page gr.update(visible=False), # signin_page gr.update(visible=False), # signup_page gr.update(visible=True), # dashboard_page f"Welcome back, {name}! 👋", # welcome message f"
💰 {format_currency(current_balance)}
", # balance display monthly_income, # income savings_goal, # savings_goal *alloc_inputs, # allocation inputs formatted_expenses, # expense_table formatted_investments, # investments_table formatted_spending_log, # spending_log_table generate_spending_chart(phone), # spending_chart generate_balance_chart(phone), # balance_chart family_info, # family_info family_members, # family_members formatted_receipts # receipts_table ] def return_to_landing(): return [ gr.update(visible=True), # landing_page gr.update(visible=False), # signin_page gr.update(visible=False), # signup_page gr.update(visible=False), # dashboard_page "", # Clear welcome "
💰 0 PKR
" # Clear balance ] # ========== AUTHENTICATION FUNCTIONS ========== def authenticate_user(phone, password): if not phone or not password: return "❌ Please fill all fields" if not validate_phone_number(phone): return "❌ Invalid phone format. Use +92XXXXXXXXXX" user_name = db.authenticate_user(phone, password) if not user_name: return "❌ Invalid phone number or password." return f"✅ Signed in as {user_name}" def register_user(name, phone, password, confirm_password): if not name or not phone or not password or not confirm_password: return "❌ Please fill all fields" if not validate_phone_number(phone): return "❌ Invalid phone format. Use +92XXXXXXXXXX" if password != confirm_password: return "❌ Passwords don't match" is_valid, password_msg = validate_password(password) if not is_valid: return f"❌ {password_msg}" success = db.create_user(phone, name, password) if not success: return "⚠️ This number is already registered" msg = f"🏦 Welcome to FinGenius Pro, {name}! Your account has been created successfully. You can now track expenses, manage budgets, and receive instant financial alerts. Start by adding your first balance! 💰" whatsapp_sent = twilio.send_whatsapp(phone, msg) if whatsapp_sent: return "✅ Registration complete! Check WhatsApp for confirmation and sign in to continue." else: return "✅ Registration complete! WhatsApp alerts are not configured, but you can still use all features. Sign in to continue." def add_balance(phone, amount_val, description=""): if not phone: return "❌ Session expired. Please sign in again.", "" if amount_val <= 0: return "❌ Amount must be positive", "" new_balance = db.add_income(phone, amount_val, description or "Balance added") user_data = db.get_user(phone) if user_data: name = user_data[0] msg = f"💰 Balance Added - Hi {name}! Added: {format_currency(amount_val)}. New Balance: {format_currency(new_balance)}. Description: {description or 'Balance update'}" twilio.send_whatsapp(phone, msg) return f"✅ Added {format_currency(amount_val)} to balance!", f"
💰 {format_currency(new_balance)}
" def update_financials(phone, income_val, savings_val): if not phone: return "❌ Session expired. Please sign in again." if income_val < 0 or savings_val < 0: return "❌ Values cannot be negative" db.update_financials(phone, income_val, savings_val) user_data = db.get_user(phone) if user_data: name = user_data[0] msg = f"📊 Financial Goals Updated - Hi {name}! Monthly Income: {format_currency(income_val)}, Savings Goal: {format_currency(savings_val)}. Your budget planning is now ready! 🎯" twilio.send_whatsapp(phone, msg) return f"✅ Updated! Monthly Income: {format_currency(income_val)}, Savings Goal: {format_currency(savings_val)}" def save_allocations(phone, *allocations): if not phone: return "❌ Session expired. Please sign in again.", [] if any(alloc < 0 for alloc in allocations): return "❌ Allocations cannot be negative", [] total_alloc = sum(allocations) user_data = db.get_user(phone) if not user_data: return "❌ User not found", [] if total_alloc + user_data[2] > user_data[1]: return "❌ Total allocations exceed available income!", [] db.update_expense_allocations(phone, allocations) name = user_data[0] msg = f"📋 Budget Allocated - Hi {name}! Your monthly budget has been set. Total allocated: {format_currency(total_alloc)}. Start tracking your expenses now! 💳" twilio.send_whatsapp(phone, msg) expenses = db.get_expenses(phone) formatted_expenses = [] if expenses: for cat, alloc, spent, date, _ in expenses: formatted_expenses.append([ cat, alloc, spent, alloc - spent, date.split()[0] if date else "" ]) return "✅ Budget allocations saved!", formatted_expenses def record_expense(phone, category, amount, description="", is_recurring=False, recurrence_pattern=None): if not phone: return "❌ Session expired. Please sign in again.", "", [], [] if amount <= 0: return "❌ Amount must be positive", "", [], [] current_balance = db.get_current_balance(phone) if current_balance < amount: return "❌ Insufficient balance for this expense", "", [], [] success, new_balance = db.record_expense(phone, category, amount, description, is_recurring, recurrence_pattern) if not success: return "❌ Failed to record expense", "", [], [] user_data = db.get_user(phone) name = user_data[0] if user_data else "User" msg = f"💸 Expense Recorded - Hi {name}! Category: {category}, Amount: {format_currency(amount)}, Remaining Balance: {format_currency(new_balance)}" if description: msg += f", Note: {description}" if is_recurring: msg += f" (Recurring: {recurrence_pattern})" twilio.send_whatsapp(phone, msg) expenses = db.get_expenses(phone) formatted_expenses = [] if expenses: for cat, alloc, spent, date, _ in expenses: formatted_expenses.append([ cat, alloc, spent, alloc - spent, date.split()[0] if date else "" ]) spending_log = db.get_spending_log(phone, 10) formatted_spending_log = [] if spending_log: for cat, amt, desc, date, balance_after in spending_log: formatted_spending_log.append([ cat, amt, desc[:50] + "..." if len(desc) > 50 else desc, date.split()[0] if date else "", balance_after ]) status_msg = f"✅ Recorded {format_currency(amount)} for {category}" balance_html = f"
💰 {format_currency(new_balance)}
" return status_msg, balance_html, formatted_expenses, formatted_spending_log def add_investment(phone, inv_type, name, amount, notes): if not phone: return "❌ Session expired. Please sign in again.", "", [] if amount <= 0: return "❌ Amount must be positive", "", [] current_balance = db.get_current_balance(phone) if current_balance < amount: return "❌ Insufficient balance for investment", "", [] new_balance = db.log_spending(phone, "Investment", amount, f"Investment: {name}") db.record_investment(phone, inv_type, name, amount, notes) user_data = db.get_user(phone) if user_data: user_name = user_data[0] msg = f"📈 Investment Added - Hi {user_name}! Type: {inv_type}, Name: {name}, Amount: {format_currency(amount)}, Remaining Balance: {format_currency(new_balance)}" if notes: msg += f", Notes: {notes}" twilio.send_whatsapp(phone, msg) investments = db.get_investments(phone) formatted_investments = [] if investments: for inv_type, name, amount, date, notes in investments: formatted_investments.append([ inv_type, name, amount, date.split()[0] if date else "", notes or "" ]) balance_html = f"
💰 {format_currency(new_balance)}
" return f"✅ Added investment: {name} ({format_currency(amount)})", balance_html, formatted_investments def create_family_group(phone, group_name): if not phone or not group_name: return "❌ Group name required", "", [] group_id = db.create_family_group(group_name, phone) if not group_id: return "❌ Failed to create group", "", [] user_data = db.get_user(phone) if user_data: name = user_data[0] msg = f"👪 Family Group Created - Hi {name}! You've created '{group_name}' (ID: {group_id}). Share this ID with family members to join. Manage finances together! 🏠" twilio.send_whatsapp(phone, msg) return f"✅ Created group: {group_name} (ID: {group_id})", f"Family Group: {group_name} (Admin: {phone})", [[phone, db.get_user(phone)[0] if db.get_user(phone) else "You"]] def join_family_group(phone, group_id): if not phone or not group_id: return "❌ Group ID required", "", [] success = db.join_family_group(phone, group_id) if not success: return "❌ Failed to join group", "", [] group_data = db.get_family_group(group_id) if not group_data: return "❌ Group not found", "", [] user_data = db.get_user(phone) if user_data: name = user_data[0] msg = f"👪 Joined Family Group - Hi {name}! You've joined '{group_data[0]}'. Start collaborating on family finances together! 🤝" twilio.send_whatsapp(phone, msg) members = db.get_family_members(group_id) member_list = [[m[0], m[1]] for m in members] return f"✅ Joined group: {group_data[0]}", f"Family Group: {group_data[0]} (Admin: {group_data[1]})", member_list # ========== E) RECEIPT PROCESSING EVENT HANDLERS ========== def handle_receipt_upload(image_file, phone): """Handle receipt image upload and processing""" if not phone: return "❌ Please sign in first", {}, "", "", "", [], None, "" if not image_file: return "❌ Please upload an image", {}, "", "", "", [], None, "" # Process the receipt success, status, extracted_data, image_path = process_receipt_image(image_file, phone) if not success: return status, {}, "", "", "", [], None, "" # Prepare UI updates merchant = extracted_data.get('merchant', '') amount = extracted_data.get('total_amount', 0.0) date = extracted_data.get('date', '') category = extracted_data.get('suggested_category', 'Miscellaneous') line_items = extracted_data.get('line_items', []) # Create image preview try: image_preview = Image.open(image_path) # Resize for preview image_preview.thumbnail((400, 600)) except: image_preview = None return ( status, {"receipt_id": extracted_data.get('receipt_id', ''), "confidence": extracted_data.get('confidence', 0.0)}, merchant, amount, date, line_items, image_preview, category ) def handle_receipt_save(phone, receipt_data, merchant, amount, date, category, line_items_data): """Save validated receipt as expense""" if not phone or not receipt_data: return "❌ No receipt data to save", "", [], [] receipt_id = receipt_data.get('receipt_id') if not receipt_id: return "❌ Invalid receipt data", "", [], [] return validate_and_save_receipt(phone, receipt_id, merchant, amount, date, category, line_items_data) # ========== F) INTERFACE ========== custom_css = """ /* Fixed CSS for proper page transitions */ .gradio-container { max-width: 1200px !important; margin: 0 auto !important; } .landing-hero { background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); min-height: 80vh; padding: 3rem 2rem; color: white; text-align: center; border-radius: 20px; margin: 2rem 0; } .hero-title { font-size: 3.5rem; font-weight: 700; margin-bottom: 1rem; text-shadow: 2px 2px 4px rgba(0,0,0,0.3); } .hero-subtitle { font-size: 1.5rem; margin-bottom: 2rem; opacity: 0.9; } .features-grid { display: grid; grid-template-columns: repeat(auto-fit, minmax(280px, 1fr)); gap: 2rem; margin: 3rem 0; } .feature-card { background: rgba(255,255,255,0.1); backdrop-filter: blur(10px); border-radius: 15px; padding: 2rem; text-align: center; border: 1px solid rgba(255,255,255,0.2); transition: transform 0.3s ease; } .feature-card:hover { transform: translateY(-5px); } .feature-icon { font-size: 3rem; margin-bottom: 1rem; } .auth-container { max-width: 450px; margin: 2rem auto; background: white; border-radius: 20px; padding: 3rem; box-shadow: 0 20px 40px rgba(0,0,0,0.1); border: 1px solid #e2e8f0; } .whatsapp-setup { background: linear-gradient(135deg, #25D366 0%, #128C7E 100%); color: white; padding: 2rem; border-radius: 15px; margin: 2rem 0; text-align: center; box-shadow: 0 10px 25px rgba(37, 211, 102, 0.3); } .whatsapp-steps { background: rgba(255, 255, 255, 0.1); backdrop-filter: blur(10px); border-radius: 10px; padding: 1.5rem; margin: 1rem 0; border: 1px solid rgba(255, 255, 255, 0.2); } .phone-highlight { background: rgba(255, 255, 255, 0.2); padding: 0.5rem 1rem; border-radius: 8px; font-family: monospace; font-size: 1.1rem; font-weight: bold; display: inline-block; margin: 0.5rem 0; } .code-highlight { background: rgba(255, 255, 255, 0.15); padding: 0.5rem 1rem; border-radius: 8px; font-family: monospace; font-size: 1rem; font-weight: bold; display: inline-block; margin: 0.5rem 0; border-left: 3px solid #fff; } .dashboard-header { background: linear-gradient(135deg, #2d3748 0%, #4a5568 100%); color: white; padding: 2rem; border-radius: 15px; margin-bottom: 2rem; text-align: center; font-size: 1.5rem; } .balance-card { background: linear-gradient(135deg, #48bb78 0%, #38a169 100%); color: white; padding: 2rem; border-radius: 15px; text-align: center; margin-bottom: 2rem; box-shadow: 0 10px 25px rgba(72, 187, 120, 0.3); } .balance-amount { font-size: 2.5rem; font-weight: bold; margin: 1rem 0; } .receipt-upload-area { border: 2px dashed #cbd5e0; border-radius: 15px; padding: 2rem; text-align: center; background: #f7fafc; transition: all 0.3s ease; } .receipt-upload-area:hover { border-color: #4299e1; background: #ebf8ff; } .receipt-preview { max-width: 100%; max-height: 400px; border-radius: 10px; box-shadow: 0 4px 15px rgba(0,0,0,0.1); } .low-confidence { background-color: #fff3cd !important; border: 1px solid #ffc107 !important; } .high-confidence { background-color: #d4edda !important; border: 1px solid #28a745 !important; } /* Button styling */ .primary-btn { background: linear-gradient(45deg, #ff6b6b, #ee5a24) !important; border: none !important; border-radius: 25px !important; padding: 1rem 2rem !important; font-size: 1.1rem !important; font-weight: 600 !important; color: white !important; transition: all 0.3s ease !important; box-shadow: 0 4px 15px rgba(238, 90, 36, 0.4) !important; } .secondary-btn { background: linear-gradient(45deg, #74b9ff, #0984e3) !important; border: none !important; border-radius: 25px !important; padding: 1rem 2rem !important; font-size: 1.1rem !important; font-weight: 600 !important; color: white !important; transition: all 0.3s ease !important; box-shadow: 0 4px 15px rgba(116, 185, 255, 0.4) !important; } /* Tab styling */ .tab-nav { background: #f8fafc; border-radius: 10px; padding: 0.5rem; margin-bottom: 2rem; } /* Hide elements properly */ .hide { display: none !important; } /* Ensure proper spacing */ .gradio-row { margin: 1rem 0; } .gradio-column { padding: 0 1rem; } /* Custom button hover effects */ button:hover { transform: translateY(-2px); box-shadow: 0 6px 20px rgba(0,0,0,0.15); } /* Status message styling */ .status-success { color: #38a169; font-weight: 600; } .status-error { color: #e53e3e; font-weight: 600; } /* Table styling */ .dataframe { border-radius: 10px; overflow: hidden; box-shadow: 0 4px 15px rgba(0,0,0,0.1); } """ with gr.Blocks(title="FinGenius Pro", theme=gr.themes.Soft(), css=custom_css) as demo: # State to track current user current_user = gr.State() receipt_data = gr.State({}) # ===== LANDING PAGE ===== with gr.Column(visible=True) as landing_page: gr.HTML("""
🏦 FinGenius Pro
Your Complete Personal Finance Manager with Smart AI Alerts
💰

Smart Balance Tracking

Real-time balance monitoring with intelligent spending alerts

📱

WhatsApp Integration

Get instant notifications for every expense and budget alert

📊

Advanced Analytics

Beautiful charts and insights to track your spending patterns

🧾

Receipt Scanning

AI-powered OCR to automatically extract expense data from receipts

👪

Family Finance

Create family groups to manage household finances together

🔒

Secure & Private

Password-protected accounts with encrypted data storage

""") with gr.Row(): with gr.Column(scale=1): signin_btn = gr.Button("🔑 Sign In", variant="primary", elem_classes="primary-btn", size="lg") with gr.Column(scale=1): signup_btn = gr.Button("✨ Create Account", variant="secondary", elem_classes="secondary-btn", size="lg") # ===== SIGN IN PAGE ===== with gr.Column(visible=False) as signin_page: with gr.Column(elem_classes="auth-container"): gr.HTML("

🔑 Welcome Back

") signin_phone = gr.Textbox( label="📱 WhatsApp Number", placeholder="+92XXXXXXXXXX", info="Enter your registered WhatsApp number" ) signin_password = gr.Textbox( label="🔒 Password", type="password", placeholder="Enter your secure password" ) with gr.Row(): submit_signin = gr.Button("Sign In", variant="primary", elem_classes="primary-btn", scale=2) back_to_landing_1 = gr.Button("← Back", variant="secondary", scale=1) signin_status = gr.Textbox(label="Status", interactive=False) # ===== SIGN UP PAGE ===== with gr.Column(visible=False) as signup_page: with gr.Column(elem_classes="auth-container"): gr.HTML("

✨ Create Your Account

") signup_name = gr.Textbox( label="👤 Full Name", placeholder="Enter your full name" ) signup_phone = gr.Textbox( label="📱 WhatsApp Number", placeholder="+92XXXXXXXXXX", info="This will be used for notifications" ) signup_password = gr.Textbox( label="🔒 Create Password", type="password", placeholder="Minimum 6 characters with letters and numbers" ) signup_confirm_password = gr.Textbox( label="🔒 Confirm Password", type="password", placeholder="Re-enter your password" ) # WhatsApp Setup Instructions gr.HTML("""

📱 Enable WhatsApp Alerts

To receive instant notifications for your financial activities, follow these steps:

Step 1: Save the Bot Number

Add this Twilio WhatsApp Sandbox number to your contacts:

+1 (415) 523-8886

Step 2: Send Activation Code

Send this exact message to the number above:

join catch-manner

⚠️ Important: You must send this exact code to activate the sandbox.

Step 3: Confirm Registration

After sending the code, register your FinGenius account with the same phone number you used to message the bot.

Step 4: Start Receiving Alerts

You'll receive instant WhatsApp notifications for:

""") with gr.Row(): submit_signup = gr.Button("Complete Registration", variant="primary", elem_classes="primary-btn", scale=2) back_to_landing_2 = gr.Button("← Back", variant="secondary", scale=1) signup_status = gr.Textbox(label="Status", interactive=False) # ===== DASHBOARD PAGE ===== with gr.Column(visible=False) as dashboard_page: # Dashboard Header welcome_message = gr.HTML("", elem_classes="dashboard-header") # Current Balance Display with gr.Column(elem_classes="balance-card"): balance_display = gr.HTML("
💰 0 PKR
") with gr.Row(): with gr.Column(scale=2): balance_amount = gr.Number(label="💰 Add to Balance (PKR)", minimum=1, step=100, value=0) balance_description = gr.Textbox(label="Description", placeholder="Salary, gift, bonus, etc.") with gr.Column(scale=1): add_balance_btn = gr.Button("Add Balance", variant="primary", elem_classes="primary-btn") balance_status = gr.Textbox(label="Balance Status", interactive=False) with gr.Tabs(elem_classes="tab-nav"): # Dashboard Overview Tab with gr.Tab("📊 Dashboard Overview"): gr.HTML("""

🎉 Welcome to FinGenius Pro!

Your personal finance management just got smarter. Start by adding some balance and setting up your budget allocations.

""") with gr.Row(): gr.HTML("""

🚀 Quick Start Guide:

  1. Add Balance: Use the balance card above to add your initial funds
  2. Set Income & Goals: Go to Income & Goals tab to set your monthly income and savings target
  3. Plan Budget: Use Budget Planner to allocate money to different expense categories
  4. Track Expenses: Log your daily expenses in the Expense Tracker
  5. Scan Receipts: Use Receipt Scan to automatically extract expense data from photos
  6. Monitor Investments: Keep track of your investment portfolio
""") # Income & Goals Tab with gr.Tab("📥 Income & Goals"): gr.HTML("

💵 Set Your Financial Goals

") with gr.Row(): income = gr.Number(label="💵 Monthly Income (PKR)", minimum=0, step=1000, value=0) savings_goal = gr.Number(label="🎯 Savings Goal (PKR)", minimum=0, step=1000, value=0) update_btn = gr.Button("💾 Update Financial Info", variant="primary", elem_classes="primary-btn") income_status = gr.Textbox(label="Status", interactive=False) # Budget Planner Tab with gr.Tab("📊 Budget Planner"): gr.HTML("

💼 Allocate Your Monthly Budget

") with gr.Column(): allocation_inputs = [] with gr.Row(): for i, category in enumerate(EXPENSE_CATEGORIES[:7]): alloc = gr.Number(label=f"🏷️ {category}", minimum=0, step=100, value=0) allocation_inputs.append(alloc) with gr.Row(): for i, category in enumerate(EXPENSE_CATEGORIES[7:]): alloc = gr.Number(label=f"🏷️ {category}", minimum=0, step=100, value=0) allocation_inputs.append(alloc) allocate_btn = gr.Button("💾 Save Budget Allocations", variant="primary", elem_classes="primary-btn", size="lg") allocation_status = gr.Textbox(label="Status", interactive=False) gr.HTML("

📊 Current Budget Allocations

") expense_table = gr.Dataframe( headers=["Category", "Allocated", "Spent", "Remaining", "Date"], interactive=False, wrap=True ) # Receipt Scan Tab - NEW! with gr.Tab("📷 Receipt Scan"): gr.HTML("""

🧾 AI-Powered Receipt Scanner

Upload receipt photos and let AI extract expense data automatically!

""") with gr.Row(): with gr.Column(scale=1): gr.HTML("

📤 Upload Receipt

") receipt_image = gr.File( label="📷 Receipt Image", file_types=["image"], elem_classes="receipt-upload-area" ) process_receipt_btn = gr.Button( "🔍 Process Receipt", variant="primary", elem_classes="primary-btn", size="lg" ) receipt_status = gr.Textbox(label="Processing Status", interactive=False) # Image Preview gr.HTML("

📸 Receipt Preview

") receipt_preview = gr.Image( label="Receipt Preview", type="pil", elem_classes="receipt-preview" ) with gr.Column(scale=1): gr.HTML("

✏️ Verify & Edit Extracted Data

") extracted_merchant = gr.Textbox( label="🏪 Merchant Name", placeholder="Store/Restaurant name", info="Edit if incorrectly detected" ) with gr.Row(): extracted_amount = gr.Number( label="💰 Total Amount (PKR)", minimum=0, step=0.01, value=0 ) extracted_date = gr.Textbox( label="📅 Date", placeholder="YYYY-MM-DD or DD/MM/YYYY" ) extracted_category = gr.Dropdown( choices=EXPENSE_CATEGORIES, label="🏷️ Category", value="Miscellaneous", info="AI-suggested category (you can change it)" ) gr.HTML("

📝 Line Items (Optional)

") line_items_table = gr.Dataframe( headers=["Item", "Price"], datatype=["str", "number"], row_count=5, col_count=2, interactive=True, label="Receipt Items" ) save_receipt_btn = gr.Button( "💾 Save as Expense", variant="primary", elem_classes="primary-btn", size="lg" ) # Receipt History gr.HTML("

🧾 Recent Receipts

") receipts_table = gr.Dataframe( headers=["Receipt ID", "Merchant", "Amount", "Date", "Category", "Confidence", "Status", "Processed"], interactive=False, wrap=True ) # Expense Tracker Tab with gr.Tab("💸 Expense Tracker"): with gr.Row(): with gr.Column(): gr.HTML("

➕ Log New Expense

") expense_category = gr.Dropdown(choices=EXPENSE_CATEGORIES, label="🏷️ Category") expense_amount = gr.Number(label="💰 Amount (PKR)", minimum=1, step=100, value=0) expense_description = gr.Textbox(label="📝 Description", placeholder="What did you buy?") with gr.Accordion("🔄 Recurring Expense Settings", open=False): is_recurring = gr.Checkbox(label="This is a recurring expense") recurrence_pattern = gr.Dropdown(choices=RECURRENCE_PATTERNS, label="Frequency") record_expense_btn = gr.Button("💸 Record Expense", variant="primary", elem_classes="primary-btn", size="lg") expense_status = gr.Textbox(label="Status", interactive=False) with gr.Column(): gr.HTML("

📈 Spending Analytics

") spending_chart = gr.Plot(label="📊 Spending Analysis") balance_chart = gr.Plot(label="💰 Balance Trend") with gr.Row(): months_history = gr.Slider(1, 12, value=3, step=1, label="📅 Months History") update_charts_btn = gr.Button("🔄 Update Analytics", variant="secondary") # Spending History Tab with gr.Tab("📝 Spending History"): gr.HTML("

💳 Recent Transaction History

") spending_log_table = gr.Dataframe( headers=["Category", "Amount", "Description", "Date", "Balance After"], interactive=False, wrap=True ) # Investment Portfolio Tab with gr.Tab("📈 Investment Portfolio"): with gr.Row(): with gr.Column(): gr.HTML("

➕ Add New Investment

") investment_type = gr.Dropdown(choices=INVESTMENT_TYPES, label="🏢 Investment Type") investment_name = gr.Textbox(label="📝 Name/Description") investment_amount = gr.Number(label="💰 Amount (PKR)", minimum=1, step=1000, value=0) investment_notes = gr.Textbox(label="📋 Notes", lines=2, placeholder="Additional details...") add_investment_btn = gr.Button("📈 Add Investment", variant="primary", elem_classes="primary-btn") investment_status = gr.Textbox(label="Status", interactive=False) with gr.Column(): gr.HTML("

💼 Your Investment Portfolio

") investments_table = gr.Dataframe( headers=["Type", "Name", "Amount", "Date", "Notes"], interactive=False, wrap=True ) # Family Finance Tab with gr.Tab("👪 Family Finance"): gr.HTML("

👨‍👩‍👧‍👦 Family Financial Management

") family_info = gr.Textbox(label="👥 Current Family Group", interactive=False) with gr.Row(): with gr.Column(): gr.HTML("

➕ Create New Family Group

") create_group_name = gr.Textbox(label="👪 Group Name", placeholder="Smith Family Budget") create_group_btn = gr.Button("Create Family Group", variant="primary", elem_classes="primary-btn") with gr.Column(): gr.HTML("

🔗 Join Existing Group

") join_group_id = gr.Textbox(label="🆔 Group ID", placeholder="FG-XXXX-XXXXXXXX") join_group_btn = gr.Button("Join Family Group", variant="secondary", elem_classes="secondary-btn") family_status = gr.Textbox(label="Status", interactive=False) gr.HTML("

👥 Family Members

") family_members = gr.Dataframe( headers=["Phone", "Name"], interactive=False, wrap=True ) with gr.Row(): with gr.Column(scale=3): pass with gr.Column(scale=1): sign_out_btn = gr.Button("🚪 Sign Out", variant="stop", elem_classes="secondary-btn", size="lg") # ===== EVENT HANDLERS ===== # Navigation signin_btn.click( show_signin, outputs=[landing_page, signin_page, signup_page, dashboard_page, signin_phone, signin_password] ) signup_btn.click( show_signup, outputs=[landing_page, signin_page, signup_page, dashboard_page, signup_name, signup_phone, signup_password, signup_confirm_password] ) back_to_landing_1.click( return_to_landing, outputs=[landing_page, signin_page, signup_page, dashboard_page, welcome_message, balance_display] ) back_to_landing_2.click( return_to_landing, outputs=[landing_page, signin_page, signup_page, dashboard_page, welcome_message, balance_display] ) sign_out_btn.click( return_to_landing, outputs=[landing_page, signin_page, signup_page, dashboard_page, welcome_message, balance_display] ) # Authentication def handle_signin(phone, password): status = authenticate_user(phone, password) if "✅" in status: user_name = status.split("as ")[1] current_user.value = phone pages = show_dashboard(phone, user_name) return [status] + pages else: empty_alloc = [0] * len(EXPENSE_CATEGORIES) return [status, gr.update(), gr.update(), gr.update(), gr.update(), "", "
💰 0 PKR
", 0, 0] + empty_alloc + [[], [], [], None, None, "No family group", [], []] submit_signin.click( handle_signin, inputs=[signin_phone, signin_password], outputs=[signin_status, landing_page, signin_page, signup_page, dashboard_page, welcome_message, balance_display, income, savings_goal] + allocation_inputs + [expense_table, investments_table, spending_log_table, spending_chart, balance_chart, family_info, family_members, receipts_table] ) def handle_signup(name, phone, password, confirm_password): status = register_user(name, phone, password, confirm_password) return status submit_signup.click( handle_signup, inputs=[signup_name, signup_phone, signup_password, signup_confirm_password], outputs=[signup_status] ) # Balance Management def handle_add_balance(amount_val, description): if current_user.value: status, balance_html = add_balance(current_user.value, amount_val, description) return status, balance_html else: return "❌ Please sign in first", "
💰 0 PKR
" add_balance_btn.click( handle_add_balance, inputs=[balance_amount, balance_description], outputs=[balance_status, balance_display] ) # Financial Operations def handle_update_financials(income_val, savings_val): if current_user.value: return update_financials(current_user.value, income_val, savings_val) else: return "❌ Please sign in first" update_btn.click( handle_update_financials, inputs=[income, savings_goal], outputs=[income_status] ) def handle_save_allocations(*allocations): if current_user.value: return save_allocations(current_user.value, *allocations) else: return "❌ Please sign in first", [] allocate_btn.click( handle_save_allocations, inputs=allocation_inputs, outputs=[allocation_status, expense_table] ) def handle_record_expense(category, amount, description, is_recurring, recurrence_pattern): if current_user.value: return record_expense(current_user.value, category, amount, description, is_recurring, recurrence_pattern) else: return "❌ Please sign in first", "
💰 0 PKR
", [], [] record_expense_btn.click( handle_record_expense, inputs=[expense_category, expense_amount, expense_description, is_recurring, recurrence_pattern], outputs=[expense_status, balance_display, expense_table, spending_log_table] ) def handle_add_investment(inv_type, name, amount, notes): if current_user.value: return add_investment(current_user.value, inv_type, name, amount, notes) else: return "❌ Please sign in first", "
💰 0 PKR
", [] add_investment_btn.click( handle_add_investment, inputs=[investment_type, investment_name, investment_amount, investment_notes], outputs=[investment_status, balance_display, investments_table] ) def handle_create_family_group(group_name): if current_user.value: return create_family_group(current_user.value, group_name) else: return "❌ Please sign in first", "", [] create_group_btn.click( handle_create_family_group, inputs=[create_group_name], outputs=[family_status, family_info, family_members] ) def handle_join_family_group(group_id): if current_user.value: return join_family_group(current_user.value, group_id) else: return "❌ Please sign in first", "", [] join_group_btn.click( handle_join_family_group, inputs=[join_group_id], outputs=[family_status, family_info, family_members] ) def handle_update_charts(months_history): if current_user.value: return generate_spending_chart(current_user.value, months_history), generate_balance_chart(current_user.value) else: return None, None update_charts_btn.click( handle_update_charts, inputs=[months_history], outputs=[spending_chart, balance_chart] ) # Receipt Processing Event Handlers - NEW! process_receipt_btn.click( handle_receipt_upload, inputs=[receipt_image, current_user], outputs=[receipt_status, receipt_data, extracted_merchant, extracted_amount, extracted_date, line_items_table, receipt_preview, extracted_category] ) save_receipt_btn.click( handle_receipt_save, inputs=[current_user, receipt_data, extracted_merchant, extracted_amount, extracted_date, extracted_category, line_items_table], outputs=[receipt_status, balance_display, expense_table, spending_log_table] ) if __name__ == "__main__": print("🚀 Starting FinGenius Pro...") print("📱 WhatsApp Integration Status:") print(f" Twilio Available: {TWILIO_AVAILABLE}") print(f" Service Enabled: {twilio.enabled}") print("🔍 OCR Services Status:") print(f" Tesseract Available: {TESSERACT_AVAILABLE}") print(f" Google Vision Available: {VISION_API_AVAILABLE}") print("🖼️ Image Processing Status:") print(f" PIL/OpenCV Available: {PIL_AVAILABLE}") print("") print("📋 Setup Instructions:") if not twilio.enabled: print(" 1. Set TWILIO_ACCOUNT_SID and TWILIO_AUTH_TOKEN environment variables") print(" 2. Or modify credentials directly in TwilioWhatsAppService class") print(" 3. Users must send 'join catch-manner' to +14155238886 to activate WhatsApp") print(" 4. Use the same phone number for both WhatsApp activation and app registration") print("") demo.launch( server_name="0.0.0.0", server_port=7860, share=False, show_error=True )