Spaces:
Sleeping
Sleeping
# drive_paddy/alerting/alert_system.py | |
import time, os, google.generativeai as genai | |
from gtts import gTTS | |
class BaseAlerter: | |
def __init__(self, config): | |
self.config = config.get("alerting", {}) | |
self.cooldown = self.config.get("alert_cooldown_seconds", 15) | |
self.last_alert_time = 0 | |
self.alert_on = False | |
def trigger_alert(self, level="Very Drowsy"): | |
raise NotImplementedError | |
def reset_alert(self): | |
if self.alert_on: | |
print("Resetting alert state.") | |
self.alert_on = False # β fixed typo | |
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
# STATIC FILE ALERTER | |
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
class FileAlertSystem(BaseAlerter): | |
def __init__(self, config): | |
super().__init__(config) | |
self.audio_bytes = None | |
try: | |
path = config["alerting"]["alert_sound_path"] | |
with open(path, "rb") as f: | |
self.audio_bytes = f.read() | |
print(f"[AlertSystem] loaded sound: {path}") | |
except Exception as e: | |
print(f"[AlertSystem] could not load sound: {e}") | |
def trigger_alert(self, level="Very Drowsy"): | |
now = time.time() | |
if ( | |
self.audio_bytes | |
and (now - self.last_alert_time) > self.cooldown | |
and not self.alert_on | |
): | |
self.last_alert_time = now | |
self.alert_on = True | |
return self.audio_bytes | |
return None | |
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
# GEMINI TTS TEXT ALERTER | |
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
class GeminiAlertSystem(BaseAlerter): | |
def __init__(self, config, api_key): | |
super().__init__(config) | |
try: | |
genai.configure(api_key=api_key) | |
self.model = genai.GenerativeModel("gemini-pro") | |
print("Gemini alert system ready.") | |
except Exception as e: | |
print(f"Gemini init failed: {e}") | |
self.model = None | |
def _gen_text(self, level): | |
if not self.model: | |
return "Driver alert! Please check your status." | |
prompt = ( | |
"Generate a short, gentle reminder (under 10 words) for a slightly fatigued driver." | |
if level == "Slightly Drowsy" | |
else "Generate a short, urgent alert (under 8 words) for a very drowsy driver." | |
) | |
try: | |
return self.model.generate_content(prompt).text.strip().replace("*", "") | |
except Exception as e: | |
print(f"Gemini error: {e}") | |
return "Stay alert! Pull over." | |
def trigger_alert(self, level="Very Drowsy"): | |
now = time.time() | |
if (now - self.last_alert_time) > self.cooldown and not self.alert_on: | |
self.last_alert_time = now | |
self.alert_on = True | |
return self._gen_text(level) | |
return None | |
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
def get_alerter(config, api_key=None): | |
gem_cfg = config.get("alerting", {}).get("gemini_alerts", {}) | |
if gem_cfg.get("enabled") and api_key: | |
return GeminiAlertSystem(config, api_key) | |
return FileAlertSystem(config) | |