File size: 4,100 Bytes
19f420a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
# drive_paddy/alerting/alert_system.py
import time
import os
import io
from gtts import gTTS
import google.generativeai as genai
from dotenv import load_dotenv

load_dotenv()  # Load environment variables from .env file

api_key = os.getenv("GEMINI_API_KEY")
class BaseAlerter:
    """Base class for alerter systems."""
    def __init__(self, config):
        self.config = config['alerting']
        self.cooldown = self.config['alert_cooldown_seconds']
        self.last_alert_time = 0
        self.alert_on = False

    def trigger_alert(self):
        raise NotImplementedError

    def reset_alert(self):
        if self.alert_on:
            print("Resetting Alert.")
            self.alert_on = False

class FileAlertSystem(BaseAlerter):
    """Loads a static audio file from disk into memory."""
    def __init__(self, config):
        super().__init__(config)
        self.sound_path = self.config['alert_sound_path']
        self.audio_bytes = None
        try:
            if os.path.exists(self.sound_path):
                with open(self.sound_path, "rb") as f:
                    self.audio_bytes = f.read()
            else:
                print(f"Warning: Alert sound file not found at '{self.sound_path}'.")
        except Exception as e:
            print(f"Warning: Could not load audio file. Error: {e}.")

    def trigger_alert(self):
        current_time = time.time()
        if (current_time - self.last_alert_time) > self.cooldown:
            if not self.alert_on and self.audio_bytes:
                print("Triggering Static Alert!")
                self.last_alert_time = current_time
                self.alert_on = True
                return self.audio_bytes # Return the audio data
        return None


class GeminiAlertSystem(BaseAlerter):
    """Generates dynamic audio data using Gemini and gTTS."""
    def __init__(self, config, api_key):
        super().__init__(config)
        try:
            genai.configure(api_key=api_key)
            self.model = genai.GenerativeModel('gemini-1.5-flash')  # Use the Gemini model
            print("Gemini Alert System initialized successfully.")
        except Exception as e:
            print(f"Error initializing Gemini: {e}.")
            self.model = None

    def _generate_audio_data(self):
        """Generates a unique alert message and returns it as audio bytes."""
        if not self.model:
            alert_text = "Stay alert!"
        else:
            prompt = "You are an AI driving assistant. Generate a short, friendly, but firm audio alert (under 10 words) for a driver showing signs of drowsiness."
            try:
                response = self.model.generate_content(prompt)
                alert_text = response.text.strip().replace('*', '')
            except Exception as e:
                print(f"Error generating alert text with Gemini: {e}")
                alert_text = "Wake up please!"
        
        print(f"Generated Alert Text: '{alert_text}'")
        try:
            # Generate TTS audio in memory
            mp3_fp = io.BytesIO()
            tts = gTTS(text=alert_text, lang='en')
            tts.write_to_fp(mp3_fp)
            mp3_fp.seek(0)
            return mp3_fp.getvalue()
        except Exception as e:
            print(f"Error generating TTS audio: {e}")
            return None

    def trigger_alert(self):
        current_time = time.time()
        if (current_time - self.last_alert_time) > self.cooldown:
            if not self.alert_on and self.model:
                self.last_alert_time = current_time
                self.alert_on = True
                return self._generate_audio_data() # Return the audio data
        return None


def get_alerter(config, api_key=None):
    """Factory to get the appropriate alerter based on config."""
    use_gemini = config.get('gemini_api', {}).get('enabled', False)
    
    if use_gemini and api_key:
        print("Initializing Gemini Alert System.")
        return GeminiAlertSystem(config, api_key)
    else:
        print("Initializing standard File Alert System.")
        return FileAlertSystem(config)