# drive_paddy/pages/1_Live_Detection.py
import streamlit as st
from streamlit_webrtc import webrtc_streamer, RTCConfiguration, VideoProcessorBase
import yaml
import av
import os
from dotenv import load_dotenv
import base64
import requests
import queue
import time
from typing import List, Dict, Union
# Correctly import from the drive_paddy package structure
from src.detection.factory import get_detector
from src.alerting.alert_system import get_alerter
# --- Initialize Session State at the TOP of the script ---
# This is the single source of truth for our queues and must run on every page load.
if "status_queue" not in st.session_state:
st.session_state.status_queue = queue.Queue()
if "audio_queue" not in st.session_state:
st.session_state.audio_queue = queue.Queue()
if "last_status" not in st.session_state:
st.session_state.last_status = {"drowsiness_level": "Awake", "lighting": "Good"}
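# queue.Queue is thread-safe, so the WebRTC worker thread can hand results to the
# Streamlit script thread through these objects, and keeping them in session_state
# lets them survive the reruns triggered by the polling loop further down the page.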
# --- Load Configuration and Environment Variables ---
@st.cache_resource
def load_app_config():
    """Loads config from config.yaml and .env files."""
    load_dotenv()
    # The config file lives at the project root.
    config_path = "./config.yaml"
    with open(config_path, 'r') as f:
        config = yaml.safe_load(f)
    # Load secrets from environment variables.
    secrets = {
        "gemini_api_key": os.getenv("GEMINI_API_KEY"),
        "turn_username": os.getenv("TURN_USERNAME"),
        "turn_credential": os.getenv("TURN_CREDENTIAL"),
        "metered_domain": os.getenv("METERED_DOMAIN"),
        "metered_api_key": os.getenv("METERED_KEY"),
    }
    return config, secrets
config, secrets = load_app_config()
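# A minimal sketch of what config.yaml might provide, purely for orientation: the
# real schema is defined by src/detection/factory.py and src/alerting/alert_system.py.
# The only key this page reads directly is `detection_strategy`; the value below is a
# hypothetical placeholder.
#
#   detection_strategy: geometric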
# --- Client-Side Audio Playback Function ---
def autoplay_audio(audio_bytes: bytes):
    """Injects HTML to autoplay audio in the user's browser."""
    b64 = base64.b64encode(audio_bytes).decode()
    md = f"""
    <audio controls autoplay="true" style="display:none;">
        <source src="data:audio/mp3;base64,{b64}" type="audio/mp3">
    </audio>
    """
    st.markdown(md, unsafe_allow_html=True)
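# Note: the audio is embedded as a base64 data URI, so nothing is written to disk.
# Browsers may still block autoplay until the user has interacted with the page
# (e.g. pressed START on the stream).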
# --- WebRTC Video Processor ---
class VideoProcessor(VideoProcessorBase):
    # The queues are created in session_state and passed in by the factory below,
    # so the worker thread and the Streamlit script share the same objects.
    def __init__(self, status_queue: queue.Queue, audio_queue: queue.Queue):
        self.status_queue = status_queue
        self.audio_queue = audio_queue
        self._detector = get_detector(config)
        self._alerter = get_alerter(config, secrets["gemini_api_key"])

    def recv(self, frame: av.VideoFrame) -> av.VideoFrame:
        img = frame.to_ndarray(format="bgr24")
        # The return signature of process_frame depends on the configured
        # detection strategy; only the first two values are used here.
        processed_frame, indicators, _ = self._detector.process_frame(img)
        drowsiness_level = indicators.get("drowsiness_level", "Awake")

        # Push the latest indicators into the shared status queue for the UI.
        self.status_queue.put(indicators)

        if drowsiness_level != "Awake":
            audio_data = self._alerter.trigger_alert(level=drowsiness_level)
            if audio_data:
                # Hand the generated alert audio to the UI thread for playback.
                self.audio_queue.put(audio_data)
        else:
            self._alerter.reset_alert()

        return av.VideoFrame.from_ndarray(processed_frame, format="bgr24")
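# recv() runs on the streamlit-webrtc worker thread, not the Streamlit script thread,
# so it must not touch st.session_state directly. The two queues passed into the
# constructor are the only bridge: recv() produces status/audio items, and the UI
# polling loop at the bottom of this page consumes them.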
# --- Page UI ---
st.title("๐Ÿ“น Live Drowsiness Detection")
st.info("Press 'START' to activate your camera and begin monitoring.")
# TURN credentials are read from the environment (see load_app_config) rather than
# being hard-coded here.
RTC_CONFIGURATION = RTCConfiguration({
    "iceServers": [
        {
            "urls": ["stun:stun.relay.metered.ca:80"],
        },
        {
            "urls": ["turn:global.relay.metered.ca:80"],
            "username": secrets["turn_username"],
            "credential": secrets["turn_credential"],
        },
        {
            "urls": ["turn:global.relay.metered.ca:80?transport=tcp"],
            "username": secrets["turn_username"],
            "credential": secrets["turn_credential"],
        },
        {
            "urls": ["turn:global.relay.metered.ca:443"],
            "username": secrets["turn_username"],
            "credential": secrets["turn_credential"],
        },
        {
            "urls": ["turns:global.relay.metered.ca:443?transport=tcp"],
            "username": secrets["turn_username"],
            "credential": secrets["turn_credential"],
        },
    ]
})
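# The secrets dict also carries `metered_domain` and `metered_api_key`, and `requests`
# is imported above, which suggests fetching short-lived ICE servers from Metered
# instead of pinning static TURN entries. A minimal sketch, assuming Metered's
# credential endpoint and left commented out since it is not wired in here:
#
# def fetch_ice_servers() -> list:
#     resp = requests.get(
#         f"https://{secrets['metered_domain']}/api/v1/turn/credentials",
#         params={"apiKey": secrets["metered_api_key"]},
#         timeout=5,
#     )
#     resp.raise_for_status()
#     return resp.json()  # list of ICE server dicts usable in RTCConfiguration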
col1, col2 = st.columns([3, 1])

with col1:
    # Capture the shared queues locally so the factory closure does not have to
    # touch st.session_state from the WebRTC worker thread.
    status_queue = st.session_state.status_queue
    audio_queue = st.session_state.audio_queue

    webrtc_ctx = webrtc_streamer(
        key="drowsiness-detection",
        # The factory passes the session_state queues into the processor.
        video_processor_factory=lambda: VideoProcessor(
            status_queue=status_queue, audio_queue=audio_queue
        ),
        # rtc_configuration=RTC_CONFIGURATION,
        # media_stream_constraints={"video": True, "audio": False},
        # async_processing=True,
    )
with col2:
    st.header("System Status")
    audio_placeholder = st.empty()
    if not webrtc_ctx.state.playing:
        st.warning("System Inactive.")
    else:
        st.success("✅ System Active & Monitoring")
    st.subheader("Live Status:")
    status_placeholder = st.empty()
if webrtc_ctx.state.playing:
    try:
        # Read the latest status pushed by the processor thread.
        status_result = st.session_state.status_queue.get(timeout=0.1)
        st.session_state.last_status = status_result
    except queue.Empty:
        pass

    with status_placeholder.container():
        last_status = st.session_state.last_status
        drowsiness_level = last_status.get("drowsiness_level", "Awake")
        lighting = last_status.get("lighting", "Good")
        score = last_status.get("details", {}).get("Score", 0)

        st.metric(label="Lighting Condition", value=lighting)
        if lighting == "Low":
            st.warning("Detection paused due to low light.")

        if drowsiness_level == "Awake":
            st.info(f"✔️ Awake (Score: {score:.2f})")
        elif drowsiness_level == "Slightly Drowsy":
            st.warning(f"⚠️ Slightly Drowsy (Score: {score:.2f})")
        elif drowsiness_level == "Very Drowsy":
            st.error(f"🚨 Very Drowsy! (Score: {score:.2f})")

    try:
        audio_data = st.session_state.audio_queue.get(timeout=0.1)
        with audio_placeholder.container():
            autoplay_audio(audio_data)
    except queue.Empty:
        pass

    # Poll roughly every 100 ms while the stream is active.
    time.sleep(0.1)
    st.rerun()
else:
    with status_placeholder.container():
        st.info("✔️ Driver is Awake")