import cv2
import mediapipe as mp
import numpy as np
import math
from src.detection.base_processor import BaseProcessor

# --- Helper Functions ---
def calculate_ear(eye_landmarks, frame_shape):
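    """Eye Aspect Ratio: (|p2-p6| + |p3-p5|) / (2*|p1-p4|). Drops toward zero as the eye closes."""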
    coords = np.array([(lm.x * frame_shape[1], lm.y * frame_shape[0]) for lm in eye_landmarks])
    v1 = np.linalg.norm(coords[1] - coords[5])
    v2 = np.linalg.norm(coords[2] - coords[4])
    h1 = np.linalg.norm(coords[0] - coords[3])
    return (v1 + v2) / (2.0 * h1) if h1 > 0 else 0.0

def calculate_mar(mouth_landmarks, frame_shape):
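    """Mouth Aspect Ratio: sum of three vertical lip distances over twice the mouth width. Rises during a yawn."""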
    coords = np.array([(lm.x * frame_shape[1], lm.y * frame_shape[0]) for lm in mouth_landmarks])
    v1 = np.linalg.norm(coords[1] - coords[7])
    v2 = np.linalg.norm(coords[2] - coords[6])
    v3 = np.linalg.norm(coords[3] - coords[5])
    h1 = np.linalg.norm(coords[0] - coords[4])
    return (v1 + v2 + v3) / (2.0 * h1) if h1 > 0 else 0.0

class GeometricProcessor(BaseProcessor):
    # MediaPipe Face Mesh landmark indices
    L_EYE = [362, 385, 387, 263, 373, 380]            # left eye contour (EAR)
    R_EYE = [33, 160, 158, 133, 153, 144]             # right eye contour (EAR)
    MOUTH = [61, 291, 39, 181, 0, 17, 84, 178]        # mouth contour (MAR)
    HEAD_POSE_LANDMARKS = [1, 152, 263, 33, 287, 57]  # nose tip, chin, eye corners, mouth corners (solvePnP)

    def __init__(self, config):
        self.settings = config['geometric_settings']
        self.face_mesh = mp.solutions.face_mesh.FaceMesh(
            max_num_faces=1,
            refine_landmarks=False,
            min_detection_confidence=0.5,
            min_tracking_confidence=0.5)

        self.downscale_factor = self.settings.get('downscale_factor', 0.35)
        self.default_skip = max(1, self.settings.get("skip_frames", 2))
        
        # Caching and state variables
        self.frame_counter = 0
        self.last_indicators = {"drowsiness_level": "Initializing...", "lighting": "Good", "details": {}}
        self.last_landmarks = None
        self.last_drawn_frame = None
        self.counters = { "eye_closure": 0, "yawning": 0, "head_nod": 0, "looking_away": 0 }
        self.zeros_4x1 = np.zeros((4, 1), np.float32)

    # Single entry point: analyses the frame and, when requested, draws the overlay.
    # Analysis-only callers (draw_visuals=False) receive (None, indicators).
    def process_frame(self, frame, draw_visuals=True):
        self.frame_counter += 1
        
        # Adaptive frame skipping: analyse every frame unless the last reading was "Awake",
        # otherwise only every Nth frame to save CPU.
        last_level = self.last_indicators.get("drowsiness_level", "Awake")
        skip_n = 1 if last_level != "Awake" else self.default_skip

        if self.frame_counter % skip_n != 0:
            # If we are drawing visuals and have a cached frame, return it.
            if draw_visuals and self.last_drawn_frame is not None:
                return self.last_drawn_frame, self.last_indicators
            # If we are not drawing, we can just return the last indicators.
            elif not draw_visuals:
                return None, self.last_indicators
            # Draw mode but nothing cached yet (first frames): return the raw frame.
            else:
                return frame.copy(), self.last_indicators

        # --- Core Frame Analysis (This part runs for both modes) ---
        original_frame = frame.copy()
        h_orig, w_orig, _ = original_frame.shape
        
        small_frame = cv2.resize(original_frame, (0, 0), fx=self.downscale_factor, fy=self.downscale_factor, interpolation=cv2.INTER_AREA)
        h, w, _ = small_frame.shape

        gray = cv2.cvtColor(small_frame, cv2.COLOR_BGR2GRAY)
        brightness = np.mean(gray)

        drowsiness_indicators = {"drowsiness_level": "Awake", "lighting": "Good", "details": {}}
        face_landmarks_data = None

        if brightness < self.settings['low_light_thresh']:
            drowsiness_indicators["lighting"] = "Low"
        else:
            img_rgb = cv2.cvtColor(small_frame, cv2.COLOR_BGR2RGB)
            img_rgb.flags.writeable = False
            results = self.face_mesh.process(img_rgb)
            img_rgb.flags.writeable = True

            if results.multi_face_landmarks:
                face_landmarks_data = results.multi_face_landmarks[0]
                landmarks = face_landmarks_data.landmark
                score = 0
                weights = self.settings['indicator_weights']
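                # Each indicator that persists for its configured number of consecutive
                # frames adds its weight to the score; the total is mapped to a level below.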

                # --- Drowsiness Calculations ---
                ear = (calculate_ear([landmarks[i] for i in self.L_EYE],(h,w)) + calculate_ear([landmarks[i] for i in self.R_EYE],(h,w)))/2.0
                if ear < self.settings['eye_ar_thresh']: self.counters['eye_closure']+=1
                else: self.counters['eye_closure']=0
                if self.counters['eye_closure'] >= self.settings['eye_ar_consec_frames']: score += weights['eye_closure']

                mar = calculate_mar([landmarks[i] for i in self.MOUTH], (h, w))
                if mar > self.settings['yawn_mar_thresh']: self.counters['yawning']+=1
                else: self.counters['yawning']=0
                if self.counters['yawning'] >= self.settings['yawn_consec_frames']: score += weights['yawning']

                # --- Head Pose Estimation ---
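                # Fit a generic 3D head model to the detected 2D landmarks with solvePnP;
                # decomposing the resulting rotation matrix yields Euler angles in degrees,
                # where pitch indicates nodding and yaw indicates looking away.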
                face_3d_model = np.array([
                    [0.0, 0.0, 0.0],            # nose tip
                    [0.0, -330.0, -65.0],       # chin
                    [-225.0, 170.0, -135.0],    # left eye outer corner
                    [225.0, 170.0, -135.0],     # right eye outer corner
                    [-150.0, -150.0, -125.0],   # left mouth corner
                    [150.0, -150.0, -125.0],    # right mouth corner
                ], dtype=np.float32)
                face_2d_points = np.array([(landmarks[i].x*w,landmarks[i].y*h) for i in self.HEAD_POSE_LANDMARKS],dtype=np.float32)
                cam_matrix = np.array([[w,0,w/2],[0,w,h/2],[0,0,1]],dtype=np.float32)  # pinhole approximation: focal length ~ frame width
                _, rvec, _ = cv2.solvePnP(face_3d_model, face_2d_points, cam_matrix, self.zeros_4x1, flags=cv2.SOLVEPNP_EPNP)
                rmat, _ = cv2.Rodrigues(rvec)
                angles, _, _, _, _, _ = cv2.RQDecomp3x3(rmat)
                pitch, yaw = angles[0], angles[1]
                
                if pitch > self.settings['head_nod_thresh']: self.counters['head_nod']+=1
                else: self.counters['head_nod']=0
                if self.counters['head_nod'] >= self.settings['head_pose_consec_frames']: score += weights['head_nod']

                if abs(yaw) > self.settings['head_look_away_thresh']: self.counters['looking_away']+=1
                else: self.counters['looking_away']=0
                if self.counters['looking_away'] >= self.settings['head_pose_consec_frames']: score += weights['looking_away']

                levels = self.settings['drowsiness_levels']
                if score >= levels['very_drowsy_threshold']: drowsiness_indicators['drowsiness_level'] = "Very Drowsy"
                elif score >= levels['slightly_drowsy_threshold']: drowsiness_indicators['drowsiness_level'] = "Slightly Drowsy"
                drowsiness_indicators['details']['Score'] = score
        
        self.last_indicators = drowsiness_indicators
        self.last_landmarks = face_landmarks_data

        # Drawing is optional: render and cache the overlay only when requested.
        if draw_visuals:
            processed_frame = self.draw_visuals(original_frame, drowsiness_indicators, face_landmarks_data)
            self.last_drawn_frame = processed_frame
            return processed_frame, drowsiness_indicators
        else:
            # Analysis-only mode: no frame is drawn, so return None in the frame slot.
            return None, drowsiness_indicators

    def draw_visuals(self, frame, indicators, landmarks_data=None):
        h, w, _ = frame.shape
        level = indicators['drowsiness_level']
        score_val = indicators.get("details", {}).get("Score", 0)
        color = (0, 255, 0)

        if indicators['lighting'] == "Low":
            color = (0, 165, 255)
            cv2.putText(frame, "LOW LIGHT", (w // 2 - 120, h // 2), cv2.FONT_HERSHEY_SIMPLEX, 2, color, 3, cv2.LINE_AA)
        elif level == "Slightly Drowsy": color = (0, 255, 255)
        elif level == "Very Drowsy": color = (0, 0, 255)

        if landmarks_data:
            landmarks = landmarks_data.landmark
            eye_mouth_landmarks_indices = self.L_EYE + self.R_EYE + self.MOUTH
            for idx in eye_mouth_landmarks_indices:
                lm = landmarks[idx]
                x, y = int(lm.x * w), int(lm.y * h)
                cv2.circle(frame, (x, y), 2, (0, 255, 0), -1)

        cv2.rectangle(frame, (0, 0), (w - 1, h - 1), color, 10)
        status_text = f"Status: {level} (Score: {score_val:.2f})"
        cv2.putText(frame, status_text, (20, 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)

        return frame
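

# Minimal usage sketch (illustrative, not part of the original module): shows how
# GeometricProcessor might be driven from a webcam loop. The config keys below mirror
# the settings read in __init__ and process_frame; the numeric values are assumptions
# and should come from the project's real configuration.
if __name__ == "__main__":
    example_config = {
        "geometric_settings": {
            "downscale_factor": 0.35,
            "skip_frames": 2,
            "low_light_thresh": 60,
            "eye_ar_thresh": 0.22,
            "eye_ar_consec_frames": 10,
            "yawn_mar_thresh": 0.6,
            "yawn_consec_frames": 15,
            "head_nod_thresh": 15.0,
            "head_look_away_thresh": 20.0,
            "head_pose_consec_frames": 10,
            "indicator_weights": {"eye_closure": 0.45, "yawning": 0.25,
                                  "head_nod": 0.20, "looking_away": 0.10},
            "drowsiness_levels": {"slightly_drowsy_threshold": 0.3,
                                  "very_drowsy_threshold": 0.6},
        }
    }
    processor = GeometricProcessor(example_config)
    cap = cv2.VideoCapture(0)
    while cap.isOpened():
        ok, frame = cap.read()
        if not ok:
            break
        drawn, indicators = processor.process_frame(frame, draw_visuals=True)
        cv2.imshow("Drowsiness", drawn if drawn is not None else frame)
        if cv2.waitKey(1) & 0xFF == ord("q"):
            break
    cap.release()
    cv2.destroyAllWindows()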