# NOTE: removed scraped web-page chrome (repo path, commit id/message,
# "raw / history / blame" links, file size) that was not part of the source file.
# drive_paddy/pages/1_Live_Detection.py
import streamlit as st
from streamlit_webrtc import webrtc_streamer, RTCConfiguration, VideoProcessorBase
import yaml
import av
import os
from dotenv import load_dotenv
import base64
import queue
import time
from src.detection.factory import get_detector
from src.alerting.alert_system import get_alerter
# --- Cross-Rerun State Initialization ---
# Streamlit re-executes this script on every rerun; seed session_state once so
# the queues and the last-known driver status persist across reruns.
_STATE_DEFAULTS = {
    "status_queue": queue.Queue,
    "audio_queue": queue.Queue,
    "last_status": lambda: {"status": "Awake"},
}
for _name, _factory in _STATE_DEFAULTS.items():
    if _name not in st.session_state:
        st.session_state[_name] = _factory()
# --- Load Configuration and Environment Variables ---
@st.cache_resource
def load_app_config():
    """Load the YAML app config and secrets from the environment.

    Returns:
        tuple: ``(config, secrets)`` where ``config`` is the parsed YAML
        mapping and ``secrets`` is a dict of API keys / TURN credentials
        read from environment variables (values are ``None`` if unset).
    """
    load_dotenv()
    # Prefer a root-level config (container deployments); fall back to CWD.
    config_path = "/config.yaml" if os.path.exists("/config.yaml") else "config.yaml"
    # Explicit encoding: default text encoding is platform-dependent.
    with open(config_path, "r", encoding="utf-8") as f:
        config = yaml.safe_load(f)
    # Secrets are never stored in the YAML file -- only in the environment/.env.
    secrets = {
        "gemini_api_key": os.getenv("GEMINI_API_KEY"),
        "turn_username": os.getenv("TURN_USERNAME"),
        "turn_credential": os.getenv("TURN_CREDENTIAL"),
    }
    return config, secrets

config, secrets = load_app_config()
# --- Client-Side Audio Playback Function ---
def autoplay_audio(audio_bytes: bytes):
    """Injects HTML to autoplay audio in the user's browser."""
    encoded = base64.b64encode(audio_bytes).decode()
    markup = f"""
    <audio controls autoplay="true" style="display:none;">
    <source src="data:audio/mp3;base64,{encoded}" type="audio/mp3">
    </audio>
    """
    # The player is hidden (display:none); autoplay fires it immediately.
    st.markdown(markup, unsafe_allow_html=True)
# --- WebRTC Video Processor ---
class VideoProcessor(VideoProcessorBase):
    """Runs drowsiness detection on every incoming WebRTC video frame.

    ``recv`` executes on a streamlit-webrtc worker thread, so results are
    published on thread-safe instance queues. NOTE: these are the processor's
    OWN queues -- the UI must read them via ``webrtc_ctx.video_processor``,
    not via the (distinct) queues stored in ``st.session_state``.
    """

    def __init__(self):
        self._detector = get_detector(config)
        self._alerter = get_alerter(config, secrets["gemini_api_key"])
        # Hoisted out of recv(): the configured strategy never changes per-frame.
        self._strategy = config.get('detection_strategy')
        # Thread-safe queues for communication with the Streamlit script thread.
        self.status_queue = queue.Queue()
        self.audio_queue = queue.Queue()

    def recv(self, frame: av.VideoFrame) -> av.VideoFrame:
        """Detect drowsiness on one frame and return the annotated frame.

        Pushes the current status dict onto ``status_queue`` and, when an
        alert fires and produces audio, the audio bytes onto ``audio_queue``.
        """
        img = frame.to_ndarray(format="bgr24")
        if self._strategy == 'hybrid':
            # Hybrid detector reports the trigger decision itself.
            processed_frame, alert_triggered, active_alerts = self._detector.process_frame(img)
            self.status_queue.put(active_alerts if alert_triggered else {"status": "Awake"})
        else:
            # Other strategies return raw indicators; any truthy one triggers.
            processed_frame, indicators = self._detector.process_frame(img)
            alert_triggered = any(indicators.values())
            self.status_queue.put(indicators if alert_triggered else {"status": "Awake"})
        if alert_triggered:
            audio_data = self._alerter.trigger_alert()
            if audio_data:
                self.audio_queue.put(audio_data)
        else:
            self._alerter.reset_alert()
        return av.VideoFrame.from_ndarray(processed_frame, format="bgr24")
# --- Page UI ---
st.title("📹 Live Drowsiness Detection")  # fixed mojibake emoji (was cp1252-garbled)
st.info("Press 'START' to activate your camera and begin monitoring.")

# --- Dynamically Build RTC Configuration ---
# Always offer STUN; append TURN relays only when credentials are configured,
# so clients behind strict NATs can still connect.
ice_servers = [{'urls': 'stun:global.stun.twilio.com:3478'}]
if secrets["turn_username"] and secrets["turn_credential"]:
    print("TURN credentials found, adding TURN servers to config.")
    for transport in ("udp", "tcp"):
        ice_servers.append({
            'urls': f'turn:global.turn.twilio.com:3478?transport={transport}',
            'username': secrets["turn_username"],
            'credential': secrets["turn_credential"],
        })
RTC_CONFIGURATION = RTCConfiguration({"iceServers": ice_servers})
# Two-column layout: video stream on the left, status panel on the right.
col1, col2 = st.columns([3, 1])
with col1:
    webrtc_ctx = webrtc_streamer(
        key="drowsiness-detection",
        video_processor_factory=VideoProcessor,
        rtc_configuration=RTC_CONFIGURATION,  # STUN + optional TURN built above
        media_stream_constraints={"video": True, "audio": False},
        async_processing=True,
    )
with col2:
    st.header("System Status")
    if not webrtc_ctx.state.playing:
        st.warning("System Inactive.")
    else:
        st.success("✅ System Active & Monitoring")  # fixed mojibake emoji

st.subheader("Live Status:")
# Placeholders let the polling loop below redraw status/audio in place.
status_placeholder = st.empty()
audio_placeholder = st.empty()
if webrtc_ctx.state.playing:
    # BUG FIX: VideoProcessor publishes to its OWN instance queues, not the
    # ones created in st.session_state, so polling the session queues never
    # showed any status. Read from the live processor when it exists and fall
    # back to the session queues otherwise.
    processor = webrtc_ctx.video_processor
    status_queue = processor.status_queue if processor else st.session_state.status_queue
    audio_queue = processor.audio_queue if processor else st.session_state.audio_queue

    # Drain at most one status per rerun; keep the last known status otherwise.
    try:
        st.session_state.last_status = status_queue.get(timeout=0.1)
    except queue.Empty:
        pass

    with status_placeholder.container():
        last_status = st.session_state.last_status
        if last_status.get("Low Light"):
            st.warning("⚠️ Low Light Detected! Accuracy may be affected.")
        elif last_status.get("status") == "Awake":
            st.info("✔️ Driver is Awake")  # fixed mojibake emoji
        else:
            st.error("🚨 DROWSINESS DETECTED!")
            # List every indicator that fired (floats shown to two decimals).
            # NOTE(review): structure reconstructed from flattened source --
            # the indicator list is assumed to belong to the alert branch.
            for key, value in last_status.items():
                if key not in ("Low Light", "status"):
                    st.warning(f"-> {key}: {value:.2f}" if isinstance(value, float) else f"-> {key}")

    # Play any pending alert audio in the browser.
    try:
        audio_data = audio_queue.get(timeout=0.1)
        with audio_placeholder.container():
            autoplay_audio(audio_data)
    except queue.Empty:
        pass

    # Poll at roughly 10 Hz by forcing a rerun while the stream is active.
    time.sleep(0.1)
    st.rerun()
else:
    with status_placeholder.container():
        st.info("✔️ Driver is Awake")  # fixed mojibake emoji