# drive_paddy/pages/1_Live_Detection.py
import streamlit as st
from streamlit_webrtc import webrtc_streamer, RTCConfiguration, VideoProcessorBase
import yaml
import av
import os
from dotenv import load_dotenv
import base64
import queue
import time

from src.detection.factory import get_detector
from src.alerting.alert_system import get_alerter

if "status_queue" not in st.session_state:
    st.session_state.status_queue = queue.Queue()
if "audio_queue" not in st.session_state:
    st.session_state.audio_queue = queue.Queue()
if "last_status" not in st.session_state:
    st.session_state.last_status = {"status": "Awake"}


# --- Load Configuration and Environment Variables ---
@st.cache_resource
def load_app_config():
    """Loads config from yaml and .env files."""
    load_dotenv()
    config_path = "/config.yaml" if os.path.exists("/config.yaml") else "config.yaml"
    with open(config_path, 'r') as f:
        config = yaml.safe_load(f)
    # Load secrets from environment
    secrets = {
        "gemini_api_key": os.getenv("GEMINI_API_KEY"),
        "turn_username": os.getenv("TURN_USERNAME"),
        "turn_credential": os.getenv("TURN_CREDENTIAL")
    }
    return config, secrets

config, secrets = load_app_config()
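
# For reference, a minimal sketch of the inputs load_app_config() expects.
# Only `detection_strategy` is read directly in this page; any further
# config.yaml keys are assumptions about what get_detector/get_alerter
# consume and may differ in the actual repo.
#
#   config.yaml:
#     detection_strategy: hybrid   # or: geometric, cnn_model
#
#   .env:
#     GEMINI_API_KEY=...
#     TURN_USERNAME=...
#     TURN_CREDENTIAL=...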

# --- Client-Side Audio Playback Function ---
def autoplay_audio(audio_bytes: bytes):
    """Injects HTML to autoplay audio in the user's browser."""
    b64 = base64.b64encode(audio_bytes).decode()
    md = f"""
        <audio controls autoplay="true" style="display:none;">
        <source src="data:audio/mp3;base64,{b64}" type="audio/mp3">
        </audio>
        """
    st.markdown(md, unsafe_allow_html=True)

# --- WebRTC Video Processor ---
class VideoProcessor(VideoProcessorBase):
    def __init__(self, status_queue: queue.Queue, audio_queue: queue.Queue):
        self.status_queue = status_queue
        self.audio_queue = audio_queue
        self._detector = get_detector(config)
        self._alerter = get_alerter(config, secrets["gemini_api_key"])

    def recv(self, frame: av.VideoFrame) -> av.VideoFrame:
        img = frame.to_ndarray(format="bgr24")
        
        strategy = config.get('detection_strategy')
        
        # The return signature of process_frame varies by strategy.
        # We need to handle each case correctly.
        if strategy == 'hybrid':
            processed_frame, alert_triggered, active_alerts = self._detector.process_frame(img)
            self.status_queue.put(active_alerts if alert_triggered or 'Low Light' in active_alerts else {"status": "Awake"})
        elif strategy == 'geometric':
            # The geometric processor returns frame, indicators, and landmarks.
            processed_frame, indicators, _ = self._detector.process_frame(img)
            alert_triggered = any(v for k, v in indicators.items() if k not in ['low_light', 'details'])
            self.status_queue.put(indicators if alert_triggered or indicators.get('low_light') else {"status": "Awake"})
        elif strategy == 'cnn_model':
            # The cnn_model processor returns frame and indicators.
            processed_frame, indicators = self._detector.process_frame(img)
            alert_triggered = any(indicators.values())
            self.status_queue.put(indicators if alert_triggered else {"status": "Awake"})
        else:
            # Default case if strategy is unknown
            processed_frame = img
            alert_triggered = False

        if alert_triggered:
            audio_data = self._alerter.trigger_alert()
            if audio_data:
                self.audio_queue.put(audio_data)
        else:
            self._alerter.reset_alert()
            
        return av.VideoFrame.from_ndarray(processed_frame, format="bgr24")
    
    
# --- Page UI ---
st.title("πŸ“Ή Live Drowsiness Detection")
st.info("Press 'START' to activate your camera and begin monitoring.")

# --- Dynamically Build RTC Configuration ---
ice_servers = [{'urls': 'stun:global.stun.twilio.com:3478'}]
if secrets["turn_username"] and secrets["turn_credential"]:
    print("TURN credentials found, adding TURN servers to config.")
    turn_servers = [
        {'urls': 'turn:global.turn.twilio.com:3478?transport=udp', 'username': secrets["turn_username"], 'credential': secrets["turn_credential"]},
        {'urls': 'turn:global.turn.twilio.com:3478?transport=tcp', 'username': secrets["turn_username"], 'credential': secrets["turn_credential"]}
    ]
    ice_servers.extend(turn_servers)

RTC_CONFIGURATION = RTCConfiguration({"iceServers": ice_servers})
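# With TURN credentials present, the resulting configuration resembles the
# sketch below (credential values are placeholders, not real values):
#
#   {"iceServers": [
#       {"urls": "stun:global.stun.twilio.com:3478"},
#       {"urls": "turn:global.turn.twilio.com:3478?transport=udp",
#        "username": "...", "credential": "..."},
#       {"urls": "turn:global.turn.twilio.com:3478?transport=tcp",
#        "username": "...", "credential": "..."},
#   ]}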


col1, col2 = st.columns([3, 1])

with col1:
    # VideoProcessor.__init__ requires both queues, and the processor runs in a
    # worker thread, so capture the session's queues here and pass them through
    # a factory instead of handing the class over directly.
    status_queue = st.session_state.status_queue
    audio_queue = st.session_state.audio_queue
    webrtc_ctx = webrtc_streamer(
        key="drowsiness-detection",
        video_processor_factory=lambda: VideoProcessor(status_queue, audio_queue),
        rtc_configuration=RTC_CONFIGURATION,
        media_stream_constraints={"video": True, "audio": False},
        async_processing=True,
    )

with col2:
    st.header("System Status")
    if not webrtc_ctx.state.playing:
        st.warning("System Inactive.")
    else:
        st.success("βœ… System Active & Monitoring")

    st.subheader("Live Status:")
    status_placeholder = st.empty()
    audio_placeholder = st.empty()

if webrtc_ctx.state.playing:
    # last_status is already initialised at the top of the script; just drain
    # the most recent update from the processor thread (non-blocking).
    try:
        status_result = st.session_state.status_queue.get(timeout=0.1)
        st.session_state.last_status = status_result
    except queue.Empty:
        pass

    with status_placeholder.container():
        last_status = st.session_state.last_status
        if last_status.get("Low Light"):
             st.warning("⚠️ Low Light Detected! Accuracy may be affected.")
        elif last_status.get("status") == "Awake":
            st.info("βœ”οΈ Driver is Awake")
        else:
            st.error("🚨 DROWSINESS DETECTED!")
            for key, value in last_status.items():
                # Skip bookkeeping keys; only surface the actual drowsiness indicators.
                if key in ("Low Light", "low_light", "status", "details"):
                    continue
                st.warning(f"-> {key}: {value:.2f}" if isinstance(value, float) else f"-> {key}")
    
    try:
        audio_data = st.session_state.audio_queue.get(timeout=0.1)
        with audio_placeholder.container():
            autoplay_audio(audio_data)
    except queue.Empty:
        pass

    time.sleep(0.1)
    st.rerun()
else:
    with status_placeholder.container():
        st.info("βœ”οΈ Driver is Awake")