Spaces:
Sleeping
Sleeping
import cv2 | |
import os | |
import numpy as np | |
from deepface import DeepFace | |
import logging | |
from typing import Dict, List, Tuple, Optional | |
import sqlite3 | |
from datetime import datetime | |
import pytz | |
# Configure logging | |
logging.basicConfig(level=logging.INFO) | |
logger = logging.getLogger(__name__) | |
class EnhancedFaceRecognizer: | |
"""Enhanced face recognition system using DeepFace with optimizations""" | |
def __init__(self, known_faces_dir: str = 'static/known_faces', db_path: str = 'attendance.db'): | |
self.known_faces_dir = known_faces_dir | |
self.db_path = db_path | |
self.known_faces = {} | |
self.face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml') | |
self.models = ['VGG-Face', 'Facenet', 'OpenFace'] # Multiple models for better accuracy | |
self.current_model = 'VGG-Face' | |
self.recognition_threshold = 0.4 # Cosine distance threshold | |
self.confidence_threshold = 65 # Minimum confidence percentage | |
# Create directories if they don't exist | |
os.makedirs(self.known_faces_dir, exist_ok=True) | |
# Load known faces | |
self.load_known_faces() | |
def load_known_faces(self) -> None: | |
"""Load known faces from database and file system""" | |
try: | |
self.known_faces = {} | |
# Connect to database | |
conn = sqlite3.connect(self.db_path) | |
cursor = conn.cursor() | |
# Get all users with face images | |
cursor.execute('SELECT id, name, face_encoding_path FROM users WHERE face_encoding_path IS NOT NULL') | |
users = cursor.fetchall() | |
conn.close() | |
for user_id, name, face_path in users: | |
full_path = os.path.join(self.known_faces_dir, face_path) | |
if os.path.exists(full_path): | |
# Validate image file | |
if self._validate_image(full_path): | |
self.known_faces[name] = { | |
'user_id': user_id, | |
'image_path': full_path, | |
'embeddings': {} # Cache for embeddings | |
} | |
else: | |
logger.warning(f"Invalid image file for user {name}: {full_path}") | |
else: | |
logger.warning(f"Image file not found for user {name}: {full_path}") | |
logger.info(f"Loaded {len(self.known_faces)} known faces") | |
except Exception as e: | |
logger.error(f"Error loading known faces: {e}") | |
self.known_faces = {} | |
def _validate_image(self, image_path: str) -> bool: | |
"""Validate if image file is readable and contains a face""" | |
try: | |
image = cv2.imread(image_path) | |
if image is None: | |
return False | |
# Check if image contains at least one face | |
gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) | |
faces = self.face_cascade.detectMultiScale(gray, 1.1, 4) | |
return len(faces) > 0 | |
except Exception as e: | |
logger.error(f"Error validating image {image_path}: {e}") | |
return False | |
def preprocess_image(self, image: np.ndarray) -> np.ndarray: | |
"""Preprocess image for better recognition""" | |
try: | |
# Convert to RGB if needed | |
if len(image.shape) == 3 and image.shape[2] == 3: | |
image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) | |
# Enhance image quality | |
# 1. Histogram equalization for better contrast | |
if len(image.shape) == 3: | |
# Convert to LAB color space | |
lab = cv2.cvtColor(image, cv2.COLOR_RGB2LAB) | |
l, a, b = cv2.split(lab) | |
# Apply CLAHE to L channel | |
clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8,8)) | |
l = clahe.apply(l) | |
# Merge channels | |
enhanced = cv2.merge([l, a, b]) | |
image = cv2.cvtColor(enhanced, cv2.COLOR_LAB2RGB) | |
else: | |
clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8,8)) | |
image = clahe.apply(image) | |
# 2. Gaussian blur to reduce noise | |
image = cv2.GaussianBlur(image, (1, 1), 0) | |
return image | |
except Exception as e: | |
logger.error(f"Error preprocessing image: {e}") | |
return image | |
def detect_faces(self, image: np.ndarray) -> List[Tuple[int, int, int, int]]: | |
"""Detect faces in image using Haar cascade""" | |
try: | |
gray = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY) if len(image.shape) == 3 else image | |
faces = self.face_cascade.detectMultiScale( | |
gray, | |
scaleFactor=1.1, | |
minNeighbors=5, | |
minSize=(30, 30), | |
flags=cv2.CASCADE_SCALE_IMAGE | |
) | |
return faces.tolist() | |
except Exception as e: | |
logger.error(f"Error detecting faces: {e}") | |
return [] | |
def extract_face_region(self, image: np.ndarray, face_coords: Tuple[int, int, int, int]) -> np.ndarray: | |
"""Extract face region from image with padding""" | |
try: | |
x, y, w, h = face_coords | |
# Add padding around face | |
padding = int(min(w, h) * 0.2) | |
# Calculate padded coordinates | |
x1 = max(0, x - padding) | |
y1 = max(0, y - padding) | |
x2 = min(image.shape[1], x + w + padding) | |
y2 = min(image.shape[0], y + h + padding) | |
# Extract face region | |
face_region = image[y1:y2, x1:x2] | |
return face_region | |
except Exception as e: | |
logger.error(f"Error extracting face region: {e}") | |
return image | |
def get_face_embedding(self, image_path: str, model_name: str = None) -> Optional[np.ndarray]: | |
"""Get face embedding using DeepFace""" | |
try: | |
if model_name is None: | |
model_name = self.current_model | |
# Use DeepFace to get embedding | |
embedding = DeepFace.represent( | |
img_path=image_path, | |
model_name=model_name, | |
enforce_detection=False, | |
detector_backend='opencv' | |
) | |
if isinstance(embedding, list) and len(embedding) > 0: | |
return np.array(embedding[0]['embedding']) | |
elif isinstance(embedding, dict): | |
return np.array(embedding['embedding']) | |
else: | |
return None | |
except Exception as e: | |
logger.debug(f"Error getting embedding for {image_path} with {model_name}: {e}") | |
return None | |
def compare_faces(self, img1_path: str, img2_path: str, model_name: str = None) -> Dict: | |
"""Compare two faces using DeepFace""" | |
try: | |
if model_name is None: | |
model_name = self.current_model | |
result = DeepFace.verify( | |
img1_path=img1_path, | |
img2_path=img2_path, | |
model_name=model_name, | |
distance_metric='cosine', | |
enforce_detection=False, | |
detector_backend='opencv' | |
) | |
return result | |
except Exception as e: | |
logger.debug(f"Error comparing faces: {e}") | |
return {'verified': False, 'distance': 1.0} | |
def recognize_face_advanced(self, frame: np.ndarray, use_multiple_models: bool = True) -> Tuple[Optional[Dict], float]: | |
"""Advanced face recognition with multiple models and preprocessing""" | |
try: | |
if not self.known_faces: | |
return None, 0 | |
# Preprocess the frame | |
processed_frame = self.preprocess_image(frame.copy()) | |
# Detect faces in the frame | |
faces = self.detect_faces(processed_frame) | |
if not faces: | |
return None, 0 | |
# Use the largest detected face | |
largest_face = max(faces, key=lambda f: f[2] * f[3]) | |
# Extract face region | |
face_region = self.extract_face_region(processed_frame, largest_face) | |
# Save temporary frame for DeepFace | |
temp_path = 'temp_recognition_frame.jpg' | |
# Convert back to BGR for saving | |
if len(face_region.shape) == 3: | |
face_bgr = cv2.cvtColor(face_region, cv2.COLOR_RGB2BGR) | |
else: | |
face_bgr = face_region | |
cv2.imwrite(temp_path, face_bgr) | |
best_match = None | |
highest_confidence = 0 | |
# Models to try | |
models_to_use = self.models if use_multiple_models else [self.current_model] | |
for name, face_data in self.known_faces.items(): | |
best_model_result = None | |
best_model_confidence = 0 | |
# Try multiple models for this face | |
for model in models_to_use: | |
try: | |
result = self.compare_faces(temp_path, face_data['image_path'], model) | |
if result['verified'] and result['distance'] < self.recognition_threshold: | |
confidence = (1 - result['distance']) * 100 | |
if confidence > best_model_confidence: | |
best_model_confidence = confidence | |
best_model_result = { | |
'name': name, | |
'user_id': face_data['user_id'], | |
'confidence': confidence, | |
'model_used': model, | |
'distance': result['distance'] | |
} | |
except Exception as e: | |
logger.debug(f"Model {model} failed for {name}: {e}") | |
continue | |
# Check if this is the best match overall | |
if best_model_result and best_model_confidence > highest_confidence and best_model_confidence > self.confidence_threshold: | |
highest_confidence = best_model_confidence | |
best_match = best_model_result | |
# Clean up temp file | |
if os.path.exists(temp_path): | |
os.remove(temp_path) | |
return best_match, highest_confidence | |
except Exception as e: | |
logger.error(f"Advanced face recognition error: {e}") | |
return None, 0 | |
def recognize_face(self, frame: np.ndarray) -> Tuple[Optional[Dict], float]: | |
"""Main face recognition method (backward compatibility)""" | |
return self.recognize_face_advanced(frame, use_multiple_models=False) | |
def add_known_face(self, name: str, image_path: str) -> bool: | |
"""Add a new known face""" | |
try: | |
if not os.path.exists(image_path): | |
logger.error(f"Image file not found: {image_path}") | |
return False | |
if not self._validate_image(image_path): | |
logger.error(f"Invalid image file: {image_path}") | |
return False | |
# Add to database (assuming it's already added) | |
# Just update our known_faces dictionary | |
self.load_known_faces() | |
return name in self.known_faces | |
except Exception as e: | |
logger.error(f"Error adding known face: {e}") | |
return False | |
def update_model_settings(self, model_name: str = None, threshold: float = None, confidence_threshold: float = None): | |
"""Update recognition settings""" | |
if model_name and model_name in self.models: | |
self.current_model = model_name | |
logger.info(f"Model changed to: {model_name}") | |
if threshold is not None: | |
self.recognition_threshold = threshold | |
logger.info(f"Recognition threshold changed to: {threshold}") | |
if confidence_threshold is not None: | |
self.confidence_threshold = confidence_threshold | |
logger.info(f"Confidence threshold changed to: {confidence_threshold}") | |
def get_recognition_stats(self) -> Dict: | |
"""Get recognition system statistics""" | |
return { | |
'total_known_faces': len(self.known_faces), | |
'current_model': self.current_model, | |
'available_models': self.models, | |
'recognition_threshold': self.recognition_threshold, | |
'confidence_threshold': self.confidence_threshold, | |
'known_faces_dir': self.known_faces_dir | |
} | |
# Utility functions for standalone usage | |
def recognize_from_webcam(recognizer: EnhancedFaceRecognizer, camera_index: int = 1): # changed camera index to 1 | |
"""Recognize faces from webcam feed""" | |
cap = cv2.VideoCapture(camera_index) | |
if not cap.isOpened(): | |
logger.error("Could not open webcam") | |
return | |
logger.info("Starting webcam recognition. Press 'q' to quit.") | |
while True: | |
ret, frame = cap.read() | |
if not ret: | |
break | |
# Recognize face | |
result, confidence = recognizer.recognize_face_advanced(frame) | |
# Draw results on frame | |
if result: | |
# Draw bounding box and name | |
faces = recognizer.detect_faces(frame) | |
if faces: | |
for (x, y, w, h) in faces: | |
cv2.rectangle(frame, (x, y), (x+w, y+h), (0, 255, 0), 2) | |
label = f"{result['name']} ({confidence:.1f}%)" | |
cv2.putText(frame, label, (x, y-10), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2) | |
# Show frame | |
cv2.imshow('Face Recognition', frame) | |
# Break on 'q' key | |
if cv2.waitKey(1) & 0xFF == ord('q'): | |
break | |
cap.release() | |
cv2.destroyAllWindows() | |
if __name__ == "__main__": | |
# Test the recognizer | |
recognizer = EnhancedFaceRecognizer() | |
print("Enhanced Face Recognizer Test") | |
print(f"Stats: {recognizer.get_recognition_stats()}") | |
# Uncomment to test with webcam | |
# recognize_from_webcam(recognizer) |