File size: 8,551 Bytes
19f420a
 
 
 
 
929f736
19f420a
f6146b6
19f420a
 
f6146b6
 
 
 
19f420a
 
 
f6146b6
 
 
 
19f420a
 
 
 
 
f6146b6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f37553c
f6146b6
 
19f420a
 
 
f6146b6
 
 
 
 
 
 
 
 
 
 
 
209de52
f6146b6
 
 
 
 
 
 
 
f37553c
 
 
f6146b6
f37553c
 
929f736
f6146b6
 
f37553c
 
19f420a
f37553c
 
 
 
19f420a
f6146b6
f37553c
 
 
 
19f420a
f37553c
 
 
 
19f420a
f6146b6
929f736
 
 
f37553c
 
 
 
 
 
 
 
 
 
 
929f736
f37553c
 
 
26440c6
f37553c
f6146b6
929f736
f37553c
f6146b6
 
 
f37553c
f6146b6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f37553c
f6146b6
 
 
 
 
 
 
 
 
 
 
 
19f420a
f37553c
 
 
19f420a
f6146b6
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
# drive_paddy/detection/strategies/geometric.py
import cv2
import mediapipe as mp
import numpy as np
import math
from ..base_processor import BaseProcessor

# --- Helper Functions (Unchanged) ---
def calculate_ear(eye_landmarks, frame_shape):
    """Eye Aspect Ratio: mean of the two vertical eye distances over the
    horizontal distance, computed in pixel space.

    Args:
        eye_landmarks: six landmarks (objects with normalized .x/.y) in the
            order [left-corner, top-1, top-2, right-corner, bottom-2, bottom-1].
        frame_shape: (height, width, ...) of the frame the landmarks refer to.

    Returns:
        float EAR, or 0.0 when the horizontal distance is degenerate.
    """
    height, width = frame_shape[0], frame_shape[1]
    pts = np.array([(point.x * width, point.y * height) for point in eye_landmarks])
    vertical = np.linalg.norm(pts[1] - pts[5]) + np.linalg.norm(pts[2] - pts[4])
    horizontal = np.linalg.norm(pts[0] - pts[3])
    if horizontal <= 0:
        return 0.0
    return vertical / (2.0 * horizontal)

def calculate_mar(mouth_landmarks, frame_shape):
    """Mouth Aspect Ratio: mean of three vertical mouth distances over the
    horizontal distance, computed in pixel space.

    Args:
        mouth_landmarks: eight landmarks (objects with normalized .x/.y);
            indices 0/4 are the mouth corners, pairs (1,7), (2,6), (3,5)
            are the vertical measurements.
        frame_shape: (height, width, ...) of the frame the landmarks refer to.

    Returns:
        float MAR, or 0.0 when the horizontal distance is degenerate.
    """
    height, width = frame_shape[0], frame_shape[1]
    pts = np.array([(point.x * width, point.y * height) for point in mouth_landmarks])
    vertical = sum(np.linalg.norm(pts[i] - pts[j]) for i, j in ((1, 7), (2, 6), (3, 5)))
    horizontal = np.linalg.norm(pts[0] - pts[4])
    if horizontal <= 0:
        return 0.0
    return vertical / (2.0 * horizontal)

class GeometricProcessor(BaseProcessor):
    """Drowsiness detector based on facial geometry (EAR, MAR, head pose).

    Runs MediaPipe Face Mesh on a downscaled copy of each frame, and only
    analyzes every ``skip_frames``-th frame; on skipped frames the last
    computed indicators/landmarks are redrawn so the video feed stays smooth.
    """

    def __init__(self, config):
        """Initialize from config['geometric_settings'] (thresholds, weights,
        optional 'downscale_factor' and 'skip_frames')."""
        # NOTE(review): BaseProcessor.__init__ is not called, matching the
        # original code — confirm the base class needs no initialization.
        self.settings = config['geometric_settings']
        self.face_mesh = mp.solutions.face_mesh.FaceMesh(
            max_num_faces=1,
            refine_landmarks=True,
            min_detection_confidence=0.5,
            min_tracking_confidence=0.5)

        self.downscale_factor = self.settings.get('downscale_factor', 0.5)
        # FIX: clamp to >= 1 so the modulo in process_frame can never raise
        # ZeroDivisionError when the config sets skip_frames to 0.
        self.skip_frames = max(1, self.settings.get('skip_frames', 2))

        # State carried across frames for the frame-skipping logic.
        self.frame_counter = 0
        self.last_indicators = {"drowsiness_level": "Initializing...", "lighting": "Good", "details": {}}
        self.last_landmarks = None

        # Consecutive-frame counters and MediaPipe Face Mesh landmark indices.
        self.counters = { "eye_closure": 0, "yawning": 0, "head_nod": 0, "looking_away": 0 }
        self.L_EYE = [362, 385, 387, 263, 373, 380]
        self.R_EYE = [33, 160, 158, 133, 153, 144]
        self.MOUTH = [61, 291, 39, 181, 0, 17, 84, 178]

    def process_frame(self, frame):
        """Analyze one BGR frame.

        Returns:
            (annotated_frame, indicators_dict, face_landmarks_or_None)
        """
        self.frame_counter += 1

        # --- FRAME SKIPPING ---
        # On a skipped frame, redraw the last known status on the current
        # full-sized frame and return immediately.
        if self.frame_counter % self.skip_frames != 0:
            frame_with_last_results = self.draw_visuals(frame, self.last_indicators, self.last_landmarks)
            return frame_with_last_results, self.last_indicators, self.last_landmarks

        # --- FRAME PROCESSING ---
        original_frame = frame.copy()
        h_orig, w_orig, _ = original_frame.shape

        # Downscale for fast inference; landmark coordinates are normalized,
        # so they can later be projected back onto the full-sized frame.
        small_frame = cv2.resize(original_frame, (0, 0), fx=self.downscale_factor, fy=self.downscale_factor)
        h, w, _ = small_frame.shape

        # Lighting check: mean grayscale brightness of the small frame.
        gray = cv2.cvtColor(small_frame, cv2.COLOR_BGR2GRAY)
        brightness = np.mean(gray)
        is_low_light = brightness < self.settings['low_light_thresh']

        drowsiness_indicators = {"drowsiness_level": "Awake", "lighting": "Good", "details": {}}
        face_landmarks = None

        if not is_low_light:
            # MediaPipe expects RGB; convert the SMALL frame only.
            img_rgb = cv2.cvtColor(small_frame, cv2.COLOR_BGR2RGB)
            results = self.face_mesh.process(img_rgb)
            face_landmarks = results.multi_face_landmarks

            if face_landmarks:
                landmarks = face_landmarks[0].landmark
                score = 0
                weights = self.settings['indicator_weights']

                # Eye closure: average EAR over both eyes, scored only after
                # enough consecutive below-threshold frames.
                ear = (calculate_ear([landmarks[i] for i in self.L_EYE],(h,w)) + calculate_ear([landmarks[i] for i in self.R_EYE],(h,w)))/2.0
                if ear < self.settings['eye_ar_thresh']: self.counters['eye_closure']+=1
                else: self.counters['eye_closure']=0
                if self.counters['eye_closure'] >= self.settings['eye_ar_consec_frames']: score += weights['eye_closure']

                # Yawning: MAR above threshold for enough consecutive frames.
                mar = calculate_mar([landmarks[i] for i in self.MOUTH], (h, w))
                if mar > self.settings['yawn_mar_thresh']: self.counters['yawning']+=1
                else: self.counters['yawning']=0
                if self.counters['yawning'] >= self.settings['yawn_consec_frames']: score += weights['yawning']

                # Head pose via PnP on six canonical face points (small-frame
                # pixel coordinates; focal length approximated by width).
                face_3d = np.array([[0.0,0.0,0.0],[0.0,-330.0,-65.0],[-225.0,170.0,-135.0],[225.0,170.0,-135.0],[-150.0,-150.0,-125.0],[150.0,-150.0,-125.0]],dtype=np.float64)
                face_2d = np.array([(landmarks[1].x*w,landmarks[1].y*h),(landmarks[152].x*w,landmarks[152].y*h),(landmarks[263].x*w,landmarks[263].y*h),(landmarks[33].x*w,landmarks[33].y*h),(landmarks[287].x*w,landmarks[287].y*h),(landmarks[57].x*w,landmarks[57].y*h)],dtype=np.float64)
                cam_matrix = np.array([[w,0,w/2],[0,w,h/2],[0,0,1]],dtype=np.float64)
                # FIX: the original discarded solvePnP's success flag; on
                # failure rot_vec is not a valid rotation and cv2.Rodrigues
                # can crash. Skip head-pose scoring for this frame instead.
                success, rot_vec, _ = cv2.solvePnP(face_3d, face_2d, cam_matrix, np.zeros((4,1),dtype=np.float64))
                if success:
                    rmat, _ = cv2.Rodrigues(rot_vec); angles, _, _, _, _, _ = cv2.RQDecomp3x3(rmat)
                    pitch, yaw = angles[0], angles[1]

                    if pitch > self.settings['head_nod_thresh']: self.counters['head_nod']+=1
                    else: self.counters['head_nod']=0
                    if self.counters['head_nod'] >= self.settings['head_pose_consec_frames']: score += weights['head_nod']

                    if abs(yaw) > self.settings['head_look_away_thresh']: self.counters['looking_away']+=1
                    else: self.counters['looking_away']=0
                    if self.counters['looking_away'] >= self.settings['head_pose_consec_frames']: score += weights['looking_away']

                levels = self.settings['drowsiness_levels']
                if score >= levels['very_drowsy_threshold']: drowsiness_indicators['drowsiness_level'] = "Very Drowsy"
                elif score >= levels['slightly_drowsy_threshold']: drowsiness_indicators['drowsiness_level'] = "Slightly Drowsy"

                drowsiness_indicators['details']['Score'] = score
        else:
            drowsiness_indicators["lighting"] = "Low"

        # Cache results so skipped frames can redraw them.
        self.last_indicators = drowsiness_indicators
        self.last_landmarks = face_landmarks

        # Draw on the ORIGINAL full-resolution frame for output quality.
        processed_frame = self.draw_visuals(original_frame, drowsiness_indicators, face_landmarks)

        return processed_frame, drowsiness_indicators, face_landmarks

    def draw_visuals(self, frame, indicators, landmarks_data=None):
        """Draw status border, text, and landmark dots on `frame` (mutates and
        returns it). Uses the drawn frame's own dimensions, so normalized
        landmarks scale correctly regardless of the processing downscale."""
        h, w, _ = frame.shape
        level = indicators['drowsiness_level']
        score_val = indicators.get("details", {}).get("Score", 0)
        color = (0, 255, 0) # Green for Awake

        if indicators['lighting'] == "Low":
            color = (0, 165, 255) # Orange
            cv2.putText(frame, "LOW LIGHT", (w // 2 - 120, h // 2), cv2.FONT_HERSHEY_SIMPLEX, 2, color, 3, cv2.LINE_AA)
        elif level == "Slightly Drowsy": color = (0, 255, 255) # Yellow
        elif level == "Very Drowsy": color = (0, 0, 255) # Red

        # Landmark dots for eyes and mouth, scaled to this frame's size.
        if landmarks_data:
            landmarks = landmarks_data[0].landmark
            eye_mouth_landmarks = self.L_EYE + self.R_EYE + self.MOUTH
            for idx in eye_mouth_landmarks:
                lm = landmarks[idx]
                x, y = int(lm.x * w), int(lm.y * h)
                cv2.circle(frame, (x, y), 2, (0, 255, 0), -1)

        cv2.rectangle(frame, (0, 0), (w, h), color, 10)
        status_text = f"Status: {level} (Score: {score_val:.2f})"
        cv2.putText(frame, status_text, (20, 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)

        return frame