# driver-paddy / ai_alert_generator.py
# Testimony Adekoya
# WIP: Quick demo for project
# e65b3b4
import google.generativeai as genai
import numpy as np
from scipy.io import wavfile
import tempfile
import os
from facial_detection import OpenCVFaceDetector, MetricsCalculator, DrowsinessAnalyzer, AlertManager, VisualizationRenderer, StatusLogger
class AIAlertGenerator:
    """Produce driver-alert text (optionally via Gemini) plus a beep-pattern WAV file.

    When no API key is given, or Gemini initialization fails, ``self.model``
    stays ``None`` and all alert text comes from the built-in fallback messages.
    """

    # Fallback messages keyed by alert type, then by severity.
    _DEFAULT_ALERTS = {
        "EYES_CLOSED": {
            "critical": "WAKE UP NOW! Pull over immediately!",
            "high": "Eyes closing! Stay alert and pull over soon!",
            "medium": "Please keep your eyes open while driving!",
        },
        "YAWNING": {
            "critical": "Excessive yawning detected! Take a break!",
            "high": "You seem tired. Consider resting soon.",
            "medium": "Frequent yawning noticed. Stay alert!",
        },
        "HEAD_NOD": {
            "critical": "Head nodding detected! Stop driving now!",
            "high": "Your head is nodding. Pull over safely!",
            "medium": "Head movement detected. Stay focused!",
        },
    }

    # Returned when the alert type or severity has no fallback entry.
    _GENERIC_ALERT = "Stay alert while driving!"

    def __init__(self, api_key=None, model_name='gemini-1.5-flash'):
        """Configure Gemini when an API key is supplied.

        Args:
            api_key: Gemini API key. Falsy values skip initialization entirely.
            model_name: Name passed to ``genai.GenerativeModel``.

        Initialization errors are printed, not raised; ``self.model`` then
        remains ``None`` so callers transparently get fallback messages.
        """
        self.model = None
        if api_key:
            try:
                genai.configure(api_key=api_key)
                self.model = genai.GenerativeModel(model_name)
                print("βœ… Gemini AI initialized for voice alerts")
            except Exception as e:
                print(f"⚠️ Failed to initialize Gemini: {e}")

    @staticmethod
    def _primary_alert_type(alert_type):
        """Return the lookup key for ``alert_type``.

        A list/tuple yields its first element (``None`` when empty); any other
        value is returned unchanged.
        """
        if isinstance(alert_type, (list, tuple)):
            return alert_type[0] if alert_type else None
        return alert_type

    def generate_alert_text(self, alert_type, severity="medium"):
        """Generate contextual alert text using Gemini.

        Args:
            alert_type: A single alert name (e.g. ``"EYES_CLOSED"``) or a
                list/tuple of names. More than one name selects the
                ``"COMBINED"`` prompt; an unknown name uses the
                ``"EYES_CLOSED"`` prompt.
            severity: Severity label interpolated into the prompt and used for
                the fallback lookup.

        Returns:
            The model's reply with quote characters removed, truncated to 100
            characters. The fallback message is returned when no model is
            available, ``alert_type`` is an empty sequence, the model call
            fails, or the reply is empty.
        """
        if not self.model:
            return self._get_default_alert_text(alert_type, severity)

        is_sequence = isinstance(alert_type, (list, tuple))
        if is_sequence and not alert_type:
            # Nothing to describe; avoid indexing an empty sequence.
            return self._get_default_alert_text(alert_type, severity)

        try:
            prompts = {
                "EYES_CLOSED": f"Generate a brief, urgent wake-up message (max 12 words) for a drowsy driver whose eyes are closing. Severity: {severity}. Sound caring but firm.",
                "YAWNING": f"Generate a brief, gentle alert (max 10 words) for a driver who is yawning frequently. Severity: {severity}. Sound encouraging.",
                "HEAD_NOD": f"Generate a brief, firm alert (max 10 words) for a driver whose head is nodding. Severity: {severity}. Sound urgent but supportive.",
                "COMBINED": f"Generate a brief, critical alert (max 15 words) for a driver showing multiple drowsiness signs. Severity: {severity}. Sound very urgent but caring."
            }
            if is_sequence and len(alert_type) > 1:
                prompt_key = "COMBINED"
            else:
                prompt_key = self._primary_alert_type(alert_type)
            prompt = prompts.get(prompt_key, prompts["EYES_CLOSED"])

            response = self.model.generate_content(prompt)
            alert_text = response.text.strip().replace('"', '').replace("'", "")
            if not alert_text:
                # An empty alert is useless to the driver; use the fallback.
                return self._get_default_alert_text(alert_type, severity)
            return alert_text[:100]
        except Exception as e:
            print(f"Error generating AI alert: {e}")
            return self._get_default_alert_text(alert_type, severity)

    def _get_default_alert_text(self, alert_type, severity):
        """Return the built-in fallback message for ``alert_type``/``severity``.

        For a list/tuple only the first element is looked up. Unknown alert
        types, unknown severities and empty sequences all yield the generic
        message.
        """
        alert_key = self._primary_alert_type(alert_type)
        return self._DEFAULT_ALERTS.get(alert_key, {}).get(severity, self._GENERIC_ALERT)

    def create_audio_alert(self, text, sample_rate=22050):
        """Write a three-beep alert pattern to a temporary WAV file.

        ``text`` is not synthesized; it is passed through unchanged so the
        caller receives the audio path and message together.

        Args:
            text: Alert message to return alongside the audio path.
            sample_rate: Sample rate, in Hz, used to build and write the audio.

        Returns:
            ``(path, text)`` on success. The file is created with
            ``delete=False``, so the caller is responsible for removing it.
            ``(None, text)`` if anything fails; the error is printed and any
            partially created file is removed.
        """
        wav_path = None
        try:
            duration = 2.0
            freq = 800
            frames = int(duration * sample_rate)

            # Two decaying tones: the base frequency and one 1.5x higher.
            t = np.linspace(0, duration, frames)
            envelope = np.exp(-t * 3)
            beep1 = np.sin(2 * np.pi * freq * t) * envelope
            beep2 = np.sin(2 * np.pi * (freq * 1.5) * t) * envelope

            # Low / high / low beeps separated by 0.1 s of silence.
            silence = np.zeros(int(0.1 * sample_rate))
            audio = np.concatenate([beep1, silence, beep2, silence, beep1])

            # Scale the [-1, 1] float signal to the int16 range.
            audio = (audio * 32767).astype(np.int16)

            # Reserve a file name, then close the handle before writing so no
            # file descriptor is left open.
            with tempfile.NamedTemporaryFile(delete=False, suffix='.wav') as temp_file:
                wav_path = temp_file.name
            wavfile.write(wav_path, sample_rate, audio)
            return wav_path, text
        except Exception as e:
            print(f"Error creating audio alert: {e}")
            if wav_path is not None:
                # Do not leave an empty or partial file behind.
                try:
                    os.remove(wav_path)
                except OSError:
                    pass
            return None, text
class DrowsinessDetectionSystem:
    """Coordinator tying together detection, metrics, analysis, alerting,
    rendering and logging components for per-frame drowsiness checks."""

    def __init__(self):
        """Instantiate one of each collaborating component with default arguments."""
        self.face_detector = OpenCVFaceDetector()
        self.metrics_calculator = MetricsCalculator()
        self.drowsiness_analyzer = DrowsinessAnalyzer()
        self.alert_manager = AlertManager()
        self.visualization_renderer = VisualizationRenderer()
        self.logger = StatusLogger()
        print("βœ… Drowsiness Detection System initialized with OpenCV")

    def process_frame(self, frame):
        """Analyse one frame and report the outcome.

        Returns a 4-tuple ``(frame, status_lines, should_alert, metrics)``:

        * ``frame`` - the object passed in, after the renderer calls.
        * ``status_lines`` - list of status strings.
        * ``should_alert`` - result of the alert manager's check.
        * ``metrics`` - dict with keys ``ear``, ``mar``, ``head_angle``,
          ``indicators`` and ``severity``.

        When no face is found, or any exception occurs, the event is logged
        and the result is a single-message status list, ``False`` and ``{}``.
        """
        try:
            rects, landmark_sets = self.face_detector.detect_landmarks(frame)
            if not rects or not landmark_sets:
                self.logger.log("No face detected")
                return frame, ["πŸ‘€ No face detected"], False, {}

            # Only the first detection is analysed.
            first_rect, landmarks = rects[0], landmark_sets[0]

            # Eye aspect ratio per eye; 0.25 stands in for a missing eye.
            if 'left_eye_corners' in landmarks:
                left_ear = self.metrics_calculator.calculate_ear_from_points(
                    landmarks['left_eye_corners'])
            else:
                left_ear = 0.25
            if 'right_eye_corners' in landmarks:
                right_ear = self.metrics_calculator.calculate_ear_from_points(
                    landmarks['right_eye_corners'])
            else:
                right_ear = 0.25
            ear = (left_ear + right_ear) / 2.0

            # Mouth aspect ratio; 0.3 stands in when mouth points are missing.
            if 'mouth_corners' in landmarks:
                mar = self.metrics_calculator.calculate_mar_from_points(
                    landmarks['mouth_corners'])
            else:
                mar = 0.3

            # Head pose from nose tip and chin relative to the frame centre (x, y).
            height, width = frame.shape[0], frame.shape[1]
            centre = (width // 2, height // 2)
            head_angles = self.metrics_calculator.estimate_head_pose_simple(
                landmarks.get('nose_tip'), landmarks.get('chin'), centre)

            # Drowsiness indicators, their severity, and the alert decision.
            indicators = self.drowsiness_analyzer.analyze_drowsiness(ear, mar, head_angles)
            severity = self.drowsiness_analyzer.get_severity_level(indicators)
            should_alert = self.alert_manager.should_trigger_alert(indicators)

            # Draw overlays on the frame; the first head angle is the one reported.
            self.visualization_renderer.draw_landmarks_and_contours(frame, landmarks, first_rect)
            first_angle = head_angles[0]
            self.visualization_renderer.draw_metrics_overlay(
                frame, ear, mar, first_angle, indicators)

            status_lines = self._generate_status_text(ear, mar, first_angle, indicators)

            if indicators:
                self.logger.log(f"Drowsiness detected: {', '.join(indicators)} (Severity: {severity})")

            metrics = dict(
                ear=ear,
                mar=mar,
                head_angle=first_angle,
                indicators=indicators,
                severity=severity,
            )
            return frame, status_lines, should_alert, metrics
        except Exception as e:
            failure = f"Error processing frame: {str(e)}"
            self.logger.log(failure)
            return frame, [failure], False, {}

    def _generate_status_text(self, ear, mar, head_angle, indicators):
        """Build the four status lines: eyes, mouth, head pose, overall verdict.

        Each metric is compared against the corresponding threshold attribute
        on ``self.drowsiness_analyzer``.
        """
        # Eyes: below the EAR threshold counts as closing.
        if not ear < self.drowsiness_analyzer.EAR_THRESHOLD:
            eye_line = f"πŸ‘οΈ Eyes open - EAR: {ear:.3f}"
        else:
            eye_line = f"πŸ‘οΈ Eyes closing! EAR: {ear:.3f}"

        # Mouth: above the yawn threshold counts as yawning.
        if not mar > self.drowsiness_analyzer.YAWN_THRESHOLD:
            mouth_line = f"πŸ‘„ Normal mouth - MAR: {mar:.3f}"
        else:
            mouth_line = f"πŸ₯± Yawning detected! MAR: {mar:.3f}"

        # Head: the magnitude of the angle is what matters.
        if not abs(head_angle) > self.drowsiness_analyzer.NOD_THRESHOLD:
            head_line = f"πŸ“ Head pose normal - Pitch: {head_angle:.1f}Β°"
        else:
            head_line = f"πŸ“‰ Head nodding! Angle: {head_angle:.1f}Β°"

        # Overall verdict: any indicator at all is an alert.
        if not indicators:
            summary_line = "βœ… Driver appears alert"
        else:
            summary_line = f"⚠️ ALERT: {', '.join(indicators)}"

        return [eye_line, mouth_line, head_line, summary_line]

    def get_logs(self):
        """Return the logger's recent entries joined into one newline-separated string."""
        recent_entries = self.logger.get_recent_logs()
        return "\n".join(recent_entries)