Spaces:
Runtime error
Runtime error
Testimony Adekoya
commited on
Commit
Β·
f37553c
1
Parent(s):
5030574
Abeg work
Browse files- config.yaml +22 -12
- pages/1_Live_Detection.py +18 -14
- src/alerting/alert_system.py +33 -83
- src/detection/strategies/geometric.py +75 -96
config.yaml
CHANGED
@@ -1,5 +1,7 @@
|
|
1 |
# config.yaml
|
|
|
2 |
# Main configuration file for the Drive Paddy application.
|
|
|
3 |
|
4 |
# -- Detection Strategy --
|
5 |
# Sets the active drowsiness detection method.
|
@@ -18,31 +20,39 @@ geometric_settings:
|
|
18 |
yawn_consec_frames: 20
|
19 |
|
20 |
# Head Pose Estimation for look-away/nod-off detection
|
21 |
-
head_nod_thresh: 15.0
|
22 |
-
head_look_away_thresh: 20.0
|
23 |
head_pose_consec_frames: 20
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
24 |
|
25 |
-
# -- CNN Model Settings --
|
26 |
cnn_model_settings:
|
27 |
model_path: "models/best_model_efficientnet_b7.pth"
|
28 |
confidence_thresh: 0.8
|
29 |
|
30 |
-
# -- Hybrid Strategy Settings --
|
31 |
-
# Defines weights for combining signals into a single drowsiness score.
|
32 |
-
# The system triggers an alert if the total score exceeds 'alert_threshold'.
|
33 |
hybrid_settings:
|
34 |
alert_threshold: 1.0
|
35 |
weights:
|
36 |
-
eye_closure: 0.45
|
37 |
-
yawning: 0.30
|
38 |
-
head_nod: 0.55
|
39 |
-
looking_away: 0.25
|
40 |
-
cnn_prediction: 0.60 # Weight for the deep learning model's output
|
41 |
|
42 |
# -- Alerting System --
|
43 |
alerting:
|
44 |
alert_sound_path: "assets/alert.wav"
|
45 |
-
alert_cooldown_seconds:
|
46 |
|
47 |
# -- Gemini API (Optional) --
|
48 |
gemini_api:
|
|
|
1 |
# config.yaml
|
2 |
+
# -----------------------------------------------------------------------------
|
3 |
# Main configuration file for the Drive Paddy application.
|
4 |
+
# -----------------------------------------------------------------------------
|
5 |
|
6 |
# -- Detection Strategy --
|
7 |
# Sets the active drowsiness detection method.
|
|
|
20 |
yawn_consec_frames: 20
|
21 |
|
22 |
# Head Pose Estimation for look-away/nod-off detection
|
23 |
+
head_nod_thresh: 15.0
|
24 |
+
head_look_away_thresh: 20.0
|
25 |
head_pose_consec_frames: 20
|
26 |
+
|
27 |
+
# Low Light Detection
|
28 |
+
low_light_thresh: 70 # Average frame brightness below which a warning is shown (0-255).
|
29 |
+
|
30 |
+
# Drowsiness Level Scoring
|
31 |
+
# The system will sum the weights of active indicators (eyes, mouth, head).
|
32 |
+
drowsiness_levels:
|
33 |
+
very_drowsy_threshold: 0.8 # e.g., Eyes + Head Nod (0.45 + 0.55 = 1.0)
|
34 |
+
slightly_drowsy_threshold: 0.4 # e.g., Just Yawning (0.30) or Eyes Closed (0.45)
|
35 |
+
indicator_weights:
|
36 |
+
eye_closure: 0.45
|
37 |
+
yawning: 0.30
|
38 |
+
head_nod: 0.55
|
39 |
+
looking_away: 0.25
|
40 |
|
41 |
+
# -- CNN Model Settings (Not used in 'geometric' mode) --
|
42 |
cnn_model_settings:
|
43 |
model_path: "models/best_model_efficientnet_b7.pth"
|
44 |
confidence_thresh: 0.8
|
45 |
|
46 |
+
# -- Hybrid Strategy Settings (Not used in 'geometric' mode) --
|
|
|
|
|
47 |
hybrid_settings:
|
48 |
alert_threshold: 1.0
|
49 |
weights:
|
50 |
+
eye_closure: 0.45; yawning: 0.30; head_nod: 0.55; looking_away: 0.25; cnn_prediction: 0.60
|
|
|
|
|
|
|
|
|
51 |
|
52 |
# -- Alerting System --
|
53 |
alerting:
|
54 |
alert_sound_path: "assets/alert.wav"
|
55 |
+
alert_cooldown_seconds: 7 # Increased cooldown to prevent alert fatigue
|
56 |
|
57 |
# -- Gemini API (Optional) --
|
58 |
gemini_api:
|
pages/1_Live_Detection.py
CHANGED
@@ -16,8 +16,8 @@ if "status_queue" not in st.session_state:
|
|
16 |
st.session_state.status_queue = queue.Queue()
|
17 |
if "audio_queue" not in st.session_state:
|
18 |
st.session_state.audio_queue = queue.Queue()
|
19 |
-
if "last_status" not in st.session_state:
|
20 |
-
st.session_state.last_status = {"
|
21 |
|
22 |
|
23 |
# --- Load Configuration and Environment Variables ---
|
@@ -73,6 +73,8 @@ class VideoProcessor(VideoProcessorBase):
|
|
73 |
processed_frame, indicators, _ = self._detector.process_frame(img)
|
74 |
alert_triggered = any(v for k, v in indicators.items() if k not in ['low_light', 'details'])
|
75 |
self.status_queue.put(indicators if alert_triggered or indicators.get('low_light') else {"status": "Awake"})
|
|
|
|
|
76 |
elif strategy == 'cnn_model':
|
77 |
# The cnn_model processor returns frame and indicators.
|
78 |
processed_frame, indicators = self._detector.process_frame(img)
|
@@ -99,7 +101,6 @@ st.info("Press 'START' to activate your camera and begin monitoring.")
|
|
99 |
# --- Dynamically Build RTC Configuration ---
|
100 |
ice_servers = [{'urls': 'stun:global.stun.twilio.com:3478'}]
|
101 |
if secrets["turn_username"] and secrets["turn_credential"]:
|
102 |
-
print("TURN credentials found, adding TURN servers to config.")
|
103 |
turn_servers = [
|
104 |
{'urls': 'turn:global.turn.twilio.com:3478?transport=udp', 'username': secrets["turn_username"], 'credential': secrets["turn_credential"]},
|
105 |
{'urls': 'turn:global.turn.twilio.com:3478?transport=tcp', 'username': secrets["turn_username"], 'credential': secrets["turn_credential"]}
|
@@ -108,7 +109,6 @@ if secrets["turn_username"] and secrets["turn_credential"]:
|
|
108 |
|
109 |
RTC_CONFIGURATION = RTCConfiguration({"iceServers": ice_servers})
|
110 |
|
111 |
-
|
112 |
col1, col2 = st.columns([3, 1])
|
113 |
|
114 |
with col1:
|
@@ -144,16 +144,18 @@ if webrtc_ctx.state.playing:
|
|
144 |
|
145 |
with status_placeholder.container():
|
146 |
last_status = st.session_state.last_status
|
147 |
-
|
148 |
-
|
149 |
-
|
150 |
-
|
151 |
-
|
152 |
-
|
153 |
-
|
154 |
-
|
155 |
-
|
156 |
-
|
|
|
|
|
157 |
try:
|
158 |
audio_data = st.session_state.audio_queue.get(timeout=0.1)
|
159 |
with audio_placeholder.container():
|
@@ -166,3 +168,5 @@ if webrtc_ctx.state.playing:
|
|
166 |
else:
|
167 |
with status_placeholder.container():
|
168 |
st.info("βοΈ Driver is Awake")
|
|
|
|
|
|
16 |
st.session_state.status_queue = queue.Queue()
|
17 |
if "audio_queue" not in st.session_state:
|
18 |
st.session_state.audio_queue = queue.Queue()
|
19 |
+
if "last_status" not in st.session_state:
|
20 |
+
st.session_state.last_status = {"drowsiness_level": "Awake", "lighting": "Good"}
|
21 |
|
22 |
|
23 |
# --- Load Configuration and Environment Variables ---
|
|
|
73 |
processed_frame, indicators, _ = self._detector.process_frame(img)
|
74 |
alert_triggered = any(v for k, v in indicators.items() if k not in ['low_light', 'details'])
|
75 |
self.status_queue.put(indicators if alert_triggered or indicators.get('low_light') else {"status": "Awake"})
|
76 |
+
print(f"Indicators: {indicators}") # Debugging line
|
77 |
+
print(self.status_queue) # Debugging line
|
78 |
elif strategy == 'cnn_model':
|
79 |
# The cnn_model processor returns frame and indicators.
|
80 |
processed_frame, indicators = self._detector.process_frame(img)
|
|
|
101 |
# --- Dynamically Build RTC Configuration ---
|
102 |
ice_servers = [{'urls': 'stun:global.stun.twilio.com:3478'}]
|
103 |
if secrets["turn_username"] and secrets["turn_credential"]:
|
|
|
104 |
turn_servers = [
|
105 |
{'urls': 'turn:global.turn.twilio.com:3478?transport=udp', 'username': secrets["turn_username"], 'credential': secrets["turn_credential"]},
|
106 |
{'urls': 'turn:global.turn.twilio.com:3478?transport=tcp', 'username': secrets["turn_username"], 'credential': secrets["turn_credential"]}
|
|
|
109 |
|
110 |
RTC_CONFIGURATION = RTCConfiguration({"iceServers": ice_servers})
|
111 |
|
|
|
112 |
col1, col2 = st.columns([3, 1])
|
113 |
|
114 |
with col1:
|
|
|
144 |
|
145 |
with status_placeholder.container():
|
146 |
last_status = st.session_state.last_status
|
147 |
+
drowsiness_level = last_status.get("drowsiness_level", "Awake")
|
148 |
+
lighting = last_status.get("lighting", "Good")
|
149 |
+
score = last_status.get("details", {}).get("Score", 0)
|
150 |
+
|
151 |
+
st.metric(label="Lighting Condition", value=lighting)
|
152 |
+
|
153 |
+
if lighting == "Low": st.warning("Detection paused due to low light.")
|
154 |
+
|
155 |
+
if drowsiness_level == "Awake": st.info(f"βοΈ Awake (Score: {score:.2f})")
|
156 |
+
elif drowsiness_level == "Slightly Drowsy": st.warning(f"β οΈ Slightly Drowsy (Score: {score:.2f})")
|
157 |
+
elif drowsiness_level == "Very Drowsy": st.error(f"π¨ Very Drowsy! (Score: {score:.2f})")
|
158 |
+
|
159 |
try:
|
160 |
audio_data = st.session_state.audio_queue.get(timeout=0.1)
|
161 |
with audio_placeholder.container():
|
|
|
168 |
else:
|
169 |
with status_placeholder.container():
|
170 |
st.info("βοΈ Driver is Awake")
|
171 |
+
|
172 |
+
|
src/alerting/alert_system.py
CHANGED
@@ -1,110 +1,60 @@
|
|
1 |
# drive_paddy/alerting/alert_system.py
|
2 |
-
import time
|
3 |
-
import os
|
4 |
-
import io
|
5 |
from gtts import gTTS
|
6 |
-
|
7 |
-
|
8 |
|
9 |
-
load_dotenv() # Load environment variables from .env file
|
10 |
-
|
11 |
-
api_key = os.getenv("GEMINI_API_KEY")
|
12 |
class BaseAlerter:
|
13 |
-
"""Base class for alerter systems."""
|
14 |
def __init__(self, config):
|
15 |
self.config = config['alerting']
|
16 |
self.cooldown = self.config['alert_cooldown_seconds']
|
17 |
self.last_alert_time = 0
|
18 |
self.alert_on = False
|
19 |
-
|
20 |
-
def trigger_alert(self):
|
21 |
-
raise NotImplementedError
|
22 |
-
|
23 |
def reset_alert(self):
|
24 |
-
if self.alert_on:
|
25 |
-
print("Resetting Alert.")
|
26 |
-
self.alert_on = False
|
27 |
|
28 |
class FileAlertSystem(BaseAlerter):
|
29 |
-
"""Loads a static audio file from disk into memory."""
|
30 |
def __init__(self, config):
|
31 |
super().__init__(config)
|
32 |
-
self.sound_path = self.config['alert_sound_path']
|
33 |
self.audio_bytes = None
|
34 |
try:
|
35 |
-
if os.path.exists(
|
36 |
-
with open(
|
37 |
-
|
38 |
-
|
39 |
-
print(f"Warning: Alert sound file not found at '{self.sound_path}'.")
|
40 |
-
except Exception as e:
|
41 |
-
print(f"Warning: Could not load audio file. Error: {e}.")
|
42 |
-
|
43 |
-
def trigger_alert(self):
|
44 |
current_time = time.time()
|
45 |
-
if (current_time - self.last_alert_time) > self.cooldown:
|
46 |
-
|
47 |
-
|
48 |
-
|
49 |
-
self.alert_on = True
|
50 |
-
return self.audio_bytes # Return the audio data
|
51 |
return None
|
52 |
|
53 |
-
|
54 |
class GeminiAlertSystem(BaseAlerter):
|
55 |
-
"""Generates dynamic audio data using Gemini and gTTS."""
|
56 |
def __init__(self, config, api_key):
|
57 |
super().__init__(config)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
58 |
try:
|
59 |
-
|
60 |
-
|
61 |
-
print("
|
62 |
-
|
63 |
-
|
64 |
-
|
65 |
-
|
66 |
-
def _generate_audio_data(self):
|
67 |
-
"""Generates a unique alert message and returns it as audio bytes."""
|
68 |
-
if not self.model:
|
69 |
-
alert_text = "Stay alert!"
|
70 |
-
else:
|
71 |
-
prompt = "You are an AI driving assistant. Generate a short, friendly, but firm audio alert (under 10 words) for a driver showing signs of drowsiness."
|
72 |
-
try:
|
73 |
-
response = self.model.generate_content(prompt)
|
74 |
-
alert_text = response.text.strip().replace('*', '')
|
75 |
-
except Exception as e:
|
76 |
-
print(f"Error generating alert text with Gemini: {e}")
|
77 |
-
alert_text = "Wake up please!"
|
78 |
-
|
79 |
-
print(f"Generated Alert Text: '{alert_text}'")
|
80 |
-
try:
|
81 |
-
# Generate TTS audio in memory
|
82 |
-
mp3_fp = io.BytesIO()
|
83 |
-
tts = gTTS(text=alert_text, lang='en')
|
84 |
-
tts.write_to_fp(mp3_fp)
|
85 |
-
mp3_fp.seek(0)
|
86 |
-
return mp3_fp.getvalue()
|
87 |
-
except Exception as e:
|
88 |
-
print(f"Error generating TTS audio: {e}")
|
89 |
-
return None
|
90 |
-
|
91 |
-
def trigger_alert(self):
|
92 |
current_time = time.time()
|
93 |
-
if (current_time - self.last_alert_time) > self.cooldown:
|
94 |
-
|
95 |
-
|
96 |
-
self.alert_on = True
|
97 |
-
return self._generate_audio_data() # Return the audio data
|
98 |
return None
|
99 |
|
100 |
-
|
101 |
def get_alerter(config, api_key=None):
|
102 |
-
|
103 |
-
|
104 |
-
|
105 |
-
if use_gemini and api_key:
|
106 |
-
print("Initializing Gemini Alert System.")
|
107 |
-
return GeminiAlertSystem(config, api_key)
|
108 |
-
else:
|
109 |
-
print("Initializing standard File Alert System.")
|
110 |
-
return FileAlertSystem(config)
|
|
|
1 |
# drive_paddy/alerting/alert_system.py
|
2 |
+
import time, os, io, google.generativeai as genai
|
|
|
|
|
3 |
from gtts import gTTS
|
4 |
+
from pydub import AudioSegment
|
5 |
+
import simpleaudio as sa # Keep for fallback
|
6 |
|
|
|
|
|
|
|
7 |
class BaseAlerter:
|
|
|
8 |
def __init__(self, config):
|
9 |
self.config = config['alerting']
|
10 |
self.cooldown = self.config['alert_cooldown_seconds']
|
11 |
self.last_alert_time = 0
|
12 |
self.alert_on = False
|
13 |
+
def trigger_alert(self, level="Very Drowsy"): raise NotImplementedError
|
|
|
|
|
|
|
14 |
def reset_alert(self):
|
15 |
+
if self.alert_on: print("Resetting Alert."); self.alert_on = False
|
|
|
|
|
16 |
|
17 |
class FileAlertSystem(BaseAlerter):
|
|
|
18 |
def __init__(self, config):
|
19 |
super().__init__(config)
|
|
|
20 |
self.audio_bytes = None
|
21 |
try:
|
22 |
+
if os.path.exists(config['alerting']['alert_sound_path']):
|
23 |
+
with open(config['alerting']['alert_sound_path'], "rb") as f: self.audio_bytes = f.read()
|
24 |
+
except Exception as e: print(f"Warning: Could not load audio file. Error: {e}.")
|
25 |
+
def trigger_alert(self, level="Very Drowsy"):
|
|
|
|
|
|
|
|
|
|
|
26 |
current_time = time.time()
|
27 |
+
if (current_time - self.last_alert_time) > self.cooldown and not self.alert_on and self.audio_bytes:
|
28 |
+
self.last_alert_time = current_time; self.alert_on = True
|
29 |
+
print("Triggering Static Alert!")
|
30 |
+
return self.audio_bytes
|
|
|
|
|
31 |
return None
|
32 |
|
|
|
33 |
class GeminiAlertSystem(BaseAlerter):
|
|
|
34 |
def __init__(self, config, api_key):
|
35 |
super().__init__(config)
|
36 |
+
try: genai.configure(api_key=api_key); self.model = genai.GenerativeModel('gemini-pro')
|
37 |
+
except Exception as e: print(f"Error initializing Gemini: {e}."); self.model = None
|
38 |
+
def _generate_audio_data(self, level):
|
39 |
+
if not self.model: return None
|
40 |
+
if level == "Slightly Drowsy":
|
41 |
+
prompt = "You are an AI driving assistant. Generate a short, gentle reminder (under 10 words) for a driver showing minor signs of fatigue."
|
42 |
+
else: # Very Drowsy
|
43 |
+
prompt = "You are an AI driving assistant. Generate a short, firm, and urgent alert (under 10 words) for a driver who is very drowsy."
|
44 |
try:
|
45 |
+
response = self.model.generate_content(prompt)
|
46 |
+
alert_text = response.text.strip().replace('*', '')
|
47 |
+
print(f"Generated Alert Text ({level}): '{alert_text}'")
|
48 |
+
mp3_fp = io.BytesIO(); tts = gTTS(text=alert_text, lang='en'); tts.write_to_fp(mp3_fp)
|
49 |
+
mp3_fp.seek(0); return mp3_fp.getvalue()
|
50 |
+
except Exception as e: print(f"Error generating TTS audio: {e}"); return None
|
51 |
+
def trigger_alert(self, level="Very Drowsy"):
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
52 |
current_time = time.time()
|
53 |
+
if (current_time - self.last_alert_time) > self.cooldown and not self.alert_on and self.model:
|
54 |
+
self.last_alert_time = current_time; self.alert_on = True
|
55 |
+
return self._generate_audio_data(level)
|
|
|
|
|
56 |
return None
|
57 |
|
|
|
58 |
def get_alerter(config, api_key=None):
|
59 |
+
if config.get('gemini_api', {}).get('enabled', False) and api_key: return GeminiAlertSystem(config, api_key)
|
60 |
+
return FileAlertSystem(config)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
src/detection/strategies/geometric.py
CHANGED
@@ -5,123 +5,102 @@ import numpy as np
|
|
5 |
import math
|
6 |
from ..base_processor import BaseProcessor
|
7 |
|
8 |
-
# --- Helper Functions ---
|
9 |
def calculate_ear(eye_landmarks, frame_shape):
|
10 |
-
"""Calculates the Eye Aspect Ratio (EAR)."""
|
11 |
-
# ... (implementation remains the same)
|
12 |
coords = np.array([(lm.x * frame_shape[1], lm.y * frame_shape[0]) for lm in eye_landmarks])
|
13 |
v1 = np.linalg.norm(coords[1] - coords[5]); v2 = np.linalg.norm(coords[2] - coords[4])
|
14 |
-
h1 = np.linalg.norm(coords[0] - coords[3])
|
15 |
-
return (v1 + v2) / (2.0 * h1) if h1 > 0 else 0.0
|
16 |
|
17 |
def calculate_mar(mouth_landmarks, frame_shape):
|
18 |
-
"""Calculates the Mouth Aspect Ratio (MAR) for yawn detection."""
|
19 |
coords = np.array([(lm.x * frame_shape[1], lm.y * frame_shape[0]) for lm in mouth_landmarks])
|
20 |
-
v1 = np.linalg.norm(coords[1] - coords[7])
|
21 |
-
|
22 |
-
v3 = np.linalg.norm(coords[3] - coords[5])
|
23 |
-
h1 = np.linalg.norm(coords[0] - coords[4]) # Horizontal distance
|
24 |
return (v1 + v2 + v3) / (2.0 * h1) if h1 > 0 else 0.0
|
25 |
|
26 |
class GeometricProcessor(BaseProcessor):
|
27 |
-
"""
|
28 |
-
Drowsiness detection using a combination of facial landmarks:
|
29 |
-
- Eye Aspect Ratio (EAR) for eye closure.
|
30 |
-
- Mouth Aspect Ratio (MAR) for yawning.
|
31 |
-
- Head Pose Estimation for nodding off or looking away.
|
32 |
-
"""
|
33 |
def __init__(self, config):
|
34 |
self.settings = config['geometric_settings']
|
35 |
-
self.face_mesh = mp.solutions.face_mesh.FaceMesh(
|
36 |
-
|
37 |
-
|
38 |
-
|
39 |
-
# State counters
|
40 |
-
self.counters = {
|
41 |
-
"eye_closure": 0, "yawning": 0,
|
42 |
-
"head_nod": 0, "looking_away": 0
|
43 |
-
}
|
44 |
-
|
45 |
-
# Landmark indices
|
46 |
-
self.L_EYE = [362, 385, 387, 263, 373, 380]
|
47 |
-
self.R_EYE = [33, 160, 158, 133, 153, 144]
|
48 |
self.MOUTH = [61, 291, 39, 181, 0, 17, 84, 178]
|
49 |
|
50 |
def process_frame(self, frame):
|
51 |
-
img_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
|
52 |
h, w, _ = frame.shape
|
53 |
-
|
54 |
-
|
|
|
|
|
55 |
drowsiness_indicators = {
|
56 |
-
"
|
57 |
-
"head_nod": False, "looking_away": False, "details": {}
|
58 |
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
59 |
|
60 |
-
|
61 |
-
|
62 |
-
|
63 |
-
|
64 |
-
left_ear = calculate_ear([landmarks[i] for i in self.L_EYE], (h, w))
|
65 |
-
right_ear = calculate_ear([landmarks[i] for i in self.R_EYE], (h, w))
|
66 |
-
ear = (left_ear + right_ear) / 2.0
|
67 |
-
if ear < self.settings['eye_ar_thresh']:
|
68 |
-
self.counters['eye_closure'] += 1
|
69 |
-
if self.counters['eye_closure'] >= self.settings['eye_ar_consec_frames']:
|
70 |
-
drowsiness_indicators['eye_closure'] = True
|
71 |
-
else:
|
72 |
-
self.counters['eye_closure'] = 0
|
73 |
-
drowsiness_indicators['details']['EAR'] = ear
|
74 |
|
75 |
-
|
76 |
-
|
77 |
-
|
78 |
-
self.counters['
|
79 |
-
if self.counters['
|
80 |
-
drowsiness_indicators['yawning'] = True
|
81 |
-
else:
|
82 |
-
self.counters['yawning'] = 0
|
83 |
-
drowsiness_indicators['details']['MAR'] = mar
|
84 |
|
85 |
-
|
86 |
-
|
87 |
-
|
88 |
-
|
89 |
-
|
90 |
-
[225.0, 170.0, -135.0], # Right eye right corner
|
91 |
-
[-150.0, -150.0, -125.0], # Left Mouth corner
|
92 |
-
[150.0, -150.0, -125.0] # Right mouth corner
|
93 |
-
], dtype=np.float64)
|
94 |
-
face_2d = np.array([
|
95 |
-
(landmarks[1].x * w, landmarks[1].y * h), # Nose tip
|
96 |
-
(landmarks[152].x * w, landmarks[152].y * h), # Chin
|
97 |
-
(landmarks[263].x * w, landmarks[263].y * h), # Left eye corner
|
98 |
-
(landmarks[33].x * w, landmarks[33].y * h), # Right eye corner
|
99 |
-
(landmarks[287].x * w, landmarks[287].y * h), # Left mouth corner
|
100 |
-
(landmarks[57].x * w, landmarks[57].y * h) # Right mouth corner
|
101 |
-
], dtype=np.float64)
|
102 |
|
103 |
-
|
104 |
-
|
105 |
-
|
106 |
-
|
107 |
-
|
108 |
-
|
109 |
-
|
110 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
111 |
|
112 |
-
|
113 |
-
|
114 |
-
if self.counters['head_nod'] >= self.settings['head_pose_consec_frames']:
|
115 |
-
drowsiness_indicators['head_nod'] = True
|
116 |
-
else:
|
117 |
-
self.counters['head_nod'] = 0
|
118 |
|
119 |
-
|
120 |
-
|
121 |
-
|
122 |
-
drowsiness_indicators['looking_away'] = True
|
123 |
-
else:
|
124 |
-
self.counters['looking_away'] = 0
|
125 |
|
126 |
-
|
127 |
-
return frame, drowsiness_indicators
|
|
|
5 |
import math
|
6 |
from ..base_processor import BaseProcessor
|
7 |
|
8 |
+
# --- Helper Functions (No changes here) ---
|
9 |
def calculate_ear(eye_landmarks, frame_shape):
|
|
|
|
|
10 |
coords = np.array([(lm.x * frame_shape[1], lm.y * frame_shape[0]) for lm in eye_landmarks])
|
11 |
v1 = np.linalg.norm(coords[1] - coords[5]); v2 = np.linalg.norm(coords[2] - coords[4])
|
12 |
+
h1 = np.linalg.norm(coords[0] - coords[3]); return (v1 + v2) / (2.0 * h1) if h1 > 0 else 0.0
|
|
|
13 |
|
14 |
def calculate_mar(mouth_landmarks, frame_shape):
|
|
|
15 |
coords = np.array([(lm.x * frame_shape[1], lm.y * frame_shape[0]) for lm in mouth_landmarks])
|
16 |
+
v1 = np.linalg.norm(coords[1] - coords[7]); v2 = np.linalg.norm(coords[2] - coords[6])
|
17 |
+
v3 = np.linalg.norm(coords[3] - coords[5]); h1 = np.linalg.norm(coords[0] - coords[4])
|
|
|
|
|
18 |
return (v1 + v2 + v3) / (2.0 * h1) if h1 > 0 else 0.0
|
19 |
|
20 |
class GeometricProcessor(BaseProcessor):
|
|
|
|
|
|
|
|
|
|
|
|
|
21 |
def __init__(self, config):
|
22 |
self.settings = config['geometric_settings']
|
23 |
+
self.face_mesh = mp.solutions.face_mesh.FaceMesh(max_num_faces=1, refine_landmarks=True, min_detection_confidence=0.5, min_tracking_confidence=0.5)
|
24 |
+
self.counters = { "eye_closure": 0, "yawning": 0, "head_nod": 0, "looking_away": 0 }
|
25 |
+
self.L_EYE = [362, 385, 387, 263, 373, 380]; self.R_EYE = [33, 160, 158, 133, 153, 144]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
26 |
self.MOUTH = [61, 291, 39, 181, 0, 17, 84, 178]
|
27 |
|
28 |
def process_frame(self, frame):
|
|
|
29 |
h, w, _ = frame.shape
|
30 |
+
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
|
31 |
+
brightness = np.mean(gray)
|
32 |
+
is_low_light = brightness < self.settings['low_light_thresh']
|
33 |
+
|
34 |
drowsiness_indicators = {
|
35 |
+
"drowsiness_level": "Awake", "lighting": "Good", "details": {}
|
|
|
36 |
}
|
37 |
+
face_landmarks = None
|
38 |
+
|
39 |
+
if is_low_light:
|
40 |
+
drowsiness_indicators["lighting"] = "Low"
|
41 |
+
else:
|
42 |
+
img_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
|
43 |
+
results = self.face_mesh.process(img_rgb)
|
44 |
+
face_landmarks = results.multi_face_landmarks
|
45 |
|
46 |
+
if face_landmarks:
|
47 |
+
landmarks = face_landmarks[0].landmark
|
48 |
+
score = 0
|
49 |
+
weights = self.settings['indicator_weights']
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
50 |
|
51 |
+
# Eye Closure
|
52 |
+
ear = (calculate_ear([landmarks[i] for i in self.L_EYE],(h,w)) + calculate_ear([landmarks[i] for i in self.R_EYE],(h,w)))/2.0
|
53 |
+
if ear < self.settings['eye_ar_thresh']: self.counters['eye_closure']+=1
|
54 |
+
else: self.counters['eye_closure']=0
|
55 |
+
if self.counters['eye_closure'] >= self.settings['eye_ar_consec_frames']: score += weights['eye_closure']
|
|
|
|
|
|
|
|
|
56 |
|
57 |
+
# Yawning
|
58 |
+
mar = calculate_mar([landmarks[i] for i in self.MOUTH], (h, w))
|
59 |
+
if mar > self.settings['yawn_mar_thresh']: self.counters['yawning']+=1
|
60 |
+
else: self.counters['yawning']=0
|
61 |
+
if self.counters['yawning'] >= self.settings['yawn_consec_frames']: score += weights['yawning']
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
62 |
|
63 |
+
# Head Pose
|
64 |
+
face_3d = np.array([[0.0,0.0,0.0], [0.0,-330.0,-65.0], [-225.0,170.0,-135.0], [225.0,170.0,-135.0], [-150.0,-150.0,-125.0], [150.0,-150.0,-125.0]], dtype=np.float64)
|
65 |
+
face_2d = np.array([(landmarks[1].x*w, landmarks[1].y*h), (landmarks[152].x*w, landmarks[152].y*h), (landmarks[263].x*w, landmarks[263].y*h), (landmarks[33].x*w, landmarks[33].y*h), (landmarks[287].x*w, landmarks[287].y*h), (landmarks[57].x*w, landmarks[57].y*h)], dtype=np.float64)
|
66 |
+
cam_matrix = np.array([[w,0,w/2],[0,w,h/2],[0,0,1]], dtype=np.float64)
|
67 |
+
_, rot_vec, _ = cv2.solvePnP(face_3d, face_2d, cam_matrix, np.zeros((4,1),dtype=np.float64))
|
68 |
+
rmat, _ = cv2.Rodrigues(rot_vec); angles, _, _, _, _, _ = cv2.RQDecomp3x3(rmat)
|
69 |
+
pitch, yaw = angles[0], angles[1]
|
70 |
+
|
71 |
+
if pitch > self.settings['head_nod_thresh']: self.counters['head_nod']+=1
|
72 |
+
else: self.counters['head_nod']=0
|
73 |
+
if self.counters['head_nod'] >= self.settings['head_pose_consec_frames']: score += weights['head_nod']
|
74 |
+
|
75 |
+
if abs(yaw) > self.settings['head_look_away_thresh']: self.counters['looking_away']+=1
|
76 |
+
else: self.counters['looking_away']=0
|
77 |
+
if self.counters['looking_away'] >= self.settings['head_pose_consec_frames']: score += weights['looking_away']
|
78 |
+
|
79 |
+
# Determine Drowsiness Level
|
80 |
+
levels = self.settings['drowsiness_levels']
|
81 |
+
if score >= levels['very_drowsy_threshold']: drowsiness_indicators['drowsiness_level'] = "Very Drowsy"
|
82 |
+
elif score >= levels['slightly_drowsy_threshold']: drowsiness_indicators['drowsiness_level'] = "Slightly Drowsy"
|
83 |
+
|
84 |
+
drowsiness_indicators['details']['Score'] = score
|
85 |
+
|
86 |
+
# --- Visualization on Video Frame ---
|
87 |
+
level = drowsiness_indicators['drowsiness_level']
|
88 |
+
score_val = drowsiness_indicators.get("details", {}).get("Score", 0)
|
89 |
+
color = (0, 255, 0) # Green for Awake
|
90 |
+
|
91 |
+
if drowsiness_indicators['lighting'] == "Low":
|
92 |
+
color = (0, 165, 255) # Orange for low light
|
93 |
+
cv2.putText(frame, "LOW LIGHT", (w // 2 - 120, h // 2), cv2.FONT_HERSHEY_SIMPLEX, 2, color, 3, cv2.LINE_AA)
|
94 |
+
elif level == "Slightly Drowsy":
|
95 |
+
color = (0, 255, 255) # Yellow
|
96 |
+
elif level == "Very Drowsy":
|
97 |
+
color = (0, 0, 255) # Red
|
98 |
|
99 |
+
# Draw a colored border around the frame
|
100 |
+
cv2.rectangle(frame, (0, 0), (w, h), color, 10)
|
|
|
|
|
|
|
|
|
101 |
|
102 |
+
# Display status text
|
103 |
+
status_text = f"Status: {level} (Score: {score_val:.2f})"
|
104 |
+
cv2.putText(frame, status_text, (20, 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)
|
|
|
|
|
|
|
105 |
|
106 |
+
return frame, drowsiness_indicators, face_landmarks
|
|