Spaces:
Sleeping
Sleeping
File size: 6,415 Bytes
19f420a c863e7a 19f420a 929f736 19f420a 929f736 19f420a 929f736 d305df8 929f736 f37553c d305df8 929f736 19f420a 929f736 9af53cb 19f420a b828bd1 6b8d232 b828bd1 19f420a b828bd1 19f420a 929f736 45edd80 929f736 45edd80 19f420a b94e066 19f420a 8eabd81 929f736 19f420a 929f736 19f420a 929f736 f4fd68f 19f420a 5030574 19f420a ed2439c 929f736 19f420a 929f736 f1c9a67 f39959a 19f420a 929f736 19f420a 929f736 19f420a f4fd68f 19f420a f4fd68f 19f420a f4fd68f f37553c 929f736 f37553c 929f736 f4fd68f 19f420a f4fd68f 19f420a |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 |
# drive_paddy/pages/1_Live_Detection.py
import streamlit as st
from streamlit_webrtc import webrtc_streamer, RTCConfiguration, VideoProcessorBase
import yaml
import av
import os
from dotenv import load_dotenv
import base64
import requests
import queue
import time
from typing import List, Dict, Union
# Correctly import from the drive_paddy package structure
from src.detection.factory import get_detector
from src.alerting.alert_system import get_alerter
# --- Initialize Session State at the TOP of the script ---
# This is the single source of truth for our queues and must run on every page load.
if "status_queue" not in st.session_state:
st.session_state.status_queue = queue.Queue()
if "audio_queue" not in st.session_state:
st.session_state.audio_queue = queue.Queue()
if "last_status" not in st.session_state:
st.session_state.last_status = {"drowsiness_level": "Awake", "lighting": "Good"}
# --- Load Configuration and Environment Variables ---
@st.cache_resource
def load_app_config():
"""Loads config from yaml and .env files."""
load_dotenv()
# Navigate up to the root to find the config file
config_path = "./config.yaml"
with open(config_path, 'r') as f:
config = yaml.safe_load(f)
# Load secrets from environment
secrets = {
"gemini_api_key": os.getenv("GEMINI_API_KEY"),
"turn_username": os.getenv("TURN_USERNAME"),
"turn_credential": os.getenv("TURN_CREDENTIAL"),
"metered_domain": os.getenv("METERED_DOMAIN"),
"metered_api_key": os.getenv("METERED_KEY")
}
return config, secrets
config, secrets = load_app_config()
# --- Client-Side Audio Playback Function ---
def autoplay_audio(audio_bytes: bytes):
"""Injects HTML to autoplay audio in the user's browser."""
b64 = base64.b64encode(audio_bytes).decode()
md = f"""
<audio controls autoplay="true" style="display:none;">
<source src="data:audio/mp3;base64,{b64}" type="audio/mp3">
</audio>
"""
st.markdown(md, unsafe_allow_html=True)
# --- WebRTC Video Processor ---
class VideoProcessor(VideoProcessorBase):
# The __init__ method now accepts the queues as arguments
def __init__(self):
# It uses the queues passed in from session_state, not new ones.
self.status_queue = queue.Queue
self.audio_queue = queue.Queue
self._detector = get_detector(config)
self._alerter = get_alerter(config, secrets["gemini_api_key"])
def recv(self, frame: av.VideoFrame) -> av.VideoFrame:
img = frame.to_ndarray(format="bgr24")
strategy = config.get('detection_strategy')
# The return signature of process_frame varies by strategy.
processed_frame, indicators, _ = self._detector.process_frame(img)
drowsiness_level = indicators.get("drowsiness_level", "Awake")
# This now correctly puts data into the shared session_state queue.
self.status_queue.put(indicators)
if drowsiness_level != "Awake":
audio_data = self._alerter.trigger_alert(level=drowsiness_level)
if audio_data:
# This now correctly puts audio data into the shared queue.
self.audio_queue.put(audio_data)
else:
self._alerter.reset_alert()
return av.VideoFrame.from_ndarray(processed_frame, format="bgr24")
# --- Page UI ---
st.title("๐น Live Drowsiness Detection")
st.info("Press 'START' to activate your camera and begin monitoring.")
RTC_CONFIGURATION = RTCConfiguration({
"iceServers": [
{
"urls": ["stun:stun.relay.metered.ca:80"],
},
{
"urls": ["turn:global.relay.metered.ca:80"],
"username": "086986a440b20fe48229738b",
"credential": "mOC7fVSg00zjlsTD",
},
{
"urls": ["turn:global.relay.metered.ca:80?transport=tcp"],
"username": "086986a440b20fe48229738b",
"credential": "mOC7fVSg00zjlsTD",
},
{
"urls": ["turn:global.relay.metered.ca:443"],
"username": "086986a440b20fe48229738b",
"credential": "mOC7fVSg00zjlsTD",
},
{
"urls": ["turns:global.relay.metered.ca:443?transport=tcp"],
"username": "086986a440b20fe48229738b",
"credential": "mOC7fVSg00zjlsTD",
},
]
})
col1, col2 = st.columns([3, 1])
with col1:
webrtc_ctx = webrtc_streamer(
key="drowsiness-detection",
# The factory now correctly passes the queues from session_state
video_processor_factory=VideoProcessor,
# rtc_configuration=RTC_CONFIGURATION,
# media_stream_constraints={"video": True, "audio": False},
# async_processing=True,
)
with col2:
st.header("System Status")
audio_placeholder = st.empty()
if not webrtc_ctx.state.playing:
st.warning("System Inactive.")
else:
st.success("โ
System Active & Monitoring")
st.subheader("Live Status:")
status_placeholder = st.empty()
if webrtc_ctx.state.playing:
try:
# This now reads from the correct queue that the processor is writing to.
status_result = st.session_state.status_queue.get(timeout=0.1)
st.session_state.last_status = status_result
except queue.Empty:
pass
with status_placeholder.container():
last_status = st.session_state.last_status
drowsiness_level = last_status.get("drowsiness_level", "Awake")
lighting = last_status.get("lighting", "Good")
score = last_status.get("details", {}).get("Score", 0)
st.metric(label="Lighting Condition", value=lighting)
if lighting == "Low":
st.warning("Detection paused due to low light.")
if drowsiness_level == "Awake":
st.info(f"โ๏ธ Awake (Score: {score:.2f})")
elif drowsiness_level == "Slightly Drowsy":
st.warning(f"โ ๏ธ Slightly Drowsy (Score: {score:.2f})")
elif drowsiness_level == "Very Drowsy":
st.error(f"๐จ Very Drowsy! (Score: {score:.2f})")
try:
audio_data = st.session_state.audio_queue.get(timeout=0.1)
with audio_placeholder.container():
autoplay_audio(audio_data)
except queue.Empty:
pass
time.sleep(0.1)
st.rerun()
else:
with status_placeholder.container():
st.info("โ๏ธ Driver is Awake")
|