Spaces:
Sleeping
Sleeping
# drive_paddy/pages/1_Live_Detection.py | |
import streamlit as st | |
from streamlit_webrtc import webrtc_streamer, RTCConfiguration, VideoProcessorBase | |
import yaml | |
import av | |
import os | |
from dotenv import load_dotenv | |
import base64 | |
import requests | |
import queue | |
import time | |
from typing import List, Dict, Union | |
# Correctly import from the drive_paddy package structure | |
from src.detection.factory import get_detector | |
from src.alerting.alert_system import get_alerter | |
# --- Initialize Session State at the TOP of the script ---
# This is the single source of truth for our queues and must run on every page
# load. A key that already exists is left untouched, so only missing entries
# are created.
_SESSION_DEFAULT_FACTORIES = {
    "status_queue": queue.Queue,
    "audio_queue": queue.Queue,
    "last_status": lambda: {"drowsiness_level": "Awake", "lighting": "Good"},
}
for _state_key, _make_default in _SESSION_DEFAULT_FACTORIES.items():
    if _state_key not in st.session_state:
        # Each factory is called lazily so a fresh object is built only when
        # the key is actually missing.
        st.session_state[_state_key] = _make_default()
# --- Load Configuration and Environment Variables ---
def load_app_config():
    """Load the YAML configuration and the secrets exposed via the environment.

    ``load_dotenv()`` is called first so values from a ``.env`` file are
    visible to ``os.getenv``.

    Returns:
        A ``(config, secrets)`` tuple. ``config`` is the parsed content of
        ``./config.yaml`` (an empty dict when the file is empty); ``secrets``
        maps fixed key names to the corresponding environment values, each of
        which is ``None`` when the variable is not set.

    Raises:
        OSError: if ``./config.yaml`` cannot be opened.
    """
    load_dotenv()

    # The path is relative to the current working directory of the process,
    # not to this file's location.
    config_path = "./config.yaml"
    # Explicit encoding: without it the platform default is used, which can
    # mis-decode non-ASCII characters in the config file.
    with open(config_path, "r", encoding="utf-8") as f:
        # safe_load returns None for an empty document; fall back to a dict so
        # later ``config.get(...)`` calls keep working.
        config = yaml.safe_load(f) or {}

    # Load secrets from environment
    secrets = {
        "gemini_api_key": os.getenv("GEMINI_API_KEY"),
        "turn_username": os.getenv("TURN_USERNAME"),
        "turn_credential": os.getenv("TURN_CREDENTIAL"),
        "metered_domain": os.getenv("METERED_DOMAIN"),
        "metered_api_key": os.getenv("METERED_KEY"),
    }
    return config, secrets


config, secrets = load_app_config()
# --- Client-Side Audio Playback Function ---
def autoplay_audio(audio_bytes: bytes):
    """Render a hidden, auto-playing <audio> element for the given audio bytes.

    The bytes are embedded in the page as a base64 ``data:`` URI and handed to
    ``st.markdown`` with HTML rendering enabled.
    """
    encoded = base64.b64encode(audio_bytes).decode()
    # Assembled line by line; the leading and trailing empty parts reproduce
    # the newline that opens and closes the markup.
    html_parts = [
        "",
        '<audio controls autoplay="true" style="display:none;">',
        f'<source src="data:audio/mp3;base64,{encoded}" type="audio/mp3">',
        "</audio>",
        "",
    ]
    st.markdown("\n".join(html_parts), unsafe_allow_html=True)
# --- WebRTC Video Processor ---
class VideoProcessor(VideoProcessorBase):
    """Run drowsiness detection on each incoming frame and publish the results.

    For every frame the detector's indicators are put on ``status_queue``; when
    the reported level is not "Awake" the alerter is triggered and any audio it
    returns is put on ``audio_queue``.
    """

    # Captured while the class body executes as part of the page script, after
    # the session-state initialisation above has created both queues.
    # NOTE(review): streamlit-webrtc is understood to construct the processor
    # and call recv() outside the script thread, where st.session_state may not
    # be usable - hence the capture here rather than a lookup in __init__/recv.
    _session_status_queue = st.session_state.status_queue
    _session_audio_queue = st.session_state.audio_queue

    def __init__(self, status_queue=None, audio_queue=None):
        """Create the detector and alerter and bind the output queues.

        Args:
            status_queue: queue receiving one indicators dict per frame.
                Defaults to the session's ``status_queue``.
            audio_queue: queue receiving alert audio payloads. Defaults to the
                session's ``audio_queue``.
        """
        # Real queue *instances* are required here: storing the queue.Queue
        # class itself makes every later ``.put(item)`` call fail with a
        # TypeError because no instance is bound.
        self.status_queue = (
            status_queue if status_queue is not None
            else self._session_status_queue
        )
        self.audio_queue = (
            audio_queue if audio_queue is not None
            else self._session_audio_queue
        )
        self._detector = get_detector(config)
        self._alerter = get_alerter(config, secrets["gemini_api_key"])

    def recv(self, frame: av.VideoFrame) -> av.VideoFrame:
        """Process one frame and return the annotated frame.

        Args:
            frame: incoming video frame.

        Returns:
            A new ``av.VideoFrame`` built from the detector's processed image.
        """
        img = frame.to_ndarray(format="bgr24")

        # NOTE(review): a 3-tuple is unpacked unconditionally; confirm every
        # detector returned by get_detector() follows that signature.
        processed_frame, indicators, _ = self._detector.process_frame(img)
        drowsiness_level = indicators.get("drowsiness_level", "Awake")

        # Publish the full indicators dict for the UI to display.
        self.status_queue.put(indicators)

        if drowsiness_level != "Awake":
            audio_data = self._alerter.trigger_alert(level=drowsiness_level)
            # trigger_alert may return nothing; only forward real payloads.
            if audio_data:
                self.audio_queue.put(audio_data)
        else:
            self._alerter.reset_alert()

        return av.VideoFrame.from_ndarray(processed_frame, format="bgr24")
# --- Page UI ---
st.title("๐น Live Drowsiness Detection")
st.info("Press 'START' to activate your camera and begin monitoring.")

# ICE server configuration for the WebRTC connection.
# TURN credentials are read from the environment-backed ``secrets`` mapping
# instead of being embedded in the source. Any credentials that were previously
# committed in this file should be treated as leaked and rotated.
_TURN_URLS = (
    "turn:global.relay.metered.ca:80",
    "turn:global.relay.metered.ca:80?transport=tcp",
    "turn:global.relay.metered.ca:443",
    "turns:global.relay.metered.ca:443?transport=tcp",
)
_ice_servers = [
    {
        "urls": ["stun:stun.relay.metered.ca:80"],
    },
]
_turn_username = secrets.get("turn_username")
_turn_credential = secrets.get("turn_credential")
# TURN entries are only added when both values are configured; otherwise the
# configuration falls back to STUN alone.
if _turn_username and _turn_credential:
    for _turn_url in _TURN_URLS:
        _ice_servers.append(
            {
                "urls": [_turn_url],
                "username": _turn_username,
                "credential": _turn_credential,
            }
        )
RTC_CONFIGURATION = RTCConfiguration({"iceServers": _ice_servers})
def _latest_from_queue(source_queue, timeout=0.1):
    """Return the newest item in ``source_queue``, or None if none arrives.

    Waits up to ``timeout`` seconds for a first item, then discards everything
    but the most recent entry so the caller never works through a backlog.
    """
    try:
        latest = source_queue.get(timeout=timeout)
    except queue.Empty:
        return None
    while True:
        try:
            latest = source_queue.get_nowait()
        except queue.Empty:
            return latest


col1, col2 = st.columns([3, 1])

with col1:
    webrtc_ctx = webrtc_streamer(
        key="drowsiness-detection",
        # The processor class itself is the factory, so it is instantiated
        # without arguments.
        video_processor_factory=VideoProcessor,
        # Optional settings, currently disabled:
        # rtc_configuration=RTC_CONFIGURATION,
        # media_stream_constraints={"video": True, "audio": False},
        # async_processing=True,
    )

with col2:
    st.header("System Status")
    # Placeholder that receives the hidden <audio> element when an alert plays.
    audio_placeholder = st.empty()
    if not webrtc_ctx.state.playing:
        st.warning("System Inactive.")
    else:
        st.success("โ System Active & Monitoring")
    st.subheader("Live Status:")
    status_placeholder = st.empty()

if webrtc_ctx.state.playing:
    # Statuses are enqueued once per processed frame, but this script consumes
    # them once per rerun. Reading a single item per rerun lets the queue grow
    # and the display fall behind, so skip straight to the newest entry.
    status_result = _latest_from_queue(st.session_state.status_queue, timeout=0.1)
    if status_result is not None:
        st.session_state.last_status = status_result

    with status_placeholder.container():
        last_status = st.session_state.last_status
        drowsiness_level = last_status.get("drowsiness_level", "Awake")
        lighting = last_status.get("lighting", "Good")
        score = last_status.get("details", {}).get("Score", 0)

        st.metric(label="Lighting Condition", value=lighting)
        if lighting == "Low":
            st.warning("Detection paused due to low light.")

        if drowsiness_level == "Awake":
            st.info(f"โ๏ธ Awake (Score: {score:.2f})")
        elif drowsiness_level == "Slightly Drowsy":
            st.warning(f"โ ๏ธ Slightly Drowsy (Score: {score:.2f})")
        elif drowsiness_level == "Very Drowsy":
            st.error(f"๐จ Very Drowsy! (Score: {score:.2f})")

    # At most one pending alert clip is played per rerun.
    try:
        audio_data = st.session_state.audio_queue.get(timeout=0.1)
        with audio_placeholder.container():
            autoplay_audio(audio_data)
    except queue.Empty:
        pass

    # Short pause, then rerun the script to poll the queues again.
    time.sleep(0.1)
    st.rerun()
else:
    with status_placeholder.container():
        st.info("โ๏ธ Driver is Awake")