# drive_paddy/alerting/alert_system.py
"""Alert back-ends for the drowsiness monitor.

Two alerters share one cooldown/latch policy (see ``BaseAlerter``):

* ``FileAlertSystem`` returns the bytes of a pre-recorded sound file.
* ``GeminiAlertSystem`` returns a short alert sentence generated by Gemini,
  falling back to canned text whenever the model is unavailable.

``get_alerter`` picks one of them from the configuration.
"""
import os  # noqa: F401  (unused here; kept because other code may rely on it)
import time
from typing import Optional

# The Gemini SDK and gTTS are only needed for the optional generated alerts.
# Importing them lazily-tolerantly keeps the file-based alerter usable on
# installations that do not ship these packages.
try:
    import google.generativeai as genai
except ImportError:
    genai = None

try:
    from gtts import gTTS  # noqa: F401  (not used in this module)
except ImportError:
    gTTS = None

# Canned messages used when no generated text is available.
_NO_MODEL_TEXT = "Driver alert! Please check your status."
_FALLBACK_TEXT = "Stay alert! Pull over."


class BaseAlerter:
    """Common state and cooldown policy for all alerters.

    Attributes:
        config: The ``"alerting"`` section of the application config
            (an empty dict when the section is missing or null).
        cooldown: Minimum number of seconds between two alerts, read from
            ``alert_cooldown_seconds`` (default 15).
        last_alert_time: ``time.time()`` value of the last alert, 0 if none.
        alert_on: True from the moment an alert fires until ``reset_alert``.
    """

    def __init__(self, config):
        # ``or {}`` also covers a key that is present but null, which
        # ``config.get("alerting", {})`` would pass through as None.
        self.config = config.get("alerting") or {}
        self.cooldown = self.config.get("alert_cooldown_seconds", 15)
        self.last_alert_time = 0
        self.alert_on = False

    def _try_arm(self) -> bool:
        """Claim the right to fire an alert now.

        Returns True and records the alert (timestamp + latch) when no alert
        is currently latched and more than ``cooldown`` seconds have passed
        since the previous one; otherwise returns False and changes nothing.
        """
        now = time.time()
        if self.alert_on or (now - self.last_alert_time) <= self.cooldown:
            return False
        self.last_alert_time = now
        self.alert_on = True
        return True

    def trigger_alert(self, level="Very Drowsy"):
        """Produce an alert payload for ``level``; subclasses must override."""
        raise NotImplementedError

    def reset_alert(self):
        """Release the latch so a later ``trigger_alert`` may fire again."""
        if self.alert_on:
            print("Resetting alert state.")
        self.alert_on = False


# ──────────────────────────────────────────────────────────
#  STATIC FILE ALERTER
# ──────────────────────────────────────────────────────────
class FileAlertSystem(BaseAlerter):
    """Alerter that serves the raw bytes of a configured sound file."""

    def __init__(self, config):
        """Load ``alerting.alert_sound_path`` into memory.

        Loading is best-effort: on a missing key, bad path or I/O error the
        problem is printed and ``audio_bytes`` stays None, which makes
        ``trigger_alert`` a no-op.
        """
        super().__init__(config)
        self.audio_bytes = None
        try:
            path = self.config["alert_sound_path"]
            with open(path, "rb") as f:
                self.audio_bytes = f.read()
            print(f"[AlertSystem] loaded sound: {path}")
        except (KeyError, TypeError, ValueError, OSError) as e:
            print(f"[AlertSystem] could not load sound: {e}")

    def trigger_alert(self, level="Very Drowsy") -> Optional[bytes]:
        """Return the sound bytes if an alert may fire now, else None.

        ``level`` is accepted for interface compatibility and is not used.
        No state is changed when there is no (or empty) audio loaded.
        """
        if not self.audio_bytes:
            return None
        if not self._try_arm():
            return None
        return self.audio_bytes


# ──────────────────────────────────────────────────────────
#  GEMINI TTS TEXT ALERTER
# ──────────────────────────────────────────────────────────
class GeminiAlertSystem(BaseAlerter):
    """Alerter that returns a short alert sentence generated by Gemini."""

    def __init__(self, config, api_key):
        """Configure the Gemini client.

        Any failure (including the SDK not being installed) is printed and
        leaves ``self.model`` as None, in which case canned text is used.
        """
        super().__init__(config)
        self.model = None
        if genai is None:
            print("Gemini init failed: google-generativeai is not installed")
            return
        try:
            genai.configure(api_key=api_key)
            self.model = genai.GenerativeModel("gemini-pro")
            print("Gemini alert system ready.")
        except Exception as e:  # SDK boundary: error types are not ours
            print(f"Gemini init failed: {e}")
            self.model = None

    def _gen_text(self, level) -> str:
        """Return alert text for ``level``; never returns an empty string."""
        if not self.model:
            return _NO_MODEL_TEXT

        if level == "Slightly Drowsy":
            prompt = (
                "Generate a short, gentle reminder (under 10 words) "
                "for a slightly fatigued driver."
            )
        else:
            prompt = (
                "Generate a short, urgent alert (under 8 words) "
                "for a very drowsy driver."
            )

        try:
            reply = self.model.generate_content(prompt).text
            # Drop markdown emphasis first, then trim: stripping before the
            # replace would leave the spaces that sat next to the asterisks.
            text = reply.replace("*", "").strip()
        except Exception as e:  # SDK boundary: network, quota, blocked reply
            print(f"Gemini error: {e}")
            return _FALLBACK_TEXT
        # An empty reply must not become an empty (silent) alert.
        return text or _FALLBACK_TEXT

    def trigger_alert(self, level="Very Drowsy") -> Optional[str]:
        """Return alert text if an alert may fire now, else None."""
        if not self._try_arm():
            return None
        return self._gen_text(level)


# ──────────────────────────────────────────────────────────
def get_alerter(config, api_key=None):
    """Build the alerter selected by ``config``.

    Returns a ``GeminiAlertSystem`` when ``alerting.gemini_alerts.enabled``
    is truthy and an ``api_key`` is given; otherwise a ``FileAlertSystem``.
    Missing or null config sections are treated as "not enabled".
    """
    alerting_cfg = config.get("alerting") or {}
    gem_cfg = alerting_cfg.get("gemini_alerts") or {}
    if gem_cfg.get("enabled") and api_key:
        return GeminiAlertSystem(config, api_key)
    return FileAlertSystem(config)