import os import cv2 import json import base64 import sqlite3 from datetime import datetime, timedelta import pytz from flask import Flask, render_template, request, jsonify, Response from deepface import DeepFace import numpy as np import threading import logging from werkzeug.utils import secure_filename # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) app = Flask(__name__) app.config['SECRET_KEY'] = 'your-secret-key-here' app.config['UPLOAD_FOLDER'] = 'static/known_faces' app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024 # 16MB max file size # Create upload directory if it doesn't exist os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True) # Indian Standard Time IST = pytz.timezone('Asia/Kolkata') class DatabaseManager: def __init__(self, db_path='attendance.db'): self.db_path = db_path self.init_database() def init_database(self): """Initialize the database with required tables""" conn = sqlite3.connect(self.db_path) cursor = conn.cursor() # Users table cursor.execute(''' CREATE TABLE IF NOT EXISTS users ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL UNIQUE, face_encoding_path TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) ''') # Attendance table cursor.execute(''' CREATE TABLE IF NOT EXISTS attendance ( id INTEGER PRIMARY KEY AUTOINCREMENT, user_id INTEGER, date DATE, check_in_time TIMESTAMP, check_out_time TIMESTAMP, status TEXT DEFAULT 'present', FOREIGN KEY (user_id) REFERENCES users (id), UNIQUE(user_id, date) ) ''') conn.commit() conn.close() def add_user(self, name, face_image_path): """Add a new user to the database""" try: conn = sqlite3.connect(self.db_path) cursor = conn.cursor() cursor.execute('INSERT INTO users (name, face_encoding_path) VALUES (?, ?)', (name, face_image_path)) user_id = cursor.lastrowid conn.commit() conn.close() return user_id except sqlite3.IntegrityError: return None def get_all_users(self): """Get all users from database""" conn = sqlite3.connect(self.db_path) cursor = conn.cursor() cursor.execute('SELECT id, name, face_encoding_path FROM users') users = cursor.fetchall() conn.close() return users def mark_attendance(self, user_id, attendance_type='check_in'): """Mark attendance for a user""" try: conn = sqlite3.connect(self.db_path) cursor = conn.cursor() # Get current date and time in IST now = datetime.now(IST) today = now.date() current_time = now if attendance_type == 'check_in': # Check if user already checked in today cursor.execute(''' SELECT id, check_in_time FROM attendance WHERE user_id = ? AND date = ? ''', (user_id, today)) existing = cursor.fetchone() if existing: conn.close() return {'success': False, 'message': 'Already checked in today'} # Insert new attendance record cursor.execute(''' INSERT INTO attendance (user_id, date, check_in_time, status) VALUES (?, ?, ?, 'present') ''', (user_id, today, current_time)) elif attendance_type == 'check_out': # Update existing record with check-out time cursor.execute(''' UPDATE attendance SET check_out_time = ? WHERE user_id = ? AND date = ? AND check_out_time IS NULL ''', (current_time, user_id, today)) if cursor.rowcount == 0: conn.close() return {'success': False, 'message': 'No check-in record found or already checked out'} conn.commit() conn.close() return {'success': True, 'message': f'Successfully {attendance_type.replace("_", "-")}'} except Exception as e: logger.error(f"Error marking attendance: {e}") return {'success': False, 'message': 'Database error'} def get_today_stats(self): """Get today's attendance statistics""" conn = sqlite3.connect(self.db_path) cursor = conn.cursor() today = datetime.now(IST).date() # Total users cursor.execute('SELECT COUNT(*) FROM users') total_users = cursor.fetchone()[0] # Present today (checked in) cursor.execute(''' SELECT COUNT(*) FROM attendance WHERE date = ? AND check_in_time IS NOT NULL ''', (today,)) present_today = cursor.fetchone()[0] # Absent today absent_today = total_users - present_today # Get today's attendance records with user names cursor.execute(''' SELECT u.name, a.check_in_time, a.check_out_time FROM attendance a JOIN users u ON a.user_id = u.id WHERE a.date = ? ORDER BY a.check_in_time DESC ''', (today,)) today_attendance = cursor.fetchall() conn.close() return { 'total_users': total_users, 'present_today': present_today, 'absent_today': absent_today, 'today_attendance': today_attendance } class FaceRecognizer: def __init__(self, known_faces_dir='static/known_faces'): self.known_faces_dir = known_faces_dir self.known_faces = {} self.load_known_faces() def load_known_faces(self): """Load known faces from the directory""" self.known_faces = {} if not os.path.exists(self.known_faces_dir): os.makedirs(self.known_faces_dir) return db_manager = DatabaseManager() users = db_manager.get_all_users() for user_id, name, face_path in users: full_path = os.path.join(self.known_faces_dir, face_path) if face_path else None if full_path and os.path.exists(full_path): self.known_faces[name] = { 'user_id': user_id, 'image_path': full_path } logger.info(f"Loaded {len(self.known_faces)} known faces") def recognize_face(self, frame): """Recognize face in the given frame""" try: if not self.known_faces: return None, 0 # Convert frame to RGB for DeepFace rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) # Save temporary frame for DeepFace temp_path = 'temp_frame.jpg' cv2.imwrite(temp_path, frame) best_match = None highest_confidence = 0 for name, face_data in self.known_faces.items(): try: # Use DeepFace to verify faces result = DeepFace.verify( img1_path=temp_path, img2_path=face_data['image_path'], model_name='VGG-Face', distance_metric='cosine', enforce_detection=False ) if result['verified']: confidence = (1 - result['distance']) * 100 if confidence > highest_confidence and confidence > 60: # 60% threshold highest_confidence = confidence best_match = { 'name': name, 'user_id': face_data['user_id'], 'confidence': confidence } except Exception as e: logger.debug(f"Recognition error for {name}: {e}") continue # Clean up temp file if os.path.exists(temp_path): os.remove(temp_path) if best_match: return best_match, highest_confidence return None, 0 except Exception as e: logger.error(f"Face recognition error: {e}") return None, 0 # Global instances db_manager = DatabaseManager() face_recognizer = FaceRecognizer() @app.route('/') def index(): """Main page""" stats = db_manager.get_today_stats() return render_template('index.html', stats=stats) @app.route('/api/stats') def get_stats(): """Get current statistics""" stats = db_manager.get_today_stats() # Format attendance records for display formatted_attendance = [] for name, check_in, check_out in stats['today_attendance']: record = { 'name': name, 'check_in': None, 'check_out': None } if check_in: check_in_dt = datetime.fromisoformat(check_in.replace('Z', '+00:00')) if check_in_dt.tzinfo is None: check_in_dt = pytz.utc.localize(check_in_dt) check_in_ist = check_in_dt.astimezone(IST) record['check_in'] = check_in_ist.strftime('%I:%M %p') if check_out: check_out_dt = datetime.fromisoformat(check_out.replace('Z', '+00:00')) if check_out_dt.tzinfo is None: check_out_dt = pytz.utc.localize(check_out_dt) check_out_ist = check_out_dt.astimezone(IST) record['check_out'] = check_out_ist.strftime('%I:%M %p') formatted_attendance.append(record) stats['today_attendance'] = formatted_attendance return jsonify(stats) @app.route('/api/add_user', methods=['POST']) def add_user(): """Add a new user with face image""" try: name = request.form.get('name') if not name: return jsonify({'success': False, 'message': 'Name is required'}) if 'face_image' not in request.files: return jsonify({'success': False, 'message': 'Face image is required'}) file = request.files['face_image'] if file.filename == '': return jsonify({'success': False, 'message': 'No file selected'}) # Save the uploaded file filename = secure_filename(f"{name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.jpg") filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename) file.save(filepath) # Add user to database user_id = db_manager.add_user(name, filename) if user_id: # Reload known faces face_recognizer.load_known_faces() return jsonify({'success': True, 'message': f'User {name} added successfully'}) else: # Remove the uploaded file if database insertion failed if os.path.exists(filepath): os.remove(filepath) return jsonify({'success': False, 'message': 'User already exists'}) except Exception as e: logger.error(f"Error adding user: {e}") return jsonify({'success': False, 'message': 'Server error'}) @app.route('/api/recognize', methods=['POST']) def recognize_and_mark(): """Recognize face and mark attendance""" try: data = request.get_json() image_data = data.get('image') attendance_type = data.get('type', 'check_in') # 'check_in' or 'check_out' if not image_data: return jsonify({'success': False, 'message': 'No image data provided'}) # Decode base64 image image_data = image_data.split(',')[1] # Remove data:image/jpeg;base64, image_bytes = base64.b64decode(image_data) # Convert to numpy array nparr = np.frombuffer(image_bytes, np.uint8) frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR) # Recognize face result, confidence = face_recognizer.recognize_face(frame) if result and confidence > 70: # Higher threshold for attendance marking # Mark attendance attendance_result = db_manager.mark_attendance(result['user_id'], attendance_type) if attendance_result['success']: current_time = datetime.now(IST).strftime('%I:%M %p') return jsonify({ 'success': True, 'name': result['name'], 'confidence': round(confidence, 2), 'time': current_time, 'type': attendance_type, 'message': attendance_result['message'] }) else: return jsonify({ 'success': False, 'name': result['name'], 'confidence': round(confidence, 2), 'message': attendance_result['message'] }) else: return jsonify({'success': False, 'message': 'Face not recognized or confidence too low'}) except Exception as e: logger.error(f"Recognition error: {e}") return jsonify({'success': False, 'message': 'Recognition failed'}) @app.route('/api/reload_faces') def reload_faces(): """Reload known faces""" try: face_recognizer.load_known_faces() return jsonify({'success': True, 'message': 'Faces reloaded successfully'}) except Exception as e: logger.error(f"Error reloading faces: {e}") return jsonify({'success': False, 'message': 'Failed to reload faces'}) if __name__ == '__main__': app.run(debug=True, host='0.0.0.0', port=7860)