Spaces:
Runtime error
Runtime error
File size: 6,710 Bytes
19f420a d305df8 f37553c d305df8 5030574 19f420a b828bd1 19f420a b828bd1 19f420a 5030574 19f420a b94e066 5030574 19f420a 8eabd81 19f420a 8eabd81 f37553c 8eabd81 19f420a d472afb 8eabd81 19f420a f4fd68f 19f420a 5030574 19f420a b828bd1 f4fd68f 19f420a b828bd1 19f420a 4974f3a d472afb 19f420a f4fd68f 19f420a 5030574 19f420a f4fd68f 19f420a f4fd68f 19f420a f4fd68f f37553c f4fd68f 19f420a f4fd68f 19f420a f37553c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 |
# drive_paddy/pages/1_Live_Detection.py
import streamlit as st
from streamlit_webrtc import webrtc_streamer, RTCConfiguration, VideoProcessorBase
import yaml
import av
import os
from dotenv import load_dotenv
import base64
import queue
import time
from src.detection.factory import get_detector
from src.alerting.alert_system import get_alerter
if "status_queue" not in st.session_state:
st.session_state.status_queue = queue.Queue()
if "audio_queue" not in st.session_state:
st.session_state.audio_queue = queue.Queue()
if "last_status" not in st.session_state:
st.session_state.last_status = {"drowsiness_level": "Awake", "lighting": "Good"}
# --- Load Configuration and Environment Variables ---
@st.cache_resource
def load_app_config():
"""Loads config from yaml and .env files."""
load_dotenv()
config_path = "/config.yaml" if os.path.exists("/config.yaml") else "config.yaml"
with open(config_path, 'r') as f:
config = yaml.safe_load(f)
# Load secrets from environment
secrets = {
"gemini_api_key": os.getenv("GEMINI_API_KEY"),
"turn_username": os.getenv("TURN_USERNAME"),
"turn_credential": os.getenv("TURN_CREDENTIAL")
}
return config, secrets
config, secrets = load_app_config()
# --- Client-Side Audio Playback Function ---
def autoplay_audio(audio_bytes: bytes):
"""Injects HTML to autoplay audio in the user's browser."""
b64 = base64.b64encode(audio_bytes).decode()
md = f"""
<audio controls autoplay="true" style="display:none;">
<source src="data:audio/mp3;base64,{b64}" type="audio/mp3">
</audio>
"""
st.markdown(md, unsafe_allow_html=True)
# --- WebRTC Video Processor ---
class VideoProcessor(VideoProcessorBase):
def __init__(self):
self._detector = get_detector(config)
self._alerter = get_alerter(config, secrets["gemini_api_key"])
# Thread-safe queues for communication
self.status_queue = queue.Queue()
self.audio_queue = queue.Queue()
def recv(self, frame: av.VideoFrame) -> av.VideoFrame:
img = frame.to_ndarray(format="bgr24")
strategy = config.get('detection_strategy')
# The return signature of process_frame varies by strategy.
# We need to handle each case correctly.
if strategy == 'hybrid':
processed_frame, alert_triggered, active_alerts = self._detector.process_frame(img)
self.status_queue.put(active_alerts if alert_triggered or 'Low Light' in active_alerts else {"status": "Awake"})
elif strategy == 'geometric':
# The geometric processor returns frame, indicators, and landmarks.
processed_frame, indicators, _ = self._detector.process_frame(img)
alert_triggered = any(v for k, v in indicators.items() if k not in ['low_light', 'details'])
self.status_queue.put(indicators if alert_triggered or indicators.get('low_light') else {"status": "Awake"})
print(f"Indicators: {indicators}") # Debugging line
print(self.status_queue) # Debugging line
elif strategy == 'cnn_model':
# The cnn_model processor returns frame and indicators.
processed_frame, indicators = self._detector.process_frame(img)
alert_triggered = any(indicators.values())
self.status_queue.put(indicators if alert_triggered else {"status": "Awake"})
else:
# Default case if strategy is unknown
processed_frame = img
alert_triggered = False
if alert_triggered:
audio_data = self._alerter.trigger_alert()
if audio_data:
self.audio_queue.put(audio_data)
else:
self._alerter.reset_alert()
return av.VideoFrame.from_ndarray(processed_frame, format="bgr24")
# --- Page UI ---
st.title("πΉ Live Drowsiness Detection")
st.info("Press 'START' to activate your camera and begin monitoring.")
# --- Dynamically Build RTC Configuration ---
ice_servers = [{'urls': 'stun:global.stun.twilio.com:3478'}]
if secrets["turn_username"] and secrets["turn_credential"]:
turn_servers = [
{'urls': 'turn:global.turn.twilio.com:3478?transport=udp', 'username': secrets["turn_username"], 'credential': secrets["turn_credential"]},
{'urls': 'turn:global.turn.twilio.com:3478?transport=tcp', 'username': secrets["turn_username"], 'credential': secrets["turn_credential"]}
]
ice_servers.extend(turn_servers)
RTC_CONFIGURATION = RTCConfiguration({"iceServers": ice_servers})
col1, col2 = st.columns([3, 1])
with col1:
webrtc_ctx = webrtc_streamer(
key="drowsiness-detection",
video_processor_factory=VideoProcessor,
rtc_configuration=RTC_CONFIGURATION, # Use the new robust configuration
media_stream_constraints={"video": True, "audio": False},
async_processing=True,
)
with col2:
st.header("System Status")
if not webrtc_ctx.state.playing:
st.warning("System Inactive.")
else:
st.success("β
System Active & Monitoring")
st.subheader("Live Status:")
status_placeholder = st.empty()
audio_placeholder = st.empty()
if webrtc_ctx.state.playing:
if "last_status" not in st.session_state:
st.session_state.last_status = {"status": "Awake"}
try:
# This line will now work because st.session_state.status_queue was initialized
status_result = st.session_state.status_queue.get(timeout=0.1)
st.session_state.last_status = status_result
except queue.Empty:
pass
with status_placeholder.container():
last_status = st.session_state.last_status
drowsiness_level = last_status.get("drowsiness_level", "Awake")
lighting = last_status.get("lighting", "Good")
score = last_status.get("details", {}).get("Score", 0)
st.metric(label="Lighting Condition", value=lighting)
if lighting == "Low": st.warning("Detection paused due to low light.")
if drowsiness_level == "Awake": st.info(f"βοΈ Awake (Score: {score:.2f})")
elif drowsiness_level == "Slightly Drowsy": st.warning(f"β οΈ Slightly Drowsy (Score: {score:.2f})")
elif drowsiness_level == "Very Drowsy": st.error(f"π¨ Very Drowsy! (Score: {score:.2f})")
try:
audio_data = st.session_state.audio_queue.get(timeout=0.1)
with audio_placeholder.container():
autoplay_audio(audio_data)
except queue.Empty:
pass
time.sleep(0.1)
st.rerun()
else:
with status_placeholder.container():
st.info("βοΈ Driver is Awake")
|