# drive-paddy / src / alerting / alert_system.py
# Testys's picture
# Update src/alerting/alert_system.py
# 44f4448 verified
# drive_paddy/alerting/alert_system.py
import time, os, google.generativeai as genai
from gtts import gTTS
class BaseAlerter:
    """Common state shared by every alert back-end.

    Holds the ``"alerting"`` section of the configuration, the cooldown
    (``alert_cooldown_seconds``, 15 when absent), the timestamp of the last
    alert and a flag saying whether an alert is currently active.
    """

    def __init__(self, config):
        """Read the ``"alerting"`` settings from *config* and reset all state."""
        alerting_cfg = config.get("alerting", {})
        self.config = alerting_cfg
        self.cooldown = alerting_cfg.get("alert_cooldown_seconds", 15)
        self.alert_on = False
        self.last_alert_time = 0

    def trigger_alert(self, level="Very Drowsy"):
        """Produce an alert for *level*; concrete alerters must override this."""
        raise NotImplementedError

    def reset_alert(self):
        """Clear the active-alert flag, logging only when it was actually set."""
        if not self.alert_on:
            return
        print("Resetting alert state.")
        self.alert_on = False
# ──────────────────────────────────────────────────────────
# STATIC FILE ALERTER
# ──────────────────────────────────────────────────────────
class FileAlertSystem(BaseAlerter):
    """Alerter that returns the raw bytes of a pre-recorded sound file.

    The file named by ``config["alerting"]["alert_sound_path"]`` is read once
    at construction. When it cannot be loaded, ``audio_bytes`` stays ``None``
    and ``trigger_alert`` always returns ``None``.
    """

    def __init__(self, config):
        """Initialise the shared state and try to load the alert sound."""
        super().__init__(config)
        self.audio_bytes = None
        # Best-effort load: any failure is reported and leaves audio_bytes unset.
        try:
            sound_path = config["alerting"]["alert_sound_path"]
            with open(sound_path, "rb") as sound_file:
                self.audio_bytes = sound_file.read()
            print(f"[AlertSystem] loaded sound: {sound_path}")
        except Exception as err:
            print(f"[AlertSystem] could not load sound: {err}")

    def trigger_alert(self, level="Very Drowsy"):
        """Return the sound bytes if an alert may fire now, otherwise ``None``.

        An alert fires only when sound data is loaded, more than ``cooldown``
        seconds have passed since the previous alert, and no alert is
        currently active. *level* is accepted for interface compatibility
        and is not used here.
        """
        if not self.audio_bytes:
            return None

        current = time.time()
        cooled_down = (current - self.last_alert_time) > self.cooldown
        if not cooled_down or self.alert_on:
            return None

        self.last_alert_time = current
        self.alert_on = True
        return self.audio_bytes
# ──────────────────────────────────────────────────────────
# GEMINI TTS TEXT ALERTER
# ──────────────────────────────────────────────────────────
class GeminiAlertSystem(BaseAlerter):
    """Alerter that asks a Gemini model for a short alert sentence.

    If the model cannot be initialised, or a request fails, a fixed fallback
    sentence is returned instead, so a triggered alert always carries
    non-empty text.
    """

    # Returned when the model could not be created at construction time.
    _NO_MODEL_TEXT = "Driver alert! Please check your status."
    # Returned when a request fails or the model's reply is empty.
    _ERROR_TEXT = "Stay alert! Pull over."

    def __init__(self, config, api_key):
        """Configure the Gemini client with *api_key* and create the model.

        Any failure is reported and leaves ``self.model`` as ``None``.
        """
        super().__init__(config)
        try:
            genai.configure(api_key=api_key)
            self.model = genai.GenerativeModel("gemini-pro")
            print("Gemini alert system ready.")
        except Exception as e:
            print(f"Gemini init failed: {e}")
            self.model = None

    def _gen_text(self, level):
        """Return alert text for *level*, never an empty string.

        ``"Slightly Drowsy"`` gets a gentle-reminder prompt; every other
        level gets the urgent prompt.
        """
        if not self.model:
            return self._NO_MODEL_TEXT
        prompt = (
            "Generate a short, gentle reminder (under 10 words) for a slightly fatigued driver."
            if level == "Slightly Drowsy"
            else "Generate a short, urgent alert (under 8 words) for a very drowsy driver."
        )
        try:
            reply = self.model.generate_content(prompt).text
            # Drop asterisks first, then strip, so whitespace that was
            # sitting next to a removed asterisk does not survive.
            cleaned = reply.replace("*", "").strip()
        except Exception as e:
            print(f"Gemini error: {e}")
            return self._ERROR_TEXT
        # An empty reply would be a silent alert that still uses up the
        # cooldown in trigger_alert, so substitute the fallback sentence.
        return cleaned or self._ERROR_TEXT

    def trigger_alert(self, level="Very Drowsy"):
        """Return alert text if an alert may fire now, otherwise ``None``.

        An alert fires only when more than ``cooldown`` seconds have passed
        since the previous one and no alert is currently active.
        """
        now = time.time()
        if (now - self.last_alert_time) > self.cooldown and not self.alert_on:
            self.last_alert_time = now
            self.alert_on = True
            return self._gen_text(level)
        return None
# ──────────────────────────────────────────────────────────
def get_alerter(config, api_key=None):
    """Build the alerter selected by *config*.

    Returns a ``GeminiAlertSystem`` when ``alerting.gemini_alerts.enabled``
    is truthy and an *api_key* is supplied; otherwise a ``FileAlertSystem``.
    """
    alerting_cfg = config.get("alerting", {})
    gemini_cfg = alerting_cfg.get("gemini_alerts", {})
    use_gemini = gemini_cfg.get("enabled") and api_key
    if not use_gemini:
        return FileAlertSystem(config)
    return GeminiAlertSystem(config, api_key)