File size: 6,519 Bytes
60b383e
 
 
 
 
 
 
 
a7ec466
 
 
 
 
ba6a8ea
935769d
b9d2bb7
 
935769d
b9d2bb7
 
 
935769d
 
 
 
b9d2bb7
935769d
ba6a8ea
a7ec466
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ba6a8ea
60b383e
a7ec466
 
3a175cd
60b383e
 
 
a7ec466
60b383e
a7ec466
935769d
a7ec466
60b383e
a7ec466
 
60b383e
 
 
a7ec466
 
 
 
 
 
 
60b383e
935769d
60b383e
a7ec466
 
935769d
b9d2bb7
a7ec466
 
 
 
 
 
 
 
 
935769d
a7ec466
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5101617
a7ec466
8977de8
a7ec466
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
import time
import os
import yaml
import logging
import numpy as np
import gradio as gr
import soundfile as sf
from dotenv import load_dotenv
from threading import Lock
from queue import Queue
import threading

from src.detection.factory import get_detector

# ───────────────────────────── logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s β”‚ %(message)s",
    datefmt="%H:%M:%S",
)

# ───────────────────────────── config / detector
load_dotenv()
with open("config.yaml") as f:
    CFG = yaml.safe_load(f)

detector = get_detector(CFG)

# ───────────────────────────── alert sound manager
class AlertManager:
    def __init__(self, wav_path, cooldown_seconds=5):
        self.cooldown_seconds = cooldown_seconds
        self.last_alert_time = 0
        self.lock = Lock()
        self.alert_queue = Queue(maxsize=1)
        
        # Load alert sound
        try:
            self.sample_rate, self.audio_data = sf.read(wav_path, dtype="float32")
            
            # Ensure stereo format for Gradio
            if self.audio_data.ndim == 1:
                self.audio_data = np.column_stack([self.audio_data, self.audio_data])
            
            # Normalize to [-1, 1] range
            max_val = np.abs(self.audio_data).max()
            if max_val > 0:
                self.audio_data = self.audio_data / max_val
            
            logging.info(f"Loaded alert sound: {wav_path} "
                        f"({len(self.audio_data)/self.sample_rate:.2f}s, "
                        f"shape: {self.audio_data.shape})")
            self.is_loaded = True
        except Exception as e:
            logging.error(f"Failed to load alert sound: {e}")
            self.is_loaded = False
            self.sample_rate = 44100
            self.audio_data = None
    
    def should_alert(self, drowsiness_level, lighting):
        """Check if we should trigger an alert"""
        if not self.is_loaded:
            return False
        
        with self.lock:
            current_time = time.monotonic()
            if (drowsiness_level != "Awake" 
                and lighting != "Low" 
                and (current_time - self.last_alert_time) > self.cooldown_seconds):
                self.last_alert_time = current_time
                return True
        return False
    
    def get_alert_audio(self):
        """Get the alert audio data"""
        if self.is_loaded:
            return (int(self.sample_rate), self.audio_data.copy())
        return None

# Initialize alert manager
alert_manager = AlertManager(
    wav_path=CFG["alerting"].get("alert_sound_path"),
    cooldown_seconds=CFG["alerting"].get("alert_cooldown_seconds", 5)
)

# ───────────────────────────── frame processing
def process_live_frame(frame):
    """Process frame for drowsiness detection"""
    if frame is None:
        return (
            np.zeros((480, 640, 3), dtype=np.uint8),
            "Status: Inactive",
            None
        )
    
    t0 = time.perf_counter()
    
    try:
        # Process frame through detector
        processed, indic = detector.process_frame(frame)
    except Exception as e:
        logging.error(f"Error processing frame: {e}")
        processed = np.zeros_like(frame)
        indic = {
            "drowsiness_level": "Error",
            "lighting": "Unknown",
            "details": {"Score": 0.0}
        }
    
    # Extract detection results
    level = indic.get("drowsiness_level", "Awake")
    lighting = indic.get("lighting", "Good")
    score = indic.get("details", {}).get("Score", 0.0)
    
    # Log performance
    dt_ms = (time.perf_counter() - t0) * 1000.0
    logging.info(f"{dt_ms:6.1f} ms β”‚ {lighting:<4} β”‚ {level:<14} β”‚ score={score:.2f}")
    
    # Create status text
    status_txt = f"Lighting: {lighting}\n"
    if lighting == "Low":
        status_txt += "Detection paused – low light."
    else:
        status_txt += f"Status: {level}\nScore: {score:.2f}"
    
    # Check if we should trigger alert
    audio_out = None
    if alert_manager.should_alert(level, lighting):
        audio_out = alert_manager.get_alert_audio()
        if audio_out:
            logging.info("πŸ”Š Alert triggered!")
    
    return processed, status_txt, audio_out

# ───────────────────────────── UI with error handling
def create_ui():
    with gr.Blocks(title="Drive Paddy – Drowsiness Detection") as app:
        gr.Markdown("# πŸš— **Drive Paddy** – Real-time Drowsiness Detection")
        gr.Markdown("Webcam-based drowsiness detection with audio alerts.")
        
        with gr.Row():
            with gr.Column(scale=2):
                cam = gr.Image(
                    sources=["webcam"],
                    streaming=True,
                    label="Live Camera Feed"
                )
            
            with gr.Column(scale=1):
                out_img = gr.Image(label="Processed Feed")
                out_text = gr.Textbox(
                    label="Live Status",
                    lines=4,
                    interactive=False
                )
                out_audio = gr.Audio(
                    label="Alert Sound",
                    autoplay=True,
                    type="numpy",
                    visible=True,
                )
        
        # Add system info
        with gr.Row():
            gr.Markdown(f"""
            **System Info:**
            - Alert cooldown: {CFG['alerting'].get('alert_cooldown_seconds', 5)}s
            - Alert sound loaded: {'βœ“' if alert_manager.is_loaded else 'βœ—'}
            """)
        
        # Connect the streaming
        cam.stream(
            fn=process_live_frame,
            inputs=[cam],
            outputs=[out_img, out_text, out_audio]
        )
    
    return app

# ───────────────────────────── main
if __name__ == "__main__":
    logging.info("Initializing Drive Paddy...")
    
    # Create and launch app
    app = create_ui()
    
    logging.info("Launching Gradio app...")
    app.launch(
        debug=True,
        share=False,  # Set to True if you want a public link
        server_name="0.0.0.0",  # Allow external connections
        server_port=7860
    )