Spaces:
Sleeping
Sleeping
import os | |
import cv2 | |
import json | |
import base64 | |
import sqlite3 | |
from datetime import datetime, timedelta | |
import pytz | |
from flask import Flask, render_template, request, jsonify, Response | |
from deepface import DeepFace | |
import numpy as np | |
import threading | |
import logging | |
from werkzeug.utils import secure_filename | |
# Configure logging | |
logging.basicConfig(level=logging.INFO) | |
logger = logging.getLogger(__name__) | |
app = Flask(__name__) | |
app.config['SECRET_KEY'] = 'your-secret-key-here' | |
app.config['UPLOAD_FOLDER'] = 'static/known_faces' | |
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024 # 16MB max file size | |
# Create upload directory if it doesn't exist | |
os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True) | |
# Indian Standard Time | |
IST = pytz.timezone('Asia/Kolkata') | |
class DatabaseManager: | |
def __init__(self, db_path='attendance.db'): | |
self.db_path = db_path | |
self.init_database() | |
def init_database(self): | |
"""Initialize the database with required tables""" | |
conn = sqlite3.connect(self.db_path) | |
cursor = conn.cursor() | |
# Users table | |
cursor.execute(''' | |
CREATE TABLE IF NOT EXISTS users ( | |
id INTEGER PRIMARY KEY AUTOINCREMENT, | |
name TEXT NOT NULL UNIQUE, | |
face_encoding_path TEXT, | |
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP | |
) | |
''') | |
# Attendance table | |
cursor.execute(''' | |
CREATE TABLE IF NOT EXISTS attendance ( | |
id INTEGER PRIMARY KEY AUTOINCREMENT, | |
user_id INTEGER, | |
date DATE, | |
check_in_time TIMESTAMP, | |
check_out_time TIMESTAMP, | |
status TEXT DEFAULT 'present', | |
FOREIGN KEY (user_id) REFERENCES users (id), | |
UNIQUE(user_id, date) | |
) | |
''') | |
conn.commit() | |
conn.close() | |
def add_user(self, name, face_image_path): | |
"""Add a new user to the database""" | |
try: | |
conn = sqlite3.connect(self.db_path) | |
cursor = conn.cursor() | |
cursor.execute('INSERT INTO users (name, face_encoding_path) VALUES (?, ?)', | |
(name, face_image_path)) | |
user_id = cursor.lastrowid | |
conn.commit() | |
conn.close() | |
return user_id | |
except sqlite3.IntegrityError: | |
return None | |
def get_all_users(self): | |
"""Get all users from database""" | |
conn = sqlite3.connect(self.db_path) | |
cursor = conn.cursor() | |
cursor.execute('SELECT id, name, face_encoding_path FROM users') | |
users = cursor.fetchall() | |
conn.close() | |
return users | |
def mark_attendance(self, user_id, attendance_type='check_in'): | |
"""Mark attendance for a user""" | |
try: | |
conn = sqlite3.connect(self.db_path) | |
cursor = conn.cursor() | |
# Get current date and time in IST | |
now = datetime.now(IST) | |
today = now.date() | |
current_time = now | |
if attendance_type == 'check_in': | |
# Check if user already checked in today | |
cursor.execute(''' | |
SELECT id, check_in_time FROM attendance | |
WHERE user_id = ? AND date = ? | |
''', (user_id, today)) | |
existing = cursor.fetchone() | |
if existing: | |
conn.close() | |
return {'success': False, 'message': 'Already checked in today'} | |
# Insert new attendance record | |
cursor.execute(''' | |
INSERT INTO attendance (user_id, date, check_in_time, status) | |
VALUES (?, ?, ?, 'present') | |
''', (user_id, today, current_time)) | |
elif attendance_type == 'check_out': | |
# Update existing record with check-out time | |
cursor.execute(''' | |
UPDATE attendance | |
SET check_out_time = ? | |
WHERE user_id = ? AND date = ? AND check_out_time IS NULL | |
''', (current_time, user_id, today)) | |
if cursor.rowcount == 0: | |
conn.close() | |
return {'success': False, 'message': 'No check-in record found or already checked out'} | |
conn.commit() | |
conn.close() | |
return {'success': True, 'message': f'Successfully {attendance_type.replace("_", "-")}'} | |
except Exception as e: | |
logger.error(f"Error marking attendance: {e}") | |
return {'success': False, 'message': 'Database error'} | |
def get_today_stats(self): | |
"""Get today's attendance statistics""" | |
conn = sqlite3.connect(self.db_path) | |
cursor = conn.cursor() | |
today = datetime.now(IST).date() | |
# Total users | |
cursor.execute('SELECT COUNT(*) FROM users') | |
total_users = cursor.fetchone()[0] | |
# Present today (checked in) | |
cursor.execute(''' | |
SELECT COUNT(*) FROM attendance | |
WHERE date = ? AND check_in_time IS NOT NULL | |
''', (today,)) | |
present_today = cursor.fetchone()[0] | |
# Absent today | |
absent_today = total_users - present_today | |
# Get today's attendance records with user names | |
cursor.execute(''' | |
SELECT u.name, a.check_in_time, a.check_out_time | |
FROM attendance a | |
JOIN users u ON a.user_id = u.id | |
WHERE a.date = ? | |
ORDER BY a.check_in_time DESC | |
''', (today,)) | |
today_attendance = cursor.fetchall() | |
conn.close() | |
return { | |
'total_users': total_users, | |
'present_today': present_today, | |
'absent_today': absent_today, | |
'today_attendance': today_attendance | |
} | |
class FaceRecognizer: | |
def __init__(self, known_faces_dir='static/known_faces'): | |
self.known_faces_dir = known_faces_dir | |
self.known_faces = {} | |
self.load_known_faces() | |
def load_known_faces(self): | |
"""Load known faces from the directory""" | |
self.known_faces = {} | |
if not os.path.exists(self.known_faces_dir): | |
os.makedirs(self.known_faces_dir) | |
return | |
db_manager = DatabaseManager() | |
users = db_manager.get_all_users() | |
for user_id, name, face_path in users: | |
full_path = os.path.join(self.known_faces_dir, face_path) if face_path else None | |
if full_path and os.path.exists(full_path): | |
self.known_faces[name] = { | |
'user_id': user_id, | |
'image_path': full_path | |
} | |
logger.info(f"Loaded {len(self.known_faces)} known faces") | |
def recognize_face(self, frame): | |
"""Recognize face in the given frame""" | |
try: | |
if not self.known_faces: | |
return None, 0 | |
# Convert frame to RGB for DeepFace | |
rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) | |
# Save temporary frame for DeepFace | |
temp_path = 'temp_frame.jpg' | |
cv2.imwrite(temp_path, frame) | |
best_match = None | |
highest_confidence = 0 | |
for name, face_data in self.known_faces.items(): | |
try: | |
# Use DeepFace to verify faces | |
result = DeepFace.verify( | |
img1_path=temp_path, | |
img2_path=face_data['image_path'], | |
model_name='VGG-Face', | |
distance_metric='cosine', | |
enforce_detection=False | |
) | |
if result['verified']: | |
confidence = (1 - result['distance']) * 100 | |
if confidence > highest_confidence and confidence > 60: # 60% threshold | |
highest_confidence = confidence | |
best_match = { | |
'name': name, | |
'user_id': face_data['user_id'], | |
'confidence': confidence | |
} | |
except Exception as e: | |
logger.debug(f"Recognition error for {name}: {e}") | |
continue | |
# Clean up temp file | |
if os.path.exists(temp_path): | |
os.remove(temp_path) | |
if best_match: | |
return best_match, highest_confidence | |
return None, 0 | |
except Exception as e: | |
logger.error(f"Face recognition error: {e}") | |
return None, 0 | |
# Global instances | |
db_manager = DatabaseManager() | |
face_recognizer = FaceRecognizer() | |
def index(): | |
"""Main page""" | |
stats = db_manager.get_today_stats() | |
return render_template('index.html', stats=stats) | |
def get_stats(): | |
"""Get current statistics""" | |
stats = db_manager.get_today_stats() | |
# Format attendance records for display | |
formatted_attendance = [] | |
for name, check_in, check_out in stats['today_attendance']: | |
record = { | |
'name': name, | |
'check_in': None, | |
'check_out': None | |
} | |
if check_in: | |
check_in_dt = datetime.fromisoformat(check_in.replace('Z', '+00:00')) | |
if check_in_dt.tzinfo is None: | |
check_in_dt = pytz.utc.localize(check_in_dt) | |
check_in_ist = check_in_dt.astimezone(IST) | |
record['check_in'] = check_in_ist.strftime('%I:%M %p') | |
if check_out: | |
check_out_dt = datetime.fromisoformat(check_out.replace('Z', '+00:00')) | |
if check_out_dt.tzinfo is None: | |
check_out_dt = pytz.utc.localize(check_out_dt) | |
check_out_ist = check_out_dt.astimezone(IST) | |
record['check_out'] = check_out_ist.strftime('%I:%M %p') | |
formatted_attendance.append(record) | |
stats['today_attendance'] = formatted_attendance | |
return jsonify(stats) | |
def add_user(): | |
"""Add a new user with face image""" | |
try: | |
name = request.form.get('name') | |
if not name: | |
return jsonify({'success': False, 'message': 'Name is required'}) | |
if 'face_image' not in request.files: | |
return jsonify({'success': False, 'message': 'Face image is required'}) | |
file = request.files['face_image'] | |
if file.filename == '': | |
return jsonify({'success': False, 'message': 'No file selected'}) | |
# Save the uploaded file | |
filename = secure_filename(f"{name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.jpg") | |
filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename) | |
file.save(filepath) | |
# Add user to database | |
user_id = db_manager.add_user(name, filename) | |
if user_id: | |
# Reload known faces | |
face_recognizer.load_known_faces() | |
return jsonify({'success': True, 'message': f'User {name} added successfully'}) | |
else: | |
# Remove the uploaded file if database insertion failed | |
if os.path.exists(filepath): | |
os.remove(filepath) | |
return jsonify({'success': False, 'message': 'User already exists'}) | |
except Exception as e: | |
logger.error(f"Error adding user: {e}") | |
return jsonify({'success': False, 'message': 'Server error'}) | |
def recognize_and_mark(): | |
"""Recognize face and mark attendance""" | |
try: | |
data = request.get_json() | |
image_data = data.get('image') | |
attendance_type = data.get('type', 'check_in') # 'check_in' or 'check_out' | |
if not image_data: | |
return jsonify({'success': False, 'message': 'No image data provided'}) | |
# Decode base64 image | |
image_data = image_data.split(',')[1] # Remove data:image/jpeg;base64, | |
image_bytes = base64.b64decode(image_data) | |
# Convert to numpy array | |
nparr = np.frombuffer(image_bytes, np.uint8) | |
frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR) | |
# Recognize face | |
result, confidence = face_recognizer.recognize_face(frame) | |
if result and confidence > 70: # Higher threshold for attendance marking | |
# Mark attendance | |
attendance_result = db_manager.mark_attendance(result['user_id'], attendance_type) | |
if attendance_result['success']: | |
current_time = datetime.now(IST).strftime('%I:%M %p') | |
return jsonify({ | |
'success': True, | |
'name': result['name'], | |
'confidence': round(confidence, 2), | |
'time': current_time, | |
'type': attendance_type, | |
'message': attendance_result['message'] | |
}) | |
else: | |
return jsonify({ | |
'success': False, | |
'name': result['name'], | |
'confidence': round(confidence, 2), | |
'message': attendance_result['message'] | |
}) | |
else: | |
return jsonify({'success': False, 'message': 'Face not recognized or confidence too low'}) | |
except Exception as e: | |
logger.error(f"Recognition error: {e}") | |
return jsonify({'success': False, 'message': 'Recognition failed'}) | |
def reload_faces(): | |
"""Reload known faces""" | |
try: | |
face_recognizer.load_known_faces() | |
return jsonify({'success': True, 'message': 'Faces reloaded successfully'}) | |
except Exception as e: | |
logger.error(f"Error reloading faces: {e}") | |
return jsonify({'success': False, 'message': 'Failed to reload faces'}) | |
if __name__ == '__main__': | |
app.run(debug=True, host='0.0.0.0', port=7860) |