Spaces:
Runtime error
Runtime error
# drive_paddy/pages/1_Live_Detection.py | |
import streamlit as st | |
from streamlit_webrtc import webrtc_streamer, RTCConfiguration, VideoProcessorBase | |
import yaml | |
import av | |
import os | |
from dotenv import load_dotenv | |
import base64 | |
import queue | |
import time | |
from src.detection.factory import get_detector | |
from src.alerting.alert_system import get_alerter | |
# Seed the per-session defaults once; Streamlit re-executes this script on
# every rerun, so keys that already exist must be left untouched.
for _state_key, _make_default in (
    ("status_queue", queue.Queue),
    ("audio_queue", queue.Queue),
    ("last_status", lambda: {"status": "Awake"}),
):
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _make_default()
# --- Load Configuration and Environment Variables ---
def load_app_config():
    """Load the YAML application config and the secrets kept in the environment.

    ``load_dotenv()`` runs first so the ``os.getenv`` lookups below can see
    values supplied through a ``.env`` file. The config is read from
    ``/config.yaml`` when that file exists, otherwise from ``config.yaml``
    relative to the current working directory.

    Returns:
        tuple: ``(config, secrets)``. ``config`` is the parsed YAML content,
        or an empty dict when the file holds no document. ``secrets`` is a
        dict with the keys ``gemini_api_key``, ``turn_username`` and
        ``turn_credential``; a value is ``None`` when its environment
        variable is not set.

    Raises:
        OSError: if the selected config file cannot be opened.
    """
    load_dotenv()
    config_path = "/config.yaml" if os.path.exists("/config.yaml") else "config.yaml"
    # Decode explicitly as UTF-8 instead of relying on the platform locale.
    with open(config_path, "r", encoding="utf-8") as f:
        # safe_load() yields None for an empty file; fall back to an empty
        # mapping so later ``config.get(...)`` calls do not crash.
        config = yaml.safe_load(f) or {}
    # Load secrets from environment
    secrets = {
        "gemini_api_key": os.getenv("GEMINI_API_KEY"),
        "turn_username": os.getenv("TURN_USERNAME"),
        "turn_credential": os.getenv("TURN_CREDENTIAL"),
    }
    return config, secrets


config, secrets = load_app_config()
# --- Client-Side Audio Playback Function ---
def autoplay_audio(audio_bytes: bytes):
    """Render a hidden, auto-playing ``<audio>`` element for *audio_bytes*.

    The clip is base64-encoded into a ``data:audio/mp3`` URI and written to
    the page through ``st.markdown`` with raw HTML enabled.
    """
    encoded_clip = base64.b64encode(audio_bytes).decode()
    audio_tag = (
        "\n"
        '<audio controls autoplay="true" style="display:none;">\n'
        f'<source src="data:audio/mp3;base64,{encoded_clip}" type="audio/mp3">\n'
        "</audio>\n"
    )
    st.markdown(audio_tag, unsafe_allow_html=True)
# --- WebRTC Video Processor ---
class VideoProcessor(VideoProcessorBase):
    """Frame processor handed to ``webrtc_streamer`` as the processor factory.

    Every frame is passed to the detector built by ``get_detector(config)``.
    The resulting status dict is published on ``status_queue`` and any audio
    returned by the alerter is published on ``audio_queue``.
    """

    def __init__(self):
        self._detector = get_detector(config)
        self._alerter = get_alerter(config, secrets["gemini_api_key"])
        # Read once instead of on every frame; the strategy decides how the
        # detector's return value is unpacked in recv().
        self._strategy = config.get('detection_strategy')
        # Thread-safe queues for communication.
        # recv() publishes a status for every frame, so the status queue keeps
        # only the newest entry; an unbounded queue would grow whenever the
        # reader is slower than the frame rate (or absent).
        self.status_queue = queue.Queue(maxsize=1)
        self.audio_queue = queue.Queue()

    def _publish_status(self, status):
        """Replace any unread status with *status* so readers see the newest one."""
        while True:
            try:
                self.status_queue.put_nowait(status)
                return
            except queue.Full:
                try:
                    self.status_queue.get_nowait()
                except queue.Empty:
                    # A reader emptied the queue in the meantime; retry the put.
                    pass

    def recv(self, frame: av.VideoFrame) -> av.VideoFrame:
        """Run detection on one frame and return the annotated frame.

        With the ``'hybrid'`` strategy the detector returns
        ``(frame, alert_triggered, active_alerts)``; otherwise it returns
        ``(frame, indicators)`` and an alert is raised when any indicator
        value is truthy. The published status is the alert/indicator dict
        when an alert is raised, else ``{"status": "Awake"}``.
        """
        img = frame.to_ndarray(format="bgr24")
        if self._strategy == 'hybrid':
            processed_frame, alert_triggered, status = self._detector.process_frame(img)
        else:
            processed_frame, status = self._detector.process_frame(img)
            alert_triggered = any(status.values())
        self._publish_status(status if alert_triggered else {"status": "Awake"})
        if alert_triggered:
            audio_data = self._alerter.trigger_alert()
            # trigger_alert() may return nothing; only queue real audio.
            if audio_data:
                self.audio_queue.put(audio_data)
        else:
            self._alerter.reset_alert()
        return av.VideoFrame.from_ndarray(processed_frame, format="bgr24")
# --- Page UI ---
# NOTE(review): the leading emoji arrived mis-encoded (UTF-8 bytes shown as
# Greek letters); restored as the video-camera glyph — confirm the intended one.
st.title("📹 Live Drowsiness Detection")
st.info("Press 'START' to activate your camera and begin monitoring.")
# --- Dynamically Build RTC Configuration ---
# The STUN server is always offered. TURN relays (UDP and TCP transports) are
# appended only when both TURN credentials were found in the environment.
ice_servers = [{'urls': 'stun:global.stun.twilio.com:3478'}]
_turn_username = secrets["turn_username"]
_turn_credential = secrets["turn_credential"]
if _turn_username and _turn_credential:
    print("TURN credentials found, adding TURN servers to config.")
    turn_servers = [
        {
            'urls': f'turn:global.turn.twilio.com:3478?transport={_transport}',
            'username': _turn_username,
            'credential': _turn_credential,
        }
        for _transport in ('udp', 'tcp')
    ]
    ice_servers.extend(turn_servers)
RTC_CONFIGURATION = RTCConfiguration({"iceServers": ice_servers})
# Layout: video stream on the left, status panel on the right.
# NOTE(review): emoji in the literals below arrived mis-encoded and were
# restored from their byte patterns — confirm the intended glyphs.
col1, col2 = st.columns([3, 1])
with col1:
    webrtc_ctx = webrtc_streamer(
        key="drowsiness-detection",
        video_processor_factory=VideoProcessor,
        rtc_configuration=RTC_CONFIGURATION,  # Use the new robust configuration
        media_stream_constraints={"video": True, "audio": False},
        async_processing=True,
    )
with col2:
    st.header("System Status")
    if not webrtc_ctx.state.playing:
        st.warning("System Inactive.")
    else:
        st.success("✅ System Active & Monitoring")
    st.subheader("Live Status:")
    status_placeholder = st.empty()
    audio_placeholder = st.empty()


def _drain_latest(source_queue, timeout=0.1):
    """Return the newest item in *source_queue*, or None if none arrives in *timeout* seconds."""
    try:
        latest = source_queue.get(timeout=timeout)
    except queue.Empty:
        return None
    # Skip over anything older so the display never lags behind the stream.
    while True:
        try:
            latest = source_queue.get_nowait()
        except queue.Empty:
            return latest


if webrtc_ctx.state.playing:
    if "last_status" not in st.session_state:
        st.session_state.last_status = {"status": "Awake"}

    # VideoProcessor.recv() publishes on the processor instance's own queues,
    # not on the session-state queues, so read from the live processor. The
    # session-state queues are only a fallback while no processor exists yet.
    processor = webrtc_ctx.video_processor
    if processor is not None:
        status_source = processor.status_queue
        audio_source = processor.audio_queue
    else:
        status_source = st.session_state.status_queue
        audio_source = st.session_state.audio_queue

    status_result = _drain_latest(status_source)
    if status_result is not None:
        st.session_state.last_status = status_result

    with status_placeholder.container():
        last_status = st.session_state.last_status
        if last_status.get("Low Light"):
            st.warning("⚠️ Low Light Detected! Accuracy may be affected.")
        elif last_status.get("status") == "Awake":
            st.info("✔️ Driver is Awake")
        else:
            st.error("🚨 DROWSINESS DETECTED!")
            # List each indicator that contributed to the alert.
            for key, value in last_status.items():
                if key not in ("Low Light", "status"):
                    st.warning(f"-> {key}: {value:.2f}" if isinstance(value, float) else f"-> {key}")

    # Play at most one queued alert clip per rerun.
    try:
        audio_data = audio_source.get(timeout=0.1)
    except queue.Empty:
        pass
    else:
        with audio_placeholder.container():
            autoplay_audio(audio_data)

    # Poll again shortly: rerunning the script is what refreshes the panel.
    time.sleep(0.1)
    st.rerun()
else:
    with status_placeholder.container():
        st.info("✔️ Driver is Awake")