Spaces:
Runtime error
Runtime error
File size: 5,582 Bytes
19f420a d305df8 19f420a b828bd1 19f420a b828bd1 19f420a d472afb 19f420a b94e066 d472afb 19f420a d472afb f4fd68f 19f420a d472afb 19f420a f4fd68f 19f420a b828bd1 f4fd68f 19f420a b828bd1 19f420a 4974f3a d472afb 19f420a f4fd68f 19f420a f4fd68f 19f420a f4fd68f 19f420a f4fd68f 19f420a f4fd68f 19f420a f4fd68f 19f420a f4fd68f 19f420a |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 |
# drive_paddy/pages/1_Live_Detection.py
import streamlit as st
from streamlit_webrtc import webrtc_streamer, RTCConfiguration, VideoProcessorBase
import yaml
import av
import os
from dotenv import load_dotenv
import base64
import queue
import time
from src.detection.factory import get_detector
from src.alerting.alert_system import get_alerter
# Seed the per-session state once; on later reruns the keys already exist and
# the stored objects are left untouched.
_SESSION_DEFAULTS = (
    ("status_queue", queue.Queue),
    ("audio_queue", queue.Queue),
    ("last_status", lambda: {"status": "Awake"}),
)
for _state_key, _make_default in _SESSION_DEFAULTS:
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _make_default()
# --- Load Configuration and Environment Variables ---
@st.cache_resource
def load_app_config():
    """Load the YAML application config and the secrets from the environment.

    ``load_dotenv()`` runs first so that values from a ``.env`` file are
    visible to ``os.getenv``. The config is read from ``/config.yaml`` when
    that path exists, otherwise from ``config.yaml`` relative to the working
    directory.

    Returns:
        tuple: ``(config, secrets)`` where ``config`` is the parsed YAML
        document and ``secrets`` is a dict with the keys ``gemini_api_key``,
        ``turn_username`` and ``turn_credential`` (each ``None`` when the
        corresponding environment variable is unset).

    Raises:
        OSError: If the selected config file cannot be opened.
    """
    load_dotenv()
    config_path = "/config.yaml" if os.path.exists("/config.yaml") else "config.yaml"
    # Decode explicitly as UTF-8 rather than the platform default encoding,
    # so non-ASCII config values parse the same on every host.
    with open(config_path, 'r', encoding="utf-8") as f:
        config = yaml.safe_load(f)
    # Load secrets from environment
    secrets = {
        "gemini_api_key": os.getenv("GEMINI_API_KEY"),
        "turn_username": os.getenv("TURN_USERNAME"),
        "turn_credential": os.getenv("TURN_CREDENTIAL"),
    }
    return config, secrets


config, secrets = load_app_config()
# --- Client-Side Audio Playback Function ---
def autoplay_audio(audio_bytes: bytes):
    """Render a hidden, autoplaying ``<audio>`` element for *audio_bytes*.

    The bytes are base64-encoded and embedded as an ``audio/mp3`` data URI,
    then emitted through ``st.markdown`` with raw HTML enabled.
    """
    encoded_clip = base64.b64encode(audio_bytes).decode()
    audio_html = f"""
<audio controls autoplay="true" style="display:none;">
<source src="data:audio/mp3;base64,{encoded_clip}" type="audio/mp3">
</audio>
"""
    st.markdown(audio_html, unsafe_allow_html=True)
# --- WebRTC Video Processor ---
class VideoProcessor(VideoProcessorBase):
    """Run drowsiness detection on every incoming WebRTC video frame.

    Results are handed to the rest of the app through two queues:

    * ``status_queue`` holds at most one entry, the newest status dict.
      It is either the detector's alert/indicator dict or
      ``{"status": "Awake"}`` when nothing is triggered.
    * ``audio_queue`` receives whatever truthy audio payload the alerter
      returns when an alert fires.
    """

    def __init__(self):
        self._detector = get_detector(config)
        self._alerter = get_alerter(config, secrets["gemini_api_key"])
        # The strategy decides the shape of the detector's return value; it
        # is read once here instead of on every frame.
        self._strategy = config.get('detection_strategy')
        # Thread-safe queues for communication. A status is produced for
        # every frame, so the status queue is capped at the newest entry to
        # keep it from growing without bound when the reader is slower.
        self.status_queue = queue.Queue(maxsize=1)
        self.audio_queue = queue.Queue()

    def _publish_status(self, status):
        """Replace any unread status with *status* (newest wins)."""
        try:
            self.status_queue.get_nowait()
        except queue.Empty:
            pass
        try:
            self.status_queue.put_nowait(status)
        except queue.Full:
            # Another writer filled the slot in between; dropping this
            # status is fine because a fresh one follows with the next frame.
            pass

    def recv(self, frame: av.VideoFrame) -> av.VideoFrame:
        """Process one frame, publish its status, and return the annotated frame."""
        img = frame.to_ndarray(format="bgr24")
        if self._strategy == 'hybrid':
            # The hybrid detector reports the alert flag and the active alerts itself.
            processed_frame, alert_triggered, active_alerts = self._detector.process_frame(img)
            status = active_alerts
        else:
            # Other detectors return an indicator mapping; any truthy value means alert.
            processed_frame, indicators = self._detector.process_frame(img)
            alert_triggered = any(indicators.values())
            status = indicators
        self._publish_status(status if alert_triggered else {"status": "Awake"})

        if alert_triggered:
            audio_data = self._alerter.trigger_alert()
            # trigger_alert may return nothing; only queue real payloads.
            if audio_data:
                self.audio_queue.put(audio_data)
        else:
            self._alerter.reset_alert()
        return av.VideoFrame.from_ndarray(processed_frame, format="bgr24")
# --- Page UI ---
st.title("📹 Live Drowsiness Detection")
st.info("Press 'START' to activate your camera and begin monitoring.")

# --- Dynamically Build RTC Configuration ---
# The STUN server is always offered; TURN relays are added only when both the
# username and the credential are configured.
ice_servers = [{'urls': 'stun:global.stun.twilio.com:3478'}]
if secrets["turn_username"] and secrets["turn_credential"]:
    print("TURN credentials found, adding TURN servers to config.")
    # One entry per transport, sharing the same credentials.
    turn_servers = [
        {
            'urls': f'turn:global.turn.twilio.com:3478?transport={transport}',
            'username': secrets["turn_username"],
            'credential': secrets["turn_credential"],
        }
        for transport in ('udp', 'tcp')
    ]
    ice_servers.extend(turn_servers)
RTC_CONFIGURATION = RTCConfiguration({"iceServers": ice_servers})
def _latest_status(status_queue, timeout=0.1):
    """Return the newest queued status, or ``None`` if none arrives in *timeout* seconds.

    Waits up to *timeout* for a first entry, then drains whatever else is
    already queued so the caller always sees the most recent status rather
    than a backlog of stale ones.
    """
    try:
        latest = status_queue.get(timeout=timeout)
    except queue.Empty:
        return None
    while True:
        try:
            latest = status_queue.get_nowait()
        except queue.Empty:
            return latest


col1, col2 = st.columns([3, 1])
with col1:
    webrtc_ctx = webrtc_streamer(
        key="drowsiness-detection",
        video_processor_factory=VideoProcessor,
        rtc_configuration=RTC_CONFIGURATION,  # Use the new robust configuration
        media_stream_constraints={"video": True, "audio": False},
        async_processing=True,
    )
with col2:
    st.header("System Status")
    if not webrtc_ctx.state.playing:
        st.warning("System Inactive.")
    else:
        st.success("✅ System Active & Monitoring")
    st.subheader("Live Status:")
    status_placeholder = st.empty()
    audio_placeholder = st.empty()

if webrtc_ctx.state.playing:
    if "last_status" not in st.session_state:
        st.session_state.last_status = {"status": "Awake"}

    # VideoProcessor.recv writes to the queues owned by the processor
    # instance, so those are the ones to poll. The processor can still be
    # None right after the stream starts; in that case just wait for the
    # next rerun.
    processor = webrtc_ctx.video_processor

    if processor is not None:
        status_result = _latest_status(processor.status_queue)
        if status_result is not None:
            st.session_state.last_status = status_result

    with status_placeholder.container():
        last_status = st.session_state.last_status
        if last_status.get("Low Light"):
            st.warning("⚠️ Low Light Detected! Accuracy may be affected.")
        elif last_status.get("status") == "Awake":
            st.info("✔️ Driver is Awake")
        else:
            st.error("🚨 DROWSINESS DETECTED!")
            # List every triggered indicator; float values are shown with two decimals.
            for key, value in last_status.items():
                if key != "Low Light" and key != "status":
                    st.warning(f"-> {key}: {value:.2f}" if isinstance(value, float) else f"-> {key}")

    if processor is not None:
        try:
            audio_data = processor.audio_queue.get(timeout=0.1)
        except queue.Empty:
            pass
        else:
            with audio_placeholder.container():
                autoplay_audio(audio_data)

    # Poll again shortly: rerunning the script is what refreshes the panel.
    time.sleep(0.1)
    st.rerun()
else:
    with status_placeholder.container():
        st.info("✔️ Driver is Awake")
|