# drive_paddy/alerting/alert_system.py import time, os, io, google.generativeai as genai from gtts import gTTS class BaseAlerter: """Base class for alert systems, handling state and cooldowns.""" def __init__(self, config): self.config = config.get('alerting', {}) self.cooldown = self.config.get('alert_cooldown_seconds', 15) # Increased cooldown for dynamic alerts self.last_alert_time = 0 self.alert_on = False def trigger_alert(self, level="Very Drowsy"): """ Checks if an alert can be triggered based on cooldown. If so, marks the alert as active and returns the appropriate text. Returns None if on cooldown. """ raise NotImplementedError def reset_alert(self): """ Resets the alert state. This is crucial to call after an alert has finished playing to allow the next one to be triggered. """ if self.alert_on: print("Resetting Alert State.") self.alert_on = Falsese class FileAlertSystem(BaseAlerter): def __init__(self, config): super().__init__(config) self.audio_bytes = None try: if os.path.exists(config['alerting']['alert_sound_path']): with open(config['alerting']['alert_sound_path'], "rb") as f: self.audio_bytes = f.read() except Exception as e: print(f"Warning: Could not load audio file. Error: {e}.") def trigger_alert(self, level="Very Drowsy"): current_time = time.time() if (current_time - self.last_alert_time) > self.cooldown and not self.alert_on and self.audio_bytes: self.last_alert_time = current_time self.alert_on = True # For this simple system, we just return a flag to play the audio # The main app will handle the audio data itself. return "play_static_alert" return None class GeminiAlertSystem(BaseAlerter): """Generates dynamic alert TEXT using Gemini.""" def __init__(self, config, api_key): super().__init__(config) self.model = None try: genai.configure(api_key=api_key) self.model = genai.GenerativeModel('gemini-pro') print("Successfully initialized GeminiAlertSystem.") except Exception as e: print(f"Error initializing Gemini: {e}.") def get_alert_text(self, level): """Generates just the text for an alert.""" if not self.model: return "Warning, driver is drowsy." if level == "Slightly Drowsy": prompt = "Generate a short, gentle, and encouraging reminder (under 10 words) for a driver showing minor signs of fatigue. Example: 'Feeling a bit tired? Maybe take a quick break.'" else: # Very Drowsy prompt = "Generate a short, firm, and urgent alert (under 8 words) for a driver who is very drowsy. Be direct and clear. Example: 'Danger! Pull over now!' or 'Wake up! Stay focused!'" try: response = self.model.generate_content(prompt) return response.text.strip().replace('*', '') except Exception as e: print(f"Error generating text with Gemini: {e}") return "Driver alert! Please check your status." def trigger_alert(self, level="Very Drowsy"): """If not on cooldown, generates alert text and returns it.""" current_time = time.time() if (current_time - self.last_alert_time) > self.cooldown and not self.alert_on: self.last_alert_time = current_time self.alert_on = True print(f"Alert state is ON for level: {level}. Generating text.") return self.get_alert_text(level) return None def get_alerter(config, api_key=None): if config.get('alerting', {}).get('gemini_alerts', {}).get('enabled', False) and api_key: return GeminiAlertSystem(config, api_key) return FileAlertSystem(config)