drive-paddy / src /alerting /alert_system.py
Testys's picture
Update src/alerting/alert_system.py
fb982e6 verified
raw
history blame
3.98 kB
# drive_paddy/alerting/alert_system.py
import time, os, io, google.generativeai as genai
from gtts import gTTS
class BaseAlerter:
    """Base class for alert systems, handling state and cooldowns.

    Subclasses implement ``trigger_alert``; this class only stores the
    shared bookkeeping (cooldown length, time of the last alert, and
    whether an alert is currently considered active).
    """

    def __init__(self, config):
        """Read alerting settings from the ``'alerting'`` section of *config*.

        Args:
            config: Mapping that may contain an ``'alerting'`` sub-mapping.
                A missing section is treated as empty.
        """
        self.config = config.get('alerting', {})
        # Increased cooldown for dynamic alerts.
        self.cooldown = self.config.get('alert_cooldown_seconds', 15)
        self.last_alert_time = 0
        self.alert_on = False

    def trigger_alert(self, level="Very Drowsy"):
        """Check whether an alert can be triggered based on the cooldown.

        If so, marks the alert as active and returns the appropriate text.
        Returns None if on cooldown.

        Raises:
            NotImplementedError: Always; subclasses must override.
        """
        raise NotImplementedError

    def reset_alert(self):
        """Reset the alert state.

        This is crucial to call after an alert has finished playing to
        allow the next one to be triggered.
        """
        if self.alert_on:
            print("Resetting Alert State.")
            self.alert_on = False
class FileAlertSystem(BaseAlerter):
    """Alerter backed by a static audio file that is loaded once at start-up."""

    def __init__(self, config):
        """Load the alert sound named by ``alerting.alert_sound_path``.

        Loading is best-effort: on any failure ``audio_bytes`` stays ``None``
        and a warning is printed, which means ``trigger_alert`` will never
        fire for this instance.

        Args:
            config: Mapping with an ``'alerting'`` section; the sound file
                path is read from its ``'alert_sound_path'`` key.
        """
        super().__init__(config)
        self.audio_bytes = None
        # self.config is already the 'alerting' section (see BaseAlerter).
        sound_path = self.config.get('alert_sound_path')
        try:
            if sound_path and os.path.exists(sound_path):
                with open(sound_path, "rb") as f:
                    self.audio_bytes = f.read()
            else:
                # Without this message a missing file silently disables alerts.
                print(
                    f"Warning: Alert sound file not found: {sound_path!r}. "
                    "Static alerts are disabled."
                )
        except (OSError, TypeError, ValueError) as e:
            print(f"Warning: Could not load audio file. Error: {e}.")

    def trigger_alert(self, level="Very Drowsy"):
        """Return ``"play_static_alert"`` if an alert may fire now, else ``None``.

        An alert fires only when the cooldown has elapsed, no alert is
        currently active, and audio data was loaded. ``level`` is accepted
        for interface compatibility but is not used by this alerter.
        """
        current_time = time.time()
        cooldown_elapsed = (current_time - self.last_alert_time) > self.cooldown
        if not cooldown_elapsed or self.alert_on or not self.audio_bytes:
            return None

        self.last_alert_time = current_time
        self.alert_on = True
        # For this simple system, we just return a flag to play the audio.
        # The main app will handle the audio data itself.
        return "play_static_alert"
class GeminiAlertSystem(BaseAlerter):
    """Generates dynamic alert TEXT using Gemini."""

    # Returned when the Gemini call fails or yields no usable text.
    _FALLBACK_TEXT = "Driver alert! Please check your status."
    # Returned when the model could not be initialized at all.
    _NO_MODEL_TEXT = "Warning, driver is drowsy."

    def __init__(self, config, api_key):
        """Configure the Gemini client and create the model.

        Initialization is best-effort: on failure ``self.model`` stays
        ``None`` and ``get_alert_text`` returns a fixed message instead.

        Args:
            config: Mapping with an optional ``'alerting'`` section.
            api_key: API key passed to ``genai.configure``.
        """
        super().__init__(config)
        self.model = None
        try:
            genai.configure(api_key=api_key)
            self.model = genai.GenerativeModel('gemini-pro')
            print("Successfully initialized GeminiAlertSystem.")
        except Exception as e:
            print(f"Error initializing Gemini: {e}.")

    def get_alert_text(self, level):
        """Generate just the text for an alert.

        Args:
            level: ``"Slightly Drowsy"`` selects the gentle prompt; any
                other value is treated as very drowsy.

        Returns:
            A non-empty alert string. Falls back to a fixed message when the
            model is unavailable, the request fails, or the generated text is
            empty after clean-up.
        """
        if not self.model:
            return self._NO_MODEL_TEXT

        if level == "Slightly Drowsy":
            prompt = "Generate a short, gentle, and encouraging reminder (under 10 words) for a driver showing minor signs of fatigue. Example: 'Feeling a bit tired? Maybe take a quick break.'"
        else:  # Very Drowsy
            prompt = "Generate a short, firm, and urgent alert (under 8 words) for a driver who is very drowsy. Be direct and clear. Example: 'Danger! Pull over now!' or 'Wake up! Stay focused!'"

        try:
            response = self.model.generate_content(prompt)
            # Drop markdown asterisks first so whitespace they enclosed is
            # trimmed as well.
            text = response.text.replace('*', '').strip()
        except Exception as e:
            print(f"Error generating text with Gemini: {e}")
            return self._FALLBACK_TEXT

        if not text:
            # trigger_alert has already marked the alert active; returning ""
            # would give the caller nothing to play for that alert.
            print("Gemini returned empty alert text; using fallback message.")
            return self._FALLBACK_TEXT
        return text

    def trigger_alert(self, level="Very Drowsy"):
        """If not on cooldown, generate alert text and return it.

        Returns ``None`` when the cooldown has not elapsed or an alert is
        already active.
        """
        current_time = time.time()
        cooldown_elapsed = (current_time - self.last_alert_time) > self.cooldown
        if not cooldown_elapsed or self.alert_on:
            return None

        self.last_alert_time = current_time
        self.alert_on = True
        print(f"Alert state is ON for level: {level}. Generating text.")
        return self.get_alert_text(level)
def get_alerter(config, api_key=None):
    """Build the alerter selected by *config*.

    A ``GeminiAlertSystem`` is returned when
    ``alerting.gemini_alerts.enabled`` is truthy and an ``api_key`` is
    supplied; in every other case a ``FileAlertSystem`` is returned.
    """
    gemini_settings = config.get('alerting', {}).get('gemini_alerts', {})
    wants_gemini = gemini_settings.get('enabled', False)
    if not (wants_gemini and api_key):
        return FileAlertSystem(config)
    return GeminiAlertSystem(config, api_key)