# app_gradio.py
# ──────────────────────────────────────────────────────────
# Webcam → geometric detector → static WAV alert (with cooldown)
# Live console logs of per-frame latency + status.
#
# EDITED: This version uses a more robust method for audio playback
# in Gradio by dynamically creating the Audio component.
# ──────────────────────────────────────────────────────────
import time
import os
import cv2  # needed below by process_video_chunk's VideoWriter
import yaml
import logging
import numpy as np
import gradio as gr
import soundfile as sf
from dotenv import load_dotenv

# The detector factory is project-specific. If src.detection is unavailable,
# fall back to a minimal stub (an assumption for demonstration, not the
# project's real API) so this file stays runnable.
try:
    from src.detection.factory import get_detector
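except ImportError:
    class _StubDetector:
        # Hypothetical stand-in implementing the interface this app assumes:
        # process_frame(frame) -> (processed_frame, indicators_dict).
        def process_frame(self, frame):
            return frame, {
                "drowsiness_level": "Awake",
                "lighting": "Good",
                "details": {"Score": 0.0},
            }

    def get_detector(cfg):
        return _StubDetector()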

# ───────────────────────────── logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s │ %(message)s",
    datefmt="%H:%M:%S",
)

# ───────────────────────────── config / detector
load_dotenv()
with open("config.yaml") as f:
    CFG = yaml.safe_load(f)
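
# For reference, a minimal config.yaml matching the keys this file reads might
# look like the following (illustrative values, not the project's actual file):
#
#   alerting:
#     alert_cooldown_seconds: 5
#     alert_sound_path: assets/alert.wav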

detector = get_detector(CFG)

# ───────────────────────────── Alert Manager Class <--- CHANGE
# Encapsulating the alert logic makes the code much cleaner.
# It handles its own state (last alert time) internally.
class AlertManager:
    def __init__(self, config):
        self.cooldown_seconds = config.get("alert_cooldown_seconds", 5)
        self.last_alert_time = 0
        self.alert_data = None
        self.sample_rate = None
        # --- NEW: State variable to track if an alert is active ---
        self.is_alert_active = False
        self._load_sound(config.get("alert_sound_path"))

    def _load_sound(self, wav_path):
        if not wav_path:
            logging.warning("No 'alert_sound_path' found in config.")
            return
        try:
            # Load as int16 to avoid the Gradio conversion warning
            data, sr = sf.read(wav_path, dtype="int16")
            self.alert_data = data
            self.sample_rate = sr
            logging.info(f"Loaded alert sound: {wav_path} ({len(self.alert_data)/self.sample_rate:.2f}s)")
        except Exception as e:
            logging.error(f"Failed to load alert sound: {e}")
            self.alert_data = None

    def trigger_alert(self, level, lighting):
        """
        Checks conditions and returns audio payload if a new alert should fire.
        This is now stateful.
        """
        # --- NEW LOGIC: Part 1 ---
        # If an alert is currently active, we do nothing until the user is 'Awake'.
        if self.is_alert_active:
            if level == "Awake":
                logging.info("βœ… Alert state reset. User is Awake. Re-arming system.")
                self.is_alert_active = False
            return None # Important: Return None to prevent any sound

        # --- ORIGINAL LOGIC (with a small change) ---
        # If no alert is active, check for conditions to fire a new one.
        is_drowsy = level != "Awake"
        is_good_light = lighting != "Low"
        # The time-based cooldown is still useful to prevent flickering alerts.
        is_ready = (time.monotonic() - self.last_alert_time) > self.cooldown_seconds

        if self.alert_data is not None and is_drowsy and is_good_light and is_ready:
            self.last_alert_time = time.monotonic()
            # --- NEW LOGIC: Part 2 ---
            # Set the alert to active so it doesn't fire again immediately.
            self.is_alert_active = True
            logging.info("πŸ”Š Drowsiness detected! Firing alert and setting state to active.")
            return (self.sample_rate, self.alert_data.copy())

        return None
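
# Illustrative walk-through of the latching behavior (comment-only sketch,
# assuming a zero cooldown and a manually injected payload):
#   am = AlertManager({"alert_cooldown_seconds": 0})
#   am.alert_data, am.sample_rate = np.zeros(10, dtype="int16"), 16000
#   am.trigger_alert("Very Drowsy", "Good")  # -> payload: fires and latches
#   am.trigger_alert("Very Drowsy", "Good")  # -> None: latched until 'Awake'
#   am.trigger_alert("Awake", "Good")        # -> None: re-arms the system
#   am.trigger_alert("Very Drowsy", "Good")  # -> payload: fires again
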
# Initialize the alert manager
alert_manager = AlertManager(CFG["alerting"])

# ───────────────────────────── frame processing <--- MAJOR CHANGE
# Simplified by the AlertManager. No longer needs to pass 'last_alert_ts' back and forth.
def process_live_frame(frame):
    if frame is None:
        return (
            np.zeros((480, 640, 3), dtype=np.uint8),
            "Status: Inactive",
            None # No audio output
        )

    t0 = time.perf_counter()

    try:
        # Assuming your detector returns (processed_image, indicators_dict)
        processed, indic = detector.process_frame(frame)
    except Exception as e:
        logging.error(f"Error processing frame: {e}")
        processed = np.zeros_like(frame)
        indic = {"drowsiness_level": "Error", "lighting": "Unknown", "details": {"Score": 0.0}}

    level = indic.get("drowsiness_level", "Awake")
    lighting = indic.get("lighting", "Good")
    score = indic.get("details", {}).get("Score", 0.0)

    dt_ms = (time.perf_counter() - t0) * 1000.0
    logging.info(f"{dt_ms:6.1f} ms β”‚ {lighting:<4} β”‚ {level:<14} β”‚ score={score:.2f}")

    status_txt = (
        f"Lighting: {lighting}\n"
        + ("Detection paused – low light." if lighting == "Low"
           else f"Status: {level}\nScore: {score:.2f}")
    )

    # Check for an alert and get the audio payload if ready
    audio_payload = alert_manager.trigger_alert(level, lighting)

    # This is the key: return a new gr.Audio component when an alert fires.
    # Otherwise, return None to clear the component on the frontend.
    if audio_payload:
        return processed, status_txt, gr.Audio(value=audio_payload, autoplay=True)
    else:
        return processed, status_txt, None


# Constants for the video experiment
VIDEO_FPS = 30.0
CHUNK_SIZE_SECONDS = 2
CHUNK_FRAME_COUNT = int(VIDEO_FPS * CHUNK_SIZE_SECONDS)
TEMP_VIDEO_FILE = "temp_video_chunk.mp4"
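# At 30 fps with 2-second chunks, 60 frames accumulate before each encode, so
# the video pane trails the live feed by roughly CHUNK_SIZE_SECONDS plus
# encoding time; the status text and audio alerts below are not delayed.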

def process_video_chunk(frame, frame_buffer):
    """
    Processes a single frame, adds it to a buffer, and encodes a video chunk
    when the buffer is full. The alert system remains real-time.
    """
    if frame is None:
        return None, "Status: Inactive", None, [] # Return empty buffer

    # --- Real-time detection and alerting (This is not delayed) ---
    try:
        processed_frame, indic = detector.process_frame(frame)
    except Exception as e:
        logging.error(f"Error processing frame: {e}")
        processed_frame = np.zeros_like(frame)
        indic = {"drowsiness_level": "Error", "lighting": "Unknown", "details": {"Score": 0.0}}

    level = indic.get("drowsiness_level", "Awake")
    lighting = indic.get("lighting", "Good")
    score = indic.get("details", {}).get("Score", 0.0)
    status_txt = f"Lighting: {lighting}\nStatus: {level}\nScore: {score:.2f}"
    
    audio_payload = alert_manager.trigger_alert(level, lighting)
    audio_out = gr.Audio(value=audio_payload, autoplay=True) if audio_payload else None

    # --- Video Buffering Logic ---
    frame_buffer.append(processed_frame)

    video_out = None # No video output until the chunk is ready
    if len(frame_buffer) >= CHUNK_FRAME_COUNT:
        logging.info(f"Buffer full. Encoding {len(frame_buffer)} frames to video chunk...")
        # Encode the buffer to a video file
        h, w, _ = frame_buffer[0].shape
        # Note: 'mp4v' is widely available but not always browser-playable;
        # an H.264 fourcc such as 'avc1' is safer where that codec is installed.
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        writer = cv2.VideoWriter(TEMP_VIDEO_FILE, fourcc, VIDEO_FPS, (w, h))
        for f in frame_buffer:
            # Gradio streams RGB frames; OpenCV's VideoWriter expects BGR.
            writer.write(cv2.cvtColor(f, cv2.COLOR_RGB2BGR))
        writer.release()
        
        video_out = TEMP_VIDEO_FILE # Set the output to the new video file path
        frame_buffer = [] # Clear the buffer for the next chunk
        logging.info("Encoding complete. Sending video to frontend.")

    # Note: Status and Audio are returned on every frame for real-time feedback
    return video_out, status_txt, audio_out, frame_buffer


# ───────────────────────────── UI Definition
def create_readme_tab():
    """Creates the content for the 'About' tab."""
    with gr.Blocks(title="Drive Paddy - About Page") as readme_tab:
        gr.Markdown(
            """
            <div align="center">
              <img src="https://em-content.zobj.net/source/samsung/380/automobile_1f697.png" alt="Car Emoji" width="100"/>
              <h1>Drive Paddy</h1>
              <p><strong>Your Drowsiness Detection Assistant</strong></p>
            </div>

            ---

            ## 🌟 Features
            - **Real-Time Webcam Streaming**: Directly processes your live camera feed for immediate feedback.
            - **Efficient Geometric Analysis**: Uses `MediaPipe` for high-performance facial landmark detection.
            - **Multi-Signal Analysis**: Detects eye closure (EAR), yawns (MAR), and head-nodding.
            - **Stateful Alert System**: Plays a clear audio alert for new drowsiness events and intelligently re-arms itself, preventing alert fatigue.
            - **Low-Light Warning**: Automatically detects and warns about poor lighting conditions.
            - **Configurable**: Key detection thresholds and settings can be tuned via `config.yaml`.

            ---

            ## 🛠️ How It Works
            1.  **Video Streaming**: The `gradio.Image` component captures the camera feed.
            2.  **Frame Processing**: Each frame is sent to the `GeometricProcessor`.
            3.  **Stateful Alerting**: The `AlertManager` class uses internal state to decide if a *new* alert should be triggered.
            4.  **Dynamic Updates**: The processed video, status text, and audio alerts are sent back to the frontend for a seamless real-time experience.
            
            ---

            ## 💡 Understanding the Live Status
            The status panel provides real-time feedback on the following parameters:

            -   **`Lighting`**: Indicates the ambient light conditions.
                -   `Good`: Sufficient light for reliable detection.
                -   `Low`: Insufficient light. Detection is paused as the results would be unreliable.

            -   **`Status`**: The overall assessed level of driver alertness.
                -   `Awake`: The driver appears alert.
                -   `Slightly Drowsy`: Early signs of fatigue have been detected.
                -   `Very Drowsy`: Strong indicators of drowsiness are present. An alert is triggered.

            -   **`Score`**: A numerical value representing the accumulated evidence of drowsiness based on the weighted indicators (eye closure, yawning, head pose). A higher score corresponds to a greater level of detected drowsiness.
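
                Schematically, you can read the score as a weighted sum, e.g.
                `score ≈ w_eye·eye_closure + w_yawn·yawning + w_pose·head_nod`
                (illustrative notation only; the actual weights and thresholds
                live in the detector implementation and `config.yaml`).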
            """
        )
    return readme_tab

    
# ───────────────────────────── UI <--- CHANGE
def create_detection_tab():
    with gr.Blocks(title="Drive Paddy – πŸ“Ή Live Drowsiness Detection Tab") as detection_tab:
        gr.Markdown("## πŸ“Ή Live Drowsiness Detection")
        gr.Markdown("Press 'START' to activate your camera and begin monitoring. The console will show real-time logs.")
    
        with gr.Row():
            with gr.Column(scale=2):
                cam = gr.Image(sources=["webcam"], streaming=True, label="Live Camera Feed")
            with gr.Column(scale=1):
                out_img = gr.Image(label="Processed Feed")
                out_text = gr.Textbox(label="Live Status", lines=3, interactive=False)
                
                # This audio component now acts as a placeholder.
                # We make it invisible because we don't need to show the player controls.
                # The backend will dynamically send a new, playable component to it.
                out_audio = gr.Audio(
                    label="Alert",
                    autoplay=True,
                    visible=False, # Hiding the component for a cleaner UX
                )
    
        # The gr.State for managing the timestamp is no longer needed, simplifying the stream call.
        cam.stream(
            fn=process_live_frame,
            inputs=[cam],
            outputs=[out_img, out_text, out_audio] # The output now targets the placeholder
        )
    return detection_tab

def create_video_experiment_tab():
    """Creates the content for the Video Chunk experiment tab."""
    with gr.Blocks() as video_tab:
        gr.Markdown("## πŸ§ͺ Video Output Experiment")
        gr.Markdown(f"This feed buffers processed frames and outputs them as **{CHUNK_SIZE_SECONDS}-second video chunks**. Notice the trade-off between smoothness and latency. Alerts remain real-time.")
        with gr.Row():
            with gr.Column(scale=2):
                cam_video = gr.Image(sources=["webcam"], streaming=True, label="Live Camera Feed")
            with gr.Column(scale=1):
                out_video = gr.Video(label="Processed Video Chunk")
                out_text_video = gr.Textbox(label="Live Status", lines=3, interactive=False)
                out_audio_video = gr.Audio(label="Alert", autoplay=True, visible=False)

        # State to hold the buffer of frames between updates
        frame_buffer_state = gr.State([])
        
        cam_video.stream(
            fn=process_video_chunk,
            inputs=[cam_video, frame_buffer_state],
            outputs=[out_video, out_text_video, out_audio_video, frame_buffer_state]
        )
    return video_tab
    
with gr.Blocks(title="πŸš— Drive Paddy – Drowsiness Detection", theme=gr.themes.Soft()) as app:
    gr.Markdown("# πŸš— **Drive Paddy**")
    with gr.Tabs():
        with gr.TabItem("Live Detection"):
            create_detection_tab()
        with gr.TabItem("Video Output Experiment"):
            create_video_experiment_tab()
        with gr.TabItem("About this App"):
            create_readme_tab()

if __name__ == "__main__":
    logging.info("Launching Gradio app…")
    app.launch(debug=True)