# drive_paddy/detection/strategies/geometric.py
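"""Geometric drowsiness detection strategy.

Computes the eye aspect ratio (EAR), mouth aspect ratio (MAR), and head pose
from MediaPipe Face Mesh landmarks, combines them into a weighted drowsiness
score, and annotates the frame. Downscaling and adaptive frame skipping keep
per-frame cost low; a brightness check flags low-light frames.
"""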
import cv2
import mediapipe as mp
import numpy as np
import math
from src.detection.base_processor import BaseProcessor

# --- Helper Functions ---
def calculate_ear(eye_landmarks, frame_shape):
    """Eye aspect ratio: mean of the two vertical eye distances over the horizontal distance."""
    coords = np.array([(lm.x * frame_shape[1], lm.y * frame_shape[0]) for lm in eye_landmarks])
    v1 = np.linalg.norm(coords[1] - coords[5])
    v2 = np.linalg.norm(coords[2] - coords[4])
    h1 = np.linalg.norm(coords[0] - coords[3])
    return (v1 + v2) / (2.0 * h1) if h1 > 0 else 0.0

def calculate_mar(mouth_landmarks, frame_shape):
    """Mouth aspect ratio: sum of three vertical mouth distances over twice the horizontal distance."""
    coords = np.array([(lm.x * frame_shape[1], lm.y * frame_shape[0]) for lm in mouth_landmarks])
    v1 = np.linalg.norm(coords[1] - coords[7])
    v2 = np.linalg.norm(coords[2] - coords[6])
    v3 = np.linalg.norm(coords[3] - coords[5])
    h1 = np.linalg.norm(coords[0] - coords[4])
    return (v1 + v2 + v3) / (2.0 * h1) if h1 > 0 else 0.0

class GeometricProcessor(BaseProcessor):
    L_EYE = [362, 385, 387, 263, 373, 380]
    R_EYE = [33, 160, 158, 133, 153, 144]
    MOUTH = [61, 291, 39, 181, 0, 17, 84, 178]

    def __init__(self, config):
        self.settings = config['geometric_settings']
        self.face_mesh = mp.solutions.face_mesh.FaceMesh(
            max_num_faces=1,
            refine_landmarks=False,
            min_detection_confidence=0.5,
            min_tracking_confidence=0.5)

        self.downscale_factor = self.settings.get('downscale_factor', 0.35)
        self.default_skip = max(1, self.settings.get("skip_frames", 2))

        # State variables for frame skipping
        self.frame_counter = 0
        self.last_indicators = {"drowsiness_level": "Initializing...", "lighting": "Good", "details": {}}
        self.last_landmarks = None

        # Consecutive-frame counters for each drowsiness indicator
        self.counters = {"eye_closure": 0, "yawning": 0, "head_nod": 0, "looking_away": 0}

        # Pre-allocated buffers
        self.zeros_4x1 = np.zeros((4, 1), np.float32)  # zero distortion coefficients for solvePnP
        self.landmark_ids = self.L_EYE + self.R_EYE + self.MOUTH

    def process_frame(self, frame):
        self.frame_counter += 1
        
        # Adaptive frame skipping: analyze every frame while the driver looks drowsy,
        # otherwise only every `default_skip`-th frame.
        last_level = self.last_indicators.get("drowsiness_level", "Awake")
        skip_n = 1 if last_level != "Awake" else self.default_skip

        if self.frame_counter % skip_n != 0:
            drawn = self.draw_visuals(frame.copy(), self.last_indicators, self.last_landmarks)
            return drawn, self.last_indicators, self.last_landmarks

        # --- FRAME PROCESSING ---
        original_frame = frame.copy()
        h_orig, w_orig, _ = original_frame.shape

        # --- OPTIMIZATION 1: DOWNSCALING ---
        # Create a small frame for fast processing
        small_frame = cv2.resize(original_frame, (0, 0), fx=self.downscale_factor, fy=self.downscale_factor, interpolation=cv2.INTER_AREA)
        h, w, _ = small_frame.shape
        
        # All analysis runs on `small_frame` to gain the speed advantage; landmark
        # coordinates are normalized, so they can be re-projected onto the
        # full-sized frame when drawing.
        gray = cv2.cvtColor(small_frame, cv2.COLOR_BGR2GRAY)
        brightness = np.mean(gray)
        is_low_light = brightness < self.settings['low_light_thresh']

        drowsiness_indicators = {"drowsiness_level": "Awake", "lighting": "Good", "details": {}}
        face_landmarks = None

        if not is_low_light:
            # Convert the SMALL frame to RGB for MediaPipe
            img_rgb = cv2.cvtColor(small_frame, cv2.COLOR_BGR2RGB)
            results = self.face_mesh.process(img_rgb)
            face_landmarks = results.multi_face_landmarks

            if face_landmarks:
                landmarks = face_landmarks[0].landmark
                score = 0
                weights = self.settings['indicator_weights']

                # --- Drowsiness Calculations (use the small frame's 'h' and 'w') ---
                ear = (calculate_ear([landmarks[i] for i in self.L_EYE], (h, w)) +
                       calculate_ear([landmarks[i] for i in self.R_EYE], (h, w))) / 2.0
                if ear < self.settings['eye_ar_thresh']:
                    self.counters['eye_closure'] += 1
                else:
                    self.counters['eye_closure'] = 0
                if self.counters['eye_closure'] >= self.settings['eye_ar_consec_frames']:
                    score += weights['eye_closure']

                mar = calculate_mar([landmarks[i] for i in self.MOUTH], (h, w))
                if mar > self.settings['yawn_mar_thresh']:
                    self.counters['yawning'] += 1
                else:
                    self.counters['yawning'] = 0
                if self.counters['yawning'] >= self.settings['yawn_consec_frames']:
                    score += weights['yawning']

                # --- Head Pose Estimation (also uses the small frame's 'h' and 'w') ---
                # Generic 3D face model: nose tip, chin, left/right eye corners, left/right mouth corners.
                face_3d = np.array([[0.0, 0.0, 0.0], [0.0, -330.0, -65.0], [-225.0, 170.0, -135.0],
                                    [225.0, 170.0, -135.0], [-150.0, -150.0, -125.0], [150.0, -150.0, -125.0]], dtype=np.float32)
                # Matching 2D image points taken from the corresponding face-mesh landmarks.
                face_2d = np.array([(landmarks[1].x * w, landmarks[1].y * h), (landmarks[152].x * w, landmarks[152].y * h),
                                    (landmarks[263].x * w, landmarks[263].y * h), (landmarks[33].x * w, landmarks[33].y * h),
                                    (landmarks[287].x * w, landmarks[287].y * h), (landmarks[57].x * w, landmarks[57].y * h)], dtype=np.float32)
                # Approximate pinhole camera: focal length ~ frame width, principal point at the frame center.
                cam_matrix = np.array([[w, 0, w / 2], [0, w, h / 2], [0, 0, 1]], dtype=np.float32)
                _, rvec, _ = cv2.solvePnP(face_3d, face_2d, cam_matrix, self.zeros_4x1, flags=cv2.SOLVEPNP_EPNP)
                rmat, _ = cv2.Rodrigues(rvec)
                angles, _, _, _, _, _ = cv2.RQDecomp3x3(rmat)
                pitch, yaw = angles[0], angles[1]

                if pitch > self.settings['head_nod_thresh']: 
                    self.counters['head_nod']+=1
                else: 
                    self.counters['head_nod']=0
                if self.counters['head_nod'] >= self.settings['head_pose_consec_frames']: 
                    score += weights['head_nod']

                if abs(yaw) > self.settings['head_look_away_thresh']: 
                    self.counters['looking_away']+=1
                else: 
                    self.counters['looking_away']=0
                if self.counters['looking_away'] >= self.settings['head_pose_consec_frames']: 
                    score += weights['looking_away']
                
                levels = self.settings['drowsiness_levels']
                if score >= levels['very_drowsy_threshold']:
                    drowsiness_indicators['drowsiness_level'] = "Very Drowsy"
                elif score >= levels['slightly_drowsy_threshold']:
                    drowsiness_indicators['drowsiness_level'] = "Slightly Drowsy"

                drowsiness_indicators['details']['Score'] = score

        else:
            drowsiness_indicators["lighting"] = "Low"
        
        # --- Update state for skipped frames ---
        self.last_indicators = drowsiness_indicators
        self.last_landmarks = face_landmarks
        
        # --- Draw visuals on the ORIGINAL frame for a high-quality output ---
        processed_frame = self.draw_visuals(original_frame, drowsiness_indicators, face_landmarks)

        return processed_frame, drowsiness_indicators, face_landmarks

    def draw_visuals(self, frame, indicators, landmarks_data=None):
        """Helper function to draw all visualizations on the frame."""
        # Use the dimensions of the frame being drawn on (the original, full-sized
        # frame) so that normalized landmark coordinates scale correctly.
        h, w, _ = frame.shape
        level = indicators['drowsiness_level']
        score_val = indicators.get("details", {}).get("Score", 0)
        color = (0, 255, 0) # Green for Awake

        if indicators['lighting'] == "Low":
            color = (0, 165, 255) # Orange
            cv2.putText(frame, "LOW LIGHT", (w // 2 - 120, h // 2), cv2.FONT_HERSHEY_SIMPLEX, 2, color, 3, cv2.LINE_AA)
        elif level == "Slightly Drowsy": color = (0, 255, 255) # Yellow
        elif level == "Very Drowsy": color = (0, 0, 255) # Red

        # Draw landmarks if they were detected
        if landmarks_data:
            landmarks = landmarks_data[0].landmark
            # Reuse the pre-computed eye and mouth landmark indices from __init__.
            for idx in self.landmark_ids:
                lm = landmarks[idx]
                # Scale landmark coordinates to the full-sized frame using its 'w' and 'h'.
                x, y = int(lm.x * w), int(lm.y * h)
                cv2.circle(frame, (x, y), 2, (0, 255, 0), -1)

        cv2.rectangle(frame, (0, 0), (w, h), color, 10)
        status_text = f"Status: {level} (Score: {score_val:.2f})"
        cv2.putText(frame, status_text, (20, 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)

        return frame
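

# A minimal usage sketch: drive GeometricProcessor from a webcam feed. The config
# keys mirror the settings read above, but every numeric value here is an assumed
# placeholder rather than a tuned threshold, and the camera index is an example.
if __name__ == "__main__":
    example_config = {
        'geometric_settings': {
            'downscale_factor': 0.35,       # assumed example value
            'skip_frames': 2,               # assumed example value
            'low_light_thresh': 60,         # mean-gray threshold, assumed
            'eye_ar_thresh': 0.22,          # assumed example value
            'eye_ar_consec_frames': 15,     # assumed example value
            'yawn_mar_thresh': 0.6,         # assumed example value
            'yawn_consec_frames': 20,       # assumed example value
            'head_nod_thresh': 15.0,        # degrees, assumed
            'head_look_away_thresh': 20.0,  # degrees, assumed
            'head_pose_consec_frames': 15,  # assumed example value
            'indicator_weights': {'eye_closure': 0.5, 'yawning': 0.3,
                                  'head_nod': 0.5, 'looking_away': 0.2},
            'drowsiness_levels': {'slightly_drowsy_threshold': 0.3,
                                  'very_drowsy_threshold': 0.7},
        }
    }
    processor = GeometricProcessor(example_config)
    cap = cv2.VideoCapture(0)
    while cap.isOpened():
        ok, frame = cap.read()
        if not ok:
            break
        annotated, _, _ = processor.process_frame(frame)
        cv2.imshow("Geometric drowsiness demo", annotated)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
    cap.release()
    cv2.destroyAllWindows()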