Spaces:
Sleeping
Sleeping
# app_gradio.py
# ────────────────────────────────────────────────────────────
# Webcam → geometric detector → static WAV alert (with cooldown)
# Live console logs of per-frame latency + status.
#
# EDITED: This version uses a more robust method for audio playback
# in Gradio by dynamically creating the Audio component.
# ────────────────────────────────────────────────────────────
import time | |
import os | |
import yaml | |
import logging | |
import numpy as np | |
import gradio as gr | |
import soundfile as sf | |
from dotenv import load_dotenv | |
# This is a mock factory and detector for demonstration. | |
# Replace with your actual import. | |
from src.detection.factory import get_detector | |
# βββββββββββββββββββββββββββββ logging | |
# Console logging for the whole app: INFO and above, prefixed with a
# wall-clock timestamp (HH:MM:SS).
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s β %(message)s",
    datefmt="%H:%M:%S",
)

# Configuration and detector. Environment variables are loaded first so they
# are in place before the config is read and the detector is built.
load_dotenv()
# Read the YAML with an explicit encoding: without it, open() falls back to the
# platform locale, which can mis-decode non-ASCII config values on some systems.
with open("config.yaml", encoding="utf-8") as f:
    CFG = yaml.safe_load(f)
# Single detector instance shared by every frame-processing callback.
detector = get_detector(CFG)
# βββββββββββββββββββββββββββββ Alert Manager Class <--- CHANGE | |
# Encapsulating the alert logic makes the code much cleaner. | |
# It handles its own state (last alert time) internally. | |
class AlertManager:
    """Decide when the audible drowsiness alert should be played.

    The manager owns all alert state:

    * the decoded alert sound (``alert_data`` / ``sample_rate``), loaded once
      from ``config["alert_sound_path"]``;
    * ``is_alert_active`` - set when an alert fires and cleared only when an
      ``"Awake"`` level is reported, so one drowsiness episode produces one
      alert;
    * ``last_alert_time`` - ``time.monotonic()`` timestamp of the last alert,
      used to enforce ``cooldown_seconds`` between consecutive alerts.
    """

    # Levels that must never start an alert. "Error" is the placeholder level
    # the frame processors report when the detector raised; it carries no
    # evidence of drowsiness, so it must not sound the alarm.
    _NON_DROWSY_LEVELS = ("Awake", "Error")

    def __init__(self, config):
        """Create the manager from the alerting section of the config.

        Args:
            config: Mapping with optional keys ``alert_cooldown_seconds``
                (default 5) and ``alert_sound_path``. ``None`` is treated as
                an empty mapping.
        """
        config = config or {}
        self.cooldown_seconds = config.get("alert_cooldown_seconds", 5)
        # time.monotonic() has an undefined reference point, so 0 is not a safe
        # "never fired" marker: if the clock value is smaller than the cooldown
        # the first alert would be suppressed. -inf always passes the check.
        self.last_alert_time = float("-inf")
        self.alert_data = None
        self.sample_rate = None
        # True from the moment an alert fires until "Awake" is seen again.
        self.is_alert_active = False
        self._load_sound(config.get("alert_sound_path"))

    def _load_sound(self, wav_path):
        """Load the alert sound into ``alert_data`` / ``sample_rate``.

        Best effort: a missing path or any load failure is logged and leaves
        ``alert_data`` as ``None``, which disables alerts instead of crashing.
        """
        if not wav_path:
            logging.warning("No 'alert_sound_path' found in config.")
            return
        try:
            # Load as int16 to avoid the Gradio conversion warning
            data, sr = sf.read(wav_path, dtype="int16")
            self.alert_data = data
            self.sample_rate = sr
            logging.info(
                "Loaded alert sound: %s (%.2fs)",
                wav_path,
                len(self.alert_data) / self.sample_rate,
            )
        except Exception as e:
            logging.error("Failed to load alert sound: %s", e)
            self.alert_data = None
            self.sample_rate = None

    def trigger_alert(self, level, lighting):
        """Return the audio payload if a new alert should fire, else ``None``.

        Args:
            level: Drowsiness level reported for the current frame.
            lighting: Lighting label reported for the current frame.

        Returns:
            ``(sample_rate, samples_copy)`` when an alert fires, otherwise
            ``None``.
        """
        # While an alert is active nothing fires; only "Awake" re-arms.
        if self.is_alert_active:
            if level == "Awake":
                logging.info("β Alert state reset. User is Awake. Re-arming system.")
                self.is_alert_active = False
            return None  # never play a sound on the re-arming call itself

        if self.alert_data is None:
            return None
        if level in self._NON_DROWSY_LEVELS or lighting == "Low":
            return None

        # The time-based cooldown still guards against flickering alerts.
        now = time.monotonic()
        if (now - self.last_alert_time) <= self.cooldown_seconds:
            return None

        self.last_alert_time = now
        self.is_alert_active = True
        logging.info("π Drowsiness detected! Firing alert and setting state to active.")
        # Hand out a copy so callers cannot modify the cached sound.
        return (self.sample_rate, self.alert_data.copy())
# Initialize the alert manager from the "alerting" section of the config.
# This single module-level instance is used by both frame-processing
# callbacks, so its alert state (active flag, cooldown timestamp) is shared
# between them.
alert_manager = AlertManager(CFG["alerting"])
# βββββββββββββββββββββββββββββ frame processing <--- MAJOR CHANGE | |
def process_live_frame(frame):
    """Analyse one webcam frame and build the outputs for the live tab.

    Args:
        frame: The frame delivered by the webcam stream, or ``None`` when no
            frame is available.

    Returns:
        A 3-tuple ``(image, status_text, audio)``: the detector's processed
        frame, a multi-line status string, and either a ``gr.Audio`` carrying
        the alert sound or ``None`` when no alert fires.
    """
    if frame is None:
        # Nothing to analyse: black 480x640 placeholder, idle status, no audio.
        blank = np.zeros((480, 640, 3), dtype=np.uint8)
        return blank, "Status: Inactive", None

    started = time.perf_counter()
    try:
        annotated, indicators = detector.process_frame(frame)
    except Exception as exc:
        # Keep the stream alive: report the failure as an "Error" frame.
        logging.error(f"Error processing frame: {exc}")
        annotated = np.zeros_like(frame)
        indicators = {
            "drowsiness_level": "Error",
            "lighting": "Unknown",
            "details": {"Score": 0.0},
        }

    level = indicators.get("drowsiness_level", "Awake")
    lighting = indicators.get("lighting", "Good")
    score = indicators.get("details", {}).get("Score", 0.0)

    elapsed_ms = (time.perf_counter() - started) * 1000.0
    logging.info(f"{elapsed_ms:6.1f} ms β {lighting:<4} β {level:<14} β score={score:.2f}")

    # In low light the level/score are replaced by a "paused" notice.
    if lighting == "Low":
        detail = "Detection paused β low light."
    else:
        detail = f"Status: {level}\nScore: {score:.2f}"
    status_txt = f"Lighting: {lighting}\n" + detail

    payload = alert_manager.trigger_alert(level, lighting)
    # A fresh Audio component is sent only when an alert actually fires.
    audio_out = gr.Audio(value=payload, autoplay=True) if payload else None
    return annotated, status_txt, audio_out
# βββββββββββββββββββββββββββββ NEW: Frame Processing for Tab 2 (Analysis-Only) | |
def process_for_stats_only(frame):
    """Analyse one webcam frame without returning any image.

    Runs the same detector as the live tab but discards the processed frame,
    so only the status text and the optional alert audio are sent back.

    Args:
        frame: The frame delivered by the webcam stream, or ``None``.

    Returns:
        A 2-tuple ``(status_text, audio)`` where ``audio`` is a ``gr.Audio``
        carrying the alert sound, or ``None`` when no alert fires.
    """
    if frame is None:
        return "Status: Inactive", None

    started = time.perf_counter()
    try:
        # Only the indicator dict is needed here; the annotated frame is dropped.
        indicators = detector.process_frame(frame)[1]
    except Exception as exc:
        logging.error(f"Error processing frame: {exc}")
        indicators = {
            "drowsiness_level": "Error",
            "lighting": "Unknown",
            "details": {"Score": 0.0},
        }

    level = indicators.get("drowsiness_level", "Awake")
    lighting = indicators.get("lighting", "Good")
    score = indicators.get("details", {}).get("Score", 0.0)

    elapsed_ms = (time.perf_counter() - started) * 1000.0
    logging.info(f"ANALYSIS ONLY β {elapsed_ms:6.1f} ms β {lighting:<4} β {level:<14} β score={score:.2f}")

    status_txt = "\n".join(
        [
            f"Status: {level} (Score: {score:.2f})",
            f"Lighting: {lighting}",
            f"Processing Time: {elapsed_ms:.1f} ms",
        ]
    )

    payload = alert_manager.trigger_alert(level, lighting)
    if payload:
        audio_out = gr.Audio(value=payload, autoplay=True)
    else:
        audio_out = None
    return status_txt, audio_out
# βββββββββββββββββββββββββββββ UI Definition | |
def create_readme_tab():
    """Create the content of the 'About' tab.

    Builds a ``gr.Blocks`` holding a single Markdown component that describes
    the app's features, processing pipeline and the meaning of the live status
    fields.

    Returns:
        The ``gr.Blocks`` object wrapping the Markdown content.
    """
    # Local import: textwrap is only needed to normalise this one literal.
    import textwrap

    # Markdown structure matters here:
    #  * the leading <div> is an HTML block, which only ends at a blank line -
    #    without one, everything after it is swallowed into the HTML block;
    #  * rules ("---") and headings are separated by blank lines so they are
    #    never read as part of the preceding paragraph or list;
    #  * the value descriptions are indented so they nest under their field.
    about_md = textwrap.dedent(
        """
        <div align="center">
        <img src="https://em-content.zobj.net/source/samsung/380/automobile_1f697.png" alt="Car Emoji" width="100"/>
        <h1>Drive Paddy</h1>
        <p><strong>Your Drowsiness Detection Assistant</strong></p>
        </div>

        ---

        ## π Features

        - **Real-Time Webcam Streaming**: Directly processes your live camera feed for immediate feedback.
        - **Efficient Geometric Analysis**: Uses `MediaPipe` for high-performance facial landmark detection.
        - **Multi-Signal Analysis**: Detects eye closure (EAR), yawns (MAR), and head-nodding.
        - **Stateful Alert System**: Plays a clear audio alert for new drowsiness events and intelligently re-arms itself, preventing alert fatigue.
        - **Low-Light Warning**: Automatically detects and warns about poor lighting conditions.
        - **Configurable**: Key detection thresholds and settings can be tuned via `config.yaml`.

        ---

        ## π οΈ How It Works

        1. **Video Streaming**: The `gradio.Image` component captures the camera feed.
        2. **Frame Processing**: Each frame is sent to the `GeometricProcessor`.
        3. **Stateful Alerting**: The `AlertManager` class uses internal state to decide if a *new* alert should be triggered.
        4. **Dynamic Updates**: The processed video, status text, and audio alerts are sent back to the frontend for a seamless real-time experience.

        ---

        ## π‘ Understanding the Live Status

        The status panel provides real-time feedback on the following parameters:

        - **`Lighting`**: Indicates the ambient light conditions.
            - `Good`: Sufficient light for reliable detection.
            - `Low`: Insufficient light. Detection is paused as the results would be unreliable.
        - **`Status`**: The overall assessed level of driver alertness.
            - `Awake`: The driver appears alert.
            - `Slightly Drowsy`: Early signs of fatigue have been detected.
            - `Very Drowsy`: Strong indicators of drowsiness are present. An alert is triggered.
        - **`Score`**: A numerical value representing the accumulated evidence of drowsiness based on the weighted indicators (eye closure, yawning, head pose). A higher score corresponds to a greater level of detected drowsiness.
        """
    )

    with gr.Blocks(title="Drive Paddy - About Page") as readme_tab:
        gr.Markdown(about_md)
    return readme_tab
# βββββββββββββββββββββββββββββ UI <--- CHANGE | |
def create_detection_tab():
    """Lay out the live-detection tab and wire the webcam stream to it.

    The tab shows the webcam input next to the processed frame and a status
    box. A hidden audio component receives the alert sound produced by
    ``process_live_frame``.
    """
    with gr.Blocks(title="Drive Paddy β πΉ Live Drowsiness Detection Tab") as detection_tab:
        gr.Markdown("## πΉ Live Drowsiness Detection")
        gr.Markdown("Press 'START' to activate your camera and begin monitoring. The console will show real-time logs.")

        with gr.Row():
            # Left (wider) column: the streaming webcam input.
            with gr.Column(scale=2):
                webcam_in = gr.Image(sources=["webcam"], streaming=True, label="Live Camera Feed")

            # Right column: everything the backend sends back.
            with gr.Column(scale=1):
                processed_view = gr.Image(label="Processed Feed")
                status_box = gr.Textbox(label="Live Status", lines=3, interactive=False)
                # Hidden placeholder: player controls are not needed, the
                # backend pushes a playable Audio component into it on alert.
                alert_player = gr.Audio(
                    label="Alert",
                    autoplay=True,
                    visible=False,
                )

        # Every streamed frame goes through process_live_frame; its three
        # return values map onto the three components below, in order.
        stream_targets = [processed_view, status_box, alert_player]
        webcam_in.stream(
            fn=process_live_frame,
            inputs=[webcam_in],
            outputs=stream_targets,
        )
def create_analysis_only_tab():
    """Lay out the Analysis-Only Mode tab and wire its webcam stream.

    Unlike the live tab there is no image output: frames are analysed by
    ``process_for_stats_only`` and only the status text and the (hidden)
    alert audio are updated.
    """
    gr.Markdown("## β‘ Analysis-Only Mode")
    gr.Markdown("This mode provides the fastest possible analysis by not sending any video back to the browser. The camera is still active for detection, but you will only see the live status and hear alerts.")

    with gr.Row():
        with gr.Column(scale=1):
            # The camera input stays visible so the user can see it is
            # running, even though no processed video is shown.
            webcam_in = gr.Image(sources=["webcam"], streaming=True, label="Live Camera Feed (for detection)")

        with gr.Column(scale=1):
            status_box = gr.Textbox(label="Live Status & Performance", lines=4, interactive=False)
            alert_player = gr.Audio(label="Alert", autoplay=True, visible=False)

    # The two return values of process_for_stats_only map onto these outputs.
    stream_targets = [status_box, alert_player]
    webcam_in.stream(
        fn=process_for_stats_only,
        inputs=[webcam_in],
        outputs=stream_targets,
    )
# Top-level application: a title plus one tab per builder function.
with gr.Blocks(title="π Drive Paddy β Drowsiness Detection", theme=gr.themes.Soft()) as app:
    gr.Markdown("# π **Drive Paddy**")

    # (tab label, function that creates the tab's components), in display order.
    tab_builders = (
        ("Live Detection", create_detection_tab),
        ("Analysis-Only Mode", create_analysis_only_tab),
        ("About this App", create_readme_tab),
    )
    with gr.Tabs():
        for tab_label, build_tab in tab_builders:
            with gr.TabItem(tab_label):
                build_tab()

# Start the server only when executed as a script, not when imported.
if __name__ == "__main__":
    logging.info("Launching Gradio appβ¦")
    app.launch(debug=True)