Testys commited on
Commit
fb982e6
·
verified ·
1 Parent(s): f6146b6

Update src/alerting/alert_system.py

Browse files
Files changed (1) hide show
  1. src/alerting/alert_system.py +55 -22
src/alerting/alert_system.py CHANGED
@@ -1,57 +1,90 @@
1
  # drive_paddy/alerting/alert_system.py
2
  import time, os, io, google.generativeai as genai
3
  from gtts import gTTS
 
4
  class BaseAlerter:
 
5
  def __init__(self, config):
6
- self.config = config['alerting']
7
- self.cooldown = self.config['alert_cooldown_seconds']
8
  self.last_alert_time = 0
9
  self.alert_on = False
10
- def trigger_alert(self, level="Very Drowsy"): raise NotImplementedError
 
 
 
 
 
 
 
 
11
  def reset_alert(self):
12
- if self.alert_on: print("Resetting Alert."); self.alert_on = False
 
 
 
 
 
 
13
 
14
  class FileAlertSystem(BaseAlerter):
15
  def __init__(self, config):
16
  super().__init__(config)
17
  self.audio_bytes = None
 
18
  try:
19
  if os.path.exists(config['alerting']['alert_sound_path']):
20
  with open(config['alerting']['alert_sound_path'], "rb") as f: self.audio_bytes = f.read()
21
  except Exception as e: print(f"Warning: Could not load audio file. Error: {e}.")
 
22
  def trigger_alert(self, level="Very Drowsy"):
23
  current_time = time.time()
24
  if (current_time - self.last_alert_time) > self.cooldown and not self.alert_on and self.audio_bytes:
25
- self.last_alert_time = current_time; self.alert_on = True
26
- print("Triggering Static Alert!")
27
- return self.audio_bytes
 
 
28
  return None
29
 
30
  class GeminiAlertSystem(BaseAlerter):
 
31
  def __init__(self, config, api_key):
32
  super().__init__(config)
33
- try: genai.configure(api_key=api_key); self.model = genai.GenerativeModel('gemini-pro')
34
- except Exception as e: print(f"Error initializing Gemini: {e}."); self.model = None
35
- def _generate_audio_data(self, level):
36
- if not self.model: return None
 
 
 
 
 
 
 
 
37
  if level == "Slightly Drowsy":
38
- prompt = "You are an AI driving assistant. Generate a short, gentle reminder (under 10 words) for a driver showing minor signs of fatigue."
39
  else: # Very Drowsy
40
- prompt = "You are an AI driving assistant. Generate a short, firm, and urgent alert (under 10 words) for a driver who is very drowsy."
41
  try:
42
  response = self.model.generate_content(prompt)
43
- alert_text = response.text.strip().replace('*', '')
44
- print(f"Generated Alert Text ({level}): '{alert_text}'")
45
- mp3_fp = io.BytesIO(); tts = gTTS(text=alert_text, lang='en'); tts.write_to_fp(mp3_fp)
46
- mp3_fp.seek(0); return mp3_fp.getvalue()
47
- except Exception as e: print(f"Error generating TTS audio: {e}"); return None
48
  def trigger_alert(self, level="Very Drowsy"):
 
49
  current_time = time.time()
50
- if (current_time - self.last_alert_time) > self.cooldown and not self.alert_on and self.model:
51
- self.last_alert_time = current_time; self.alert_on = True
52
- return self._generate_audio_data(level)
 
 
53
  return None
54
 
55
  def get_alerter(config, api_key=None):
56
- if config.get('gemini_api', {}).get('enabled', False) and api_key: return GeminiAlertSystem(config, api_key)
 
57
  return FileAlertSystem(config)
 
1
  # drive_paddy/alerting/alert_system.py
2
  import time, os, io, google.generativeai as genai
3
  from gtts import gTTS
4
+
5
  class BaseAlerter:
6
+ """Base class for alert systems, handling state and cooldowns."""
7
  def __init__(self, config):
8
+ self.config = config.get('alerting', {})
9
+ self.cooldown = self.config.get('alert_cooldown_seconds', 15) # Increased cooldown for dynamic alerts
10
  self.last_alert_time = 0
11
  self.alert_on = False
12
+
13
+ def trigger_alert(self, level="Very Drowsy"):
14
+ """
15
+ Checks if an alert can be triggered based on cooldown.
16
+ If so, marks the alert as active and returns the appropriate text.
17
+ Returns None if on cooldown.
18
+ """
19
+ raise NotImplementedError
20
+
21
  def reset_alert(self):
22
+ """
23
+ Resets the alert state. This is crucial to call after an alert has finished playing
24
+ to allow the next one to be triggered.
25
+ """
26
+ if self.alert_on:
27
+ print("Resetting Alert State.")
28
+ self.alert_on = Falsese
29
 
30
  class FileAlertSystem(BaseAlerter):
31
  def __init__(self, config):
32
  super().__init__(config)
33
  self.audio_bytes = None
34
+
35
  try:
36
  if os.path.exists(config['alerting']['alert_sound_path']):
37
  with open(config['alerting']['alert_sound_path'], "rb") as f: self.audio_bytes = f.read()
38
  except Exception as e: print(f"Warning: Could not load audio file. Error: {e}.")
39
+
40
  def trigger_alert(self, level="Very Drowsy"):
41
  current_time = time.time()
42
  if (current_time - self.last_alert_time) > self.cooldown and not self.alert_on and self.audio_bytes:
43
+ self.last_alert_time = current_time
44
+ self.alert_on = True
45
+ # For this simple system, we just return a flag to play the audio
46
+ # The main app will handle the audio data itself.
47
+ return "play_static_alert"
48
  return None
49
 
50
  class GeminiAlertSystem(BaseAlerter):
51
+ """Generates dynamic alert TEXT using Gemini."""
52
  def __init__(self, config, api_key):
53
  super().__init__(config)
54
+ self.model = None
55
+ try:
56
+ genai.configure(api_key=api_key)
57
+ self.model = genai.GenerativeModel('gemini-pro')
58
+ print("Successfully initialized GeminiAlertSystem.")
59
+ except Exception as e:
60
+ print(f"Error initializing Gemini: {e}.")
61
+
62
+ def get_alert_text(self, level):
63
+ """Generates just the text for an alert."""
64
+ if not self.model: return "Warning, driver is drowsy."
65
+
66
  if level == "Slightly Drowsy":
67
+ prompt = "Generate a short, gentle, and encouraging reminder (under 10 words) for a driver showing minor signs of fatigue. Example: 'Feeling a bit tired? Maybe take a quick break.'"
68
  else: # Very Drowsy
69
+ prompt = "Generate a short, firm, and urgent alert (under 8 words) for a driver who is very drowsy. Be direct and clear. Example: 'Danger! Pull over now!' or 'Wake up! Stay focused!'"
70
  try:
71
  response = self.model.generate_content(prompt)
72
+ return response.text.strip().replace('*', '')
73
+ except Exception as e:
74
+ print(f"Error generating text with Gemini: {e}")
75
+ return "Driver alert! Please check your status."
76
+
77
  def trigger_alert(self, level="Very Drowsy"):
78
+ """If not on cooldown, generates alert text and returns it."""
79
  current_time = time.time()
80
+ if (current_time - self.last_alert_time) > self.cooldown and not self.alert_on:
81
+ self.last_alert_time = current_time
82
+ self.alert_on = True
83
+ print(f"Alert state is ON for level: {level}. Generating text.")
84
+ return self.get_alert_text(level)
85
  return None
86
 
87
  def get_alerter(config, api_key=None):
88
+ if config.get('alerting', {}).get('gemini_alerts', {}).get('enabled', False) and api_key:
89
+ return GeminiAlertSystem(config, api_key)
90
  return FileAlertSystem(config)