Spaces:
Runtime error
Runtime error
File size: 4,100 Bytes
19f420a |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 |
# drive_paddy/alerting/alert_system.py
import time
import os
import io
from gtts import gTTS
import google.generativeai as genai
from dotenv import load_dotenv
load_dotenv() # Load environment variables from .env file
api_key = os.getenv("GEMINI_API_KEY")
class BaseAlerter:
"""Base class for alerter systems."""
def __init__(self, config):
self.config = config['alerting']
self.cooldown = self.config['alert_cooldown_seconds']
self.last_alert_time = 0
self.alert_on = False
def trigger_alert(self):
raise NotImplementedError
def reset_alert(self):
if self.alert_on:
print("Resetting Alert.")
self.alert_on = False
class FileAlertSystem(BaseAlerter):
"""Loads a static audio file from disk into memory."""
def __init__(self, config):
super().__init__(config)
self.sound_path = self.config['alert_sound_path']
self.audio_bytes = None
try:
if os.path.exists(self.sound_path):
with open(self.sound_path, "rb") as f:
self.audio_bytes = f.read()
else:
print(f"Warning: Alert sound file not found at '{self.sound_path}'.")
except Exception as e:
print(f"Warning: Could not load audio file. Error: {e}.")
def trigger_alert(self):
current_time = time.time()
if (current_time - self.last_alert_time) > self.cooldown:
if not self.alert_on and self.audio_bytes:
print("Triggering Static Alert!")
self.last_alert_time = current_time
self.alert_on = True
return self.audio_bytes # Return the audio data
return None
class GeminiAlertSystem(BaseAlerter):
"""Generates dynamic audio data using Gemini and gTTS."""
def __init__(self, config, api_key):
super().__init__(config)
try:
genai.configure(api_key=api_key)
self.model = genai.GenerativeModel('gemini-1.5-flash') # Use the Gemini model
print("Gemini Alert System initialized successfully.")
except Exception as e:
print(f"Error initializing Gemini: {e}.")
self.model = None
def _generate_audio_data(self):
"""Generates a unique alert message and returns it as audio bytes."""
if not self.model:
alert_text = "Stay alert!"
else:
prompt = "You are an AI driving assistant. Generate a short, friendly, but firm audio alert (under 10 words) for a driver showing signs of drowsiness."
try:
response = self.model.generate_content(prompt)
alert_text = response.text.strip().replace('*', '')
except Exception as e:
print(f"Error generating alert text with Gemini: {e}")
alert_text = "Wake up please!"
print(f"Generated Alert Text: '{alert_text}'")
try:
# Generate TTS audio in memory
mp3_fp = io.BytesIO()
tts = gTTS(text=alert_text, lang='en')
tts.write_to_fp(mp3_fp)
mp3_fp.seek(0)
return mp3_fp.getvalue()
except Exception as e:
print(f"Error generating TTS audio: {e}")
return None
def trigger_alert(self):
current_time = time.time()
if (current_time - self.last_alert_time) > self.cooldown:
if not self.alert_on and self.model:
self.last_alert_time = current_time
self.alert_on = True
return self._generate_audio_data() # Return the audio data
return None
def get_alerter(config, api_key=None):
"""Factory to get the appropriate alerter based on config."""
use_gemini = config.get('gemini_api', {}).get('enabled', False)
if use_gemini and api_key:
print("Initializing Gemini Alert System.")
return GeminiAlertSystem(config, api_key)
else:
print("Initializing standard File Alert System.")
return FileAlertSystem(config) |