Testys commited on
Commit
2dba7bc
Β·
verified Β·
1 Parent(s): ba6a8ea

Update src/alerting/alert_system.py

Browse files
Files changed (1) hide show
  1. src/alerting/alert_system.py +44 -50
src/alerting/alert_system.py CHANGED
@@ -1,90 +1,84 @@
1
  # drive_paddy/alerting/alert_system.py
2
- import time, os, io, google.generativeai as genai
3
  from gtts import gTTS
4
 
5
  class BaseAlerter:
6
- """Base class for alert systems, handling state and cooldowns."""
7
  def __init__(self, config):
8
- self.config = config.get('alerting', {})
9
- self.cooldown = self.config.get('alert_cooldown_seconds', 15) # Increased cooldown for dynamic alerts
10
  self.last_alert_time = 0
11
  self.alert_on = False
12
 
13
  def trigger_alert(self, level="Very Drowsy"):
14
- """
15
- Checks if an alert can be triggered based on cooldown.
16
- If so, marks the alert as active and returns the appropriate text.
17
- Returns None if on cooldown.
18
- """
19
  raise NotImplementedError
20
 
21
  def reset_alert(self):
22
- """
23
- Resets the alert state. This is crucial to call after an alert has finished playing
24
- to allow the next one to be triggered.
25
- """
26
  if self.alert_on:
27
- print("Resetting Alert State.")
28
- self.alert_on = Falsese
29
 
 
 
 
30
  class FileAlertSystem(BaseAlerter):
31
  def __init__(self, config):
32
  super().__init__(config)
33
  self.audio_bytes = None
34
-
35
  try:
36
- if os.path.exists(config['alerting']['alert_sound_path']):
37
- with open(config['alerting']['alert_sound_path'], "rb") as f: self.audio_bytes = f.read()
38
- except Exception as e: print(f"Warning: Could not load audio file. Error: {e}.")
39
-
 
 
 
40
  def trigger_alert(self, level="Very Drowsy"):
41
- current_time = time.time()
42
- if (current_time - self.last_alert_time) > self.cooldown and not self.alert_on and self.audio_bytes:
43
- self.last_alert_time = current_time
44
  self.alert_on = True
45
- # For this simple system, we just return a flag to play the audio
46
- # The main app will handle the audio data itself.
47
- return "play_static_alert"
48
  return None
49
 
 
 
 
50
  class GeminiAlertSystem(BaseAlerter):
51
- """Generates dynamic alert TEXT using Gemini."""
52
  def __init__(self, config, api_key):
53
  super().__init__(config)
54
- self.model = None
55
  try:
56
  genai.configure(api_key=api_key)
57
- self.model = genai.GenerativeModel('gemini-pro')
58
- print("Successfully initialized GeminiAlertSystem.")
59
  except Exception as e:
60
- print(f"Error initializing Gemini: {e}.")
61
-
62
- def get_alert_text(self, level):
63
- """Generates just the text for an alert."""
64
- if not self.model: return "Warning, driver is drowsy."
65
 
66
- if level == "Slightly Drowsy":
67
- prompt = "Generate a short, gentle, and encouraging reminder (under 10 words) for a driver showing minor signs of fatigue. Example: 'Feeling a bit tired? Maybe take a quick break.'"
68
- else: # Very Drowsy
69
- prompt = "Generate a short, firm, and urgent alert (under 8 words) for a driver who is very drowsy. Be direct and clear. Example: 'Danger! Pull over now!' or 'Wake up! Stay focused!'"
 
 
 
 
70
  try:
71
- response = self.model.generate_content(prompt)
72
- return response.text.strip().replace('*', '')
73
  except Exception as e:
74
- print(f"Error generating text with Gemini: {e}")
75
- return "Driver alert! Please check your status."
76
 
77
  def trigger_alert(self, level="Very Drowsy"):
78
- """If not on cooldown, generates alert text and returns it."""
79
- current_time = time.time()
80
- if (current_time - self.last_alert_time) > self.cooldown and not self.alert_on:
81
- self.last_alert_time = current_time
82
  self.alert_on = True
83
- print(f"Alert state is ON for level: {level}. Generating text.")
84
- return self.get_alert_text(level)
85
  return None
86
 
 
87
  def get_alerter(config, api_key=None):
88
- if config.get('alerting', {}).get('gemini_alerts', {}).get('enabled', False) and api_key:
 
89
  return GeminiAlertSystem(config, api_key)
90
  return FileAlertSystem(config)
 
1
  # drive_paddy/alerting/alert_system.py
2
+ import time, os, google.generativeai as genai
3
  from gtts import gTTS
4
 
5
  class BaseAlerter:
 
6
  def __init__(self, config):
7
+ self.config = config.get("alerting", {})
8
+ self.cooldown = self.config.get("alert_cooldown_seconds", 15)
9
  self.last_alert_time = 0
10
  self.alert_on = False
11
 
12
  def trigger_alert(self, level="Very Drowsy"):
 
 
 
 
 
13
  raise NotImplementedError
14
 
15
  def reset_alert(self):
 
 
 
 
16
  if self.alert_on:
17
+ print("Resetting alert state.")
18
+ self.alert_on = False # ← fixed typo
19
 
20
+ # ──────────────────────────────────────────────────────────
21
+ # STATIC FILE ALERTER
22
+ # ──────────────────────────────────────────────────────────
23
  class FileAlertSystem(BaseAlerter):
24
  def __init__(self, config):
25
  super().__init__(config)
26
  self.audio_bytes = None
 
27
  try:
28
+ p = config["alerting"]["alert_sound_path"]
29
+ with open(p, "rb") as f:
30
+ self.audio_bytes = f.read()
31
+ print("Static alert sound loaded.")
32
+ except Exception as e:
33
+ print(f"Could not load alert sound: {e}")
34
+
35
  def trigger_alert(self, level="Very Drowsy"):
36
+ now = time.time()
37
+ if (now - self.last_alert_time) > self.cooldown and not self.alert_on and self.audio_bytes:
38
+ self.last_alert_time = now
39
  self.alert_on = True
40
+ return self.audio_bytes # return raw bytes
 
 
41
  return None
42
 
43
+ # ──────────────────────────────────────────────────────────
44
+ # GEMINI TTS TEXT ALERTER
45
+ # ──────────────────────────────────────────────────────────
46
  class GeminiAlertSystem(BaseAlerter):
 
47
  def __init__(self, config, api_key):
48
  super().__init__(config)
 
49
  try:
50
  genai.configure(api_key=api_key)
51
+ self.model = genai.GenerativeModel("gemini-pro")
52
+ print("Gemini alert system ready.")
53
  except Exception as e:
54
+ print(f"Gemini init failed: {e}")
55
+ self.model = None
 
 
 
56
 
57
+ def _gen_text(self, level):
58
+ if not self.model:
59
+ return "Driver alert! Please check your status."
60
+ prompt = (
61
+ "Generate a short, gentle reminder (under 10 words) for a slightly fatigued driver."
62
+ if level == "Slightly Drowsy"
63
+ else "Generate a short, urgent alert (under 8 words) for a very drowsy driver."
64
+ )
65
  try:
66
+ return self.model.generate_content(prompt).text.strip().replace("*", "")
 
67
  except Exception as e:
68
+ print(f"Gemini error: {e}")
69
+ return "Stay alert! Pull over."
70
 
71
  def trigger_alert(self, level="Very Drowsy"):
72
+ now = time.time()
73
+ if (now - self.last_alert_time) > self.cooldown and not self.alert_on:
74
+ self.last_alert_time = now
 
75
  self.alert_on = True
76
+ return self._gen_text(level)
 
77
  return None
78
 
79
+ # ──────────────────────────────────────────────────────────
80
  def get_alerter(config, api_key=None):
81
+ gem_cfg = config.get("alerting", {}).get("gemini_alerts", {})
82
+ if gem_cfg.get("enabled") and api_key:
83
  return GeminiAlertSystem(config, api_key)
84
  return FileAlertSystem(config)