# Uploaded by Subh775 - commit e858642 (verified): "Update app.py"
import os
import cv2
import json
import base64
import sqlite3
from datetime import datetime, timedelta
import pytz
from flask import Flask, render_template, request, jsonify, Response
from deepface import DeepFace
import numpy as np
import threading
import logging
from werkzeug.utils import secure_filename
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = Flask(__name__)
# Prefer a secret supplied through the SECRET_KEY environment variable; the
# literal placeholder remains only as a fallback so that existing setups keep
# starting unchanged.
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', 'your-secret-key-here')
app.config['UPLOAD_FOLDER'] = 'static/known_faces'
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024  # 16MB max file size

# Create upload directory if it doesn't exist
os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)

# Indian Standard Time
IST = pytz.timezone('Asia/Kolkata')
class DatabaseManager:
    """SQLite-backed storage for registered users and their daily attendance.

    Every method opens its own connection to ``db_path`` and closes it before
    returning, including when a statement raises.
    """

    # Attendance actions accepted by ``mark_attendance``.
    VALID_ATTENDANCE_TYPES = ('check_in', 'check_out')

    def __init__(self, db_path='attendance.db'):
        """Remember the database path and make sure the tables exist.

        Args:
            db_path: Path of the SQLite database file.
        """
        self.db_path = db_path
        self.init_database()

    def init_database(self):
        """Create the ``users`` and ``attendance`` tables if they are missing."""
        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.cursor()

            # Users table
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS users (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    name TEXT NOT NULL UNIQUE,
                    face_encoding_path TEXT,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            ''')

            # Attendance table: at most one row per user per day.
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS attendance (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    user_id INTEGER,
                    date DATE,
                    check_in_time TIMESTAMP,
                    check_out_time TIMESTAMP,
                    status TEXT DEFAULT 'present',
                    FOREIGN KEY (user_id) REFERENCES users (id),
                    UNIQUE(user_id, date)
                )
            ''')

            conn.commit()
        finally:
            conn.close()

    def add_user(self, name, face_image_path):
        """Insert a new user row.

        Args:
            name: Unique display name of the user.
            face_image_path: Value stored in ``face_encoding_path``.

        Returns:
            The new row id, or ``None`` when the insert violates a constraint
            (for example the name is already taken).
        """
        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.cursor()
            cursor.execute(
                'INSERT INTO users (name, face_encoding_path) VALUES (?, ?)',
                (name, face_image_path),
            )
            user_id = cursor.lastrowid
            conn.commit()
            return user_id
        except sqlite3.IntegrityError:
            return None
        finally:
            conn.close()

    def get_all_users(self):
        """Return every user as a list of ``(id, name, face_encoding_path)`` tuples."""
        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.cursor()
            cursor.execute('SELECT id, name, face_encoding_path FROM users')
            return cursor.fetchall()
        finally:
            conn.close()

    def mark_attendance(self, user_id, attendance_type='check_in'):
        """Record a check-in or check-out for ``user_id`` on today's IST date.

        Args:
            user_id: Id of the user in the ``users`` table.
            attendance_type: ``'check_in'`` or ``'check_out'``.

        Returns:
            A dict with a boolean ``'success'`` and a human-readable
            ``'message'``. Database failures are logged and reported as
            ``'Database error'`` rather than raised.
        """
        # Reject unknown actions up front; otherwise neither branch below would
        # run and the call would report success without writing anything.
        if attendance_type not in self.VALID_ATTENDANCE_TYPES:
            return {'success': False, 'message': 'Invalid attendance type'}

        conn = None
        try:
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()

            # Current date and time in IST, bound as ISO text explicitly. This
            # is the same textual form sqlite3's default date/datetime adapters
            # produce, without relying on those adapters.
            now = datetime.now(IST)
            today = now.date().isoformat()
            current_time = now.isoformat(' ')

            if attendance_type == 'check_in':
                # Check if user already checked in today
                cursor.execute('''
                    SELECT id, check_in_time FROM attendance
                    WHERE user_id = ? AND date = ?
                ''', (user_id, today))
                if cursor.fetchone():
                    return {'success': False, 'message': 'Already checked in today'}

                # Insert new attendance record
                cursor.execute('''
                    INSERT INTO attendance (user_id, date, check_in_time, status)
                    VALUES (?, ?, ?, 'present')
                ''', (user_id, today, current_time))
            else:
                # Update existing record with check-out time
                cursor.execute('''
                    UPDATE attendance
                    SET check_out_time = ?
                    WHERE user_id = ? AND date = ? AND check_out_time IS NULL
                ''', (current_time, user_id, today))
                if cursor.rowcount == 0:
                    return {'success': False, 'message': 'No check-in record found or already checked out'}

            conn.commit()
            return {'success': True, 'message': f'Successfully {attendance_type.replace("_", "-")}'}
        except Exception as e:
            logger.error(f"Error marking attendance: {e}")
            return {'success': False, 'message': 'Database error'}
        finally:
            if conn is not None:
                conn.close()

    def get_today_stats(self):
        """Summarise attendance for today's IST date.

        Returns:
            A dict with ``'total_users'``, ``'present_today'``,
            ``'absent_today'`` (total minus present) and ``'today_attendance'``,
            a list of ``(name, check_in_time, check_out_time)`` tuples ordered
            by check-in time, newest first.
        """
        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.cursor()
            today = datetime.now(IST).date().isoformat()

            # Total users
            cursor.execute('SELECT COUNT(*) FROM users')
            total_users = cursor.fetchone()[0]

            # Present today (checked in)
            cursor.execute('''
                SELECT COUNT(*) FROM attendance
                WHERE date = ? AND check_in_time IS NOT NULL
            ''', (today,))
            present_today = cursor.fetchone()[0]

            # Today's attendance records with user names
            cursor.execute('''
                SELECT u.name, a.check_in_time, a.check_out_time
                FROM attendance a
                JOIN users u ON a.user_id = u.id
                WHERE a.date = ?
                ORDER BY a.check_in_time DESC
            ''', (today,))
            today_attendance = cursor.fetchall()
        finally:
            conn.close()

        return {
            'total_users': total_users,
            'present_today': present_today,
            'absent_today': total_users - present_today,
            'today_attendance': today_attendance,
        }
class FaceRecognizer:
    """Match camera frames against the stored reference images of known users."""

    # A verified match must exceed this similarity (in percent) to be reported.
    MIN_CONFIDENCE = 60

    def __init__(self, known_faces_dir='static/known_faces'):
        """Remember the image directory and load the known faces.

        Args:
            known_faces_dir: Directory that holds the users' reference images.
        """
        self.known_faces_dir = known_faces_dir
        self.known_faces = {}
        self.load_known_faces()

    def load_known_faces(self):
        """Rebuild ``self.known_faces`` from the users table.

        Only users whose stored image file exists inside ``known_faces_dir``
        are kept. The result maps each user name to a dict with ``'user_id'``
        and ``'image_path'``. If the directory does not exist it is created
        and the mapping is left empty.
        """
        if not os.path.exists(self.known_faces_dir):
            os.makedirs(self.known_faces_dir)
            self.known_faces = {}
            return

        # Build into a local dict and publish it in one assignment, so that a
        # recognition running during a reload never iterates a half-filled map.
        loaded = {}
        for user_id, name, face_path in DatabaseManager().get_all_users():
            if not face_path:
                continue
            full_path = os.path.join(self.known_faces_dir, face_path)
            if os.path.exists(full_path):
                loaded[name] = {
                    'user_id': user_id,
                    'image_path': full_path,
                }
        self.known_faces = loaded
        logger.info(f"Loaded {len(self.known_faces)} known faces")

    def recognize_face(self, frame):
        """Find the known user whose reference image best matches ``frame``.

        Args:
            frame: Image as decoded by OpenCV, or ``None`` when decoding failed.

        Returns:
            ``(match, confidence)`` where ``match`` is a dict with ``'name'``,
            ``'user_id'`` and ``'confidence'``. Returns ``(None, 0)`` when
            there is no frame, no known faces, no match above
            ``MIN_CONFIDENCE``, or an error occurred (errors are logged).
        """
        import tempfile  # local import: only this method needs it

        if frame is None or not self.known_faces:
            return None, 0

        temp_path = None
        try:
            # Unique file per call so that concurrent requests cannot overwrite
            # or delete each other's frame.
            fd, temp_path = tempfile.mkstemp(suffix='.jpg')
            os.close(fd)
            cv2.imwrite(temp_path, frame)

            best_match = None
            highest_confidence = 0
            for name, face_data in self.known_faces.items():
                try:
                    # Use DeepFace to verify faces
                    result = DeepFace.verify(
                        img1_path=temp_path,
                        img2_path=face_data['image_path'],
                        model_name='VGG-Face',
                        distance_metric='cosine',
                        enforce_detection=False
                    )
                    if result['verified']:
                        confidence = (1 - result['distance']) * 100
                        if confidence > highest_confidence and confidence > self.MIN_CONFIDENCE:
                            highest_confidence = confidence
                            best_match = {
                                'name': name,
                                'user_id': face_data['user_id'],
                                'confidence': confidence
                            }
                except Exception as e:
                    # A failure on one reference image must not stop the scan.
                    logger.debug(f"Recognition error for {name}: {e}")
                    continue

            if best_match:
                return best_match, highest_confidence
            return None, 0
        except Exception as e:
            logger.error(f"Face recognition error: {e}")
            return None, 0
        finally:
            # Clean up the temp file on every exit path.
            if temp_path and os.path.exists(temp_path):
                try:
                    os.remove(temp_path)
                except OSError as e:
                    logger.debug(f"Could not remove temp frame {temp_path}: {e}")
# Global instances used by the route handlers below.
# DatabaseManager's constructor calls init_database(), which creates the
# tables if they do not exist yet.
db_manager = DatabaseManager()
# FaceRecognizer's constructor calls load_known_faces(), which reads the users
# table through its own DatabaseManager instance.
face_recognizer = FaceRecognizer()
@app.route('/')
def index():
    """Render the main page (``index.html``) with today's attendance statistics."""
    return render_template('index.html', stats=db_manager.get_today_stats())
def _format_ist_time(timestamp):
    """Format a stored attendance timestamp as an IST clock time.

    Args:
        timestamp: ISO-8601 string as read from the attendance table, or a
            falsy value when the event has not been recorded.

    Returns:
        The time formatted with ``'%I:%M %p'`` in IST, or ``None`` when
        ``timestamp`` is falsy.
    """
    if not timestamp:
        return None
    # fromisoformat() does not accept a trailing 'Z' on older Python versions.
    parsed = datetime.fromisoformat(timestamp.replace('Z', '+00:00'))
    if parsed.tzinfo is None:
        # Values without an offset are treated as UTC.
        parsed = pytz.utc.localize(parsed)
    return parsed.astimezone(IST).strftime('%I:%M %p')


@app.route('/api/stats')
def get_stats():
    """Return today's statistics as JSON.

    The raw ``today_attendance`` rows are replaced by dicts with ``'name'``,
    ``'check_in'`` and ``'check_out'``; the two times are display strings in
    IST, or ``None`` when not recorded.
    """
    stats = db_manager.get_today_stats()
    stats['today_attendance'] = [
        {
            'name': name,
            'check_in': _format_ist_time(check_in),
            'check_out': _format_ist_time(check_out),
        }
        for name, check_in, check_out in stats['today_attendance']
    ]
    return jsonify(stats)
@app.route('/api/add_user', methods=['POST'])
def add_user():
    """Register a new user from a multipart form.

    Expects a ``name`` form field and a ``face_image`` file. The image is
    saved into the upload folder, checked to be decodable, and the user is
    inserted into the database; on success the known faces are reloaded.

    Returns:
        A JSON object with ``success`` and ``message``.
    """
    filepath = None
    stored = False  # True once the database row references the saved file
    try:
        # Strip so that a whitespace-only name is rejected like an empty one.
        name = (request.form.get('name') or '').strip()
        if not name:
            return jsonify({'success': False, 'message': 'Name is required'})
        if 'face_image' not in request.files:
            return jsonify({'success': False, 'message': 'Face image is required'})
        file = request.files['face_image']
        if file.filename == '':
            return jsonify({'success': False, 'message': 'No file selected'})

        # Save the uploaded file
        filename = secure_filename(f"{name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.jpg")
        filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename)
        file.save(filepath)

        # Reject uploads OpenCV cannot decode; such a file could never be
        # matched against a camera frame later.
        if cv2.imread(filepath) is None:
            os.remove(filepath)
            return jsonify({'success': False, 'message': 'Uploaded file is not a valid image'})

        # Add user to database
        user_id = db_manager.add_user(name, filename)
        if user_id:
            stored = True
            # Reload known faces
            face_recognizer.load_known_faces()
            return jsonify({'success': True, 'message': f'User {name} added successfully'})

        # Remove the uploaded file if database insertion failed
        if os.path.exists(filepath):
            os.remove(filepath)
        return jsonify({'success': False, 'message': 'User already exists'})
    except Exception as e:
        logger.error(f"Error adding user: {e}")
        # Do not leave an orphaned upload behind, but keep the file when a
        # database row already points at it.
        if filepath and not stored and os.path.exists(filepath):
            try:
                os.remove(filepath)
            except OSError as cleanup_error:
                logger.error(f"Error removing upload {filepath}: {cleanup_error}")
        return jsonify({'success': False, 'message': 'Server error'})
@app.route('/api/recognize', methods=['POST'])
def recognize_and_mark():
    """Recognize the face in a posted image and mark attendance for it.

    Expects a JSON object with ``image`` (base64 data, optionally prefixed as
    a ``data:`` URL) and an optional ``type`` of ``'check_in'`` (default) or
    ``'check_out'``.

    Returns:
        A JSON object with ``success`` and ``message``; when a face was
        recognized it also carries ``name`` and ``confidence``, and on success
        ``time`` and ``type``.
    """
    try:
        # silent=True yields None instead of an error response for a missing
        # or malformed JSON body; anything that is not an object counts as empty.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        image_data = data.get('image')
        attendance_type = data.get('type', 'check_in')  # 'check_in' or 'check_out'
        if not image_data:
            return jsonify({'success': False, 'message': 'No image data provided'})
        if attendance_type not in ('check_in', 'check_out'):
            return jsonify({'success': False, 'message': 'Invalid attendance type'})

        # Decode base64 image; drop a leading "data:image/jpeg;base64," prefix
        # if present, and accept bare base64 as well.
        encoded = image_data.split(',', 1)[-1]
        image_bytes = base64.b64decode(encoded)

        # Convert to numpy array
        nparr = np.frombuffer(image_bytes, np.uint8)
        frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
        if frame is None:
            # imdecode returns None when the bytes are not a readable image.
            return jsonify({'success': False, 'message': 'Invalid image data'})

        # Recognize face
        result, confidence = face_recognizer.recognize_face(frame)
        if not (result and confidence > 70):  # Higher threshold for attendance marking
            return jsonify({'success': False, 'message': 'Face not recognized or confidence too low'})

        # Mark attendance
        attendance_result = db_manager.mark_attendance(result['user_id'], attendance_type)
        if attendance_result['success']:
            current_time = datetime.now(IST).strftime('%I:%M %p')
            return jsonify({
                'success': True,
                'name': result['name'],
                'confidence': round(confidence, 2),
                'time': current_time,
                'type': attendance_type,
                'message': attendance_result['message']
            })
        return jsonify({
            'success': False,
            'name': result['name'],
            'confidence': round(confidence, 2),
            'message': attendance_result['message']
        })
    except Exception as e:
        logger.error(f"Recognition error: {e}")
        return jsonify({'success': False, 'message': 'Recognition failed'})
@app.route('/api/reload_faces')
def reload_faces():
    """Ask the face recognizer to reload its known faces and report the outcome as JSON."""
    try:
        face_recognizer.load_known_faces()
        payload = {'success': True, 'message': 'Faces reloaded successfully'}
    except Exception as exc:
        logger.error(f"Error reloading faces: {exc}")
        payload = {'success': False, 'message': 'Failed to reload faces'}
    return jsonify(payload)
if __name__ == '__main__':
    # The server listens on all interfaces, and the Werkzeug debugger allows
    # arbitrary code execution from the browser, so debug mode is opt-in
    # (FLASK_DEBUG=1) instead of always on.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').strip().lower() in ('1', 'true', 'yes', 'on')
    app.run(debug=debug_enabled, host='0.0.0.0', port=7860)