# NASA-AI-Voice / app.py
# CCockrum's picture
# Update app.py
# cffd56f verified
import html
import os
import re

import requests
import streamlit as st
import torch
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import PromptTemplate
from langchain_huggingface import HuggingFaceEndpoint
from langdetect import detect  # Ensure this package is installed
from transformers import pipeline
# ✅ Check for GPU or default to CPU
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"✅ Using device: {device}")  # Debugging info

# ✅ Environment variables
# Both secrets are required. Fail fast at startup with a clear message rather
# than later inside an API call; an empty value is treated the same as a
# missing one because it cannot authenticate either.
HF_TOKEN = os.getenv("HF_TOKEN")
if not HF_TOKEN:
    raise ValueError("HF_TOKEN is not set. Please add it to your environment variables.")

NASA_API_KEY = os.getenv("NASA_API_KEY")
if not NASA_API_KEY:
    raise ValueError("NASA_API_KEY is not set. Please add it to your environment variables.")
# ✅ Set up Streamlit
# page_icon has to be a single emoji (or an image path/URL), so the rocket is
# written as the real emoji character rather than a mis-encoded byte sequence.
st.set_page_config(page_title="HAL - NASA ChatBot", page_icon="🚀")

# ✅ Initialize session state variables (so the chat history persists across reruns)
if "chat_history" not in st.session_state:
    # Seed the conversation with HAL's greeting.
    st.session_state.chat_history = [
        {
            "role": "assistant",
            "content": "Hello! I'm HAL, your NASA AI Assistant. You can speak to me directly or type your questions. How can I help you today?",
        }
    ]
if "auto_speak" not in st.session_state:
    st.session_state.auto_speak = True
# ✅ Initialize Hugging Face model (remote inference endpoint)
def get_llm_hf_inference(model_id="meta-llama/Llama-2-7b-chat-hf", max_new_tokens=800, temperature=0.3):
    """Build a LangChain LLM backed by a Hugging Face inference endpoint.

    Args:
        model_id: Hugging Face repository id of the model to query.
        max_new_tokens: Upper bound on the number of generated tokens.
        temperature: Sampling temperature; the low default favours factual,
            structured answers.

    Returns:
        A configured ``HuggingFaceEndpoint`` instance.
    """
    # The token is passed through the endpoint's documented
    # ``huggingfacehub_api_token`` field. Undeclared keyword arguments are
    # treated as extra model parameters, so the secret must not be passed under
    # any other name. No ``device`` is passed: generation runs on the remote
    # endpoint, not on this machine.
    return HuggingFaceEndpoint(
        repo_id=model_id,
        max_new_tokens=max_new_tokens,
        temperature=temperature,
        huggingfacehub_api_token=HF_TOKEN,
        task="text-generation",
    )
# ✅ Ensure English responses
def ensure_english(text):
    """Return ``text`` unchanged if it is detected as English.

    Args:
        text: The string to check.

    Returns:
        ``text`` itself when the detected language is ``"en"``; otherwise a
        user-facing warning string (one for non-English text, another when
        language detection itself fails).
    """
    try:
        detected_lang = detect(text)
    except Exception:
        # Detection is best-effort; any detector failure becomes a friendly
        # message. ``Exception`` (not a bare except) lets KeyboardInterrupt
        # and SystemExit propagate.
        return "⚠️ Language detection failed. Please ask your question again."
    if detected_lang != "en":
        return "⚠️ Sorry, I only respond in English. Can you rephrase your question?"
    return text
# ✅ Main response function (fixing repetition & context)
def get_response(system_message, chat_history, user_text, max_new_tokens=800):
    """Generate the assistant's reply to ``user_text`` and record the exchange.

    Args:
        system_message: Persona/instruction text placed at the start of the
            prompt. When empty, a generic assistant persona is used.
        chat_history: List of ``{"role": ..., "content": ...}`` dicts. The new
            user and assistant messages are appended to this list in place.
        user_text: The latest user message.
        max_new_tokens: Generation limit forwarded to the model.

    Returns:
        A ``(response, history)`` tuple where ``history`` is the last 10
        messages of the updated ``chat_history``.
    """
    # Only the 5 most recent messages are sent as context to keep the prompt short.
    filtered_history = "\n".join(
        f"{msg['role'].capitalize()}: {msg['content']}"
        for msg in chat_history[-5:]
    )

    # Fall back to the generic persona when the caller supplies no system message.
    persona = system_message or "You are a highly knowledgeable AI assistant."

    prompt = PromptTemplate.from_template(
        "[INST] {system_message} Answer concisely, avoid repetition, and structure responses well."
        "\n\nCONTEXT:\n{chat_history}\n"
        "\nLATEST USER INPUT:\nUser: {user_text}\n"
        "\n[END CONTEXT]\n"
        "Assistant:"
    )

    # ✅ Invoke the Hugging Face model (low temperature for more factual answers)
    hf = get_llm_hf_inference(max_new_tokens=max_new_tokens, temperature=0.3)
    chat = prompt | hf.bind(skip_prompt=True) | StrOutputParser(output_key='content')
    response = chat.invoke(
        input=dict(system_message=persona, user_text=user_text, chat_history=filtered_history)
    )

    # Keep only the text after the last "HAL:" prefix, if the model echoed one.
    response = response.rsplit("HAL:", 1)[-1].strip()

    # Check for an empty reply before language detection: ensure_english turns
    # an empty string into a "detection failed" warning, which would hide the
    # more accurate fallback message below.
    if response:
        response = ensure_english(response)
    else:
        response = "I'm sorry, but I couldn't generate a response. Can you rephrase your question?"

    # ✅ Update conversation history
    chat_history.append({'role': 'user', 'content': user_text})
    chat_history.append({'role': 'assistant', 'content': response})

    # ✅ Return only the last 10 messages so the stored history stays bounded
    return response, chat_history[-10:]
# ✅ NASA API function to get space data
def get_nasa_data(query):
    """Fetch NASA's Astronomy Picture of the Day when ``query`` asks for it.

    Args:
        query: The user's message. Only messages containing "apod" or
            "picture of the day" (case-insensitive) trigger a request.

    Returns:
        A dict with ``title``, ``date``, ``explanation`` and ``url`` keys, or
        ``None`` when the query is not about APOD, the request fails, or the
        API does not answer with HTTP 200.
    """
    text = query.lower() if isinstance(query, str) else ""
    if "apod" not in text and "picture of the day" not in text:
        return None

    try:
        response = requests.get(
            "https://api.nasa.gov/planetary/apod",
            params={"api_key": NASA_API_KEY},
            timeout=10,  # never let a stalled request hang the app
        )
        if response.status_code != 200:
            return None
        data = response.json()
    except (requests.RequestException, ValueError) as e:
        # Log only the exception type: the message of a requests error can
        # contain the full request URL, which includes the API key.
        print(f"Error fetching NASA data: {type(e).__name__}")
        return None

    return {
        "title": data.get("title", "NASA Image"),
        "date": data.get("date", ""),
        "explanation": data.get("explanation", ""),
        "url": data.get("url", ""),
    }
# ✅ Streamlit UI
# The rocket is written as the real emoji so the title does not show
# mis-encoded characters.
st.title("🚀 HAL - NASA AI Assistant")
# ✅ Add styles and speech recognition JavaScript
# One HTML payload with two parts:
#   * <style>: chat bubble classes (.user-msg / .assistant-msg), the speak
#     button, the voice indicator dot (pulsing while .active) and the status bar.
#   * <script>: browser-side voice features:
#       - SpeechRecognition (or webkitSpeechRecognition) in continuous mode with
#         interim results; after SILENCE_THRESHOLD (3000 ms) without a new
#         transcript it clicks the element with id "submit-speech".
#       - speakText(): text-to-speech through window.speechSynthesis.
#       - checkForNewMessages(): runs every second and speaks the newest
#         .assistant-msg once (tracked with a data-spoken attribute) when the
#         "Auto-speak responses" checkbox is checked.
# NOTE(review): <script> tags passed to st.markdown may be inserted without
# being executed by the browser - confirm this code actually runs.
# NOTE(review): the script looks up #submit-speech, #speech-result,
# #voice-status and #voice-indicator; only some of these ids are created by
# this file, and all of them are rendered after this call - verify the lookups
# cannot hit null.
st.markdown("""
<style>
.user-msg, .assistant-msg {
padding: 11px;
border-radius: 10px;
margin-bottom: 5px;
width: fit-content;
max-width: 80%;
text-align: justify;
}
.user-msg { background-color: #696969; color: white; }
.assistant-msg { background-color: #333333; color: white; }
.container { display: flex; flex-direction: column; align-items: flex-start; }
.speak-button {
background-color: #2196F3;
border: none;
color: grey;
padding: 5px 10px;
text-align: center;
text-decoration: none;
display: inline-block;
font-size: 12px;
margin: 2px 2px;
cursor: pointer;
border-radius: 12px;
}
.voice-indicator {
display: inline-block;
width: 12px;
height: 12px;
border-radius: 50%;
margin-left: 8px;
vertical-align: middle;
background-color: #ccc;
}
.voice-indicator.active {
background-color: #4CAF50;
animation: pulse 1.5s infinite;
}
.status-bar {
padding: 6px 12px;
border-radius: 5px;
background-color: #f1f1f1;
display: flex;
align-items: center;
margin-bottom: 10px;
font-size: 14px;
}
@keyframes pulse {
0% { opacity: 1; }
50% { opacity: 0.5; }
100% { opacity: 1; }
}
@media (max-width: 600px) { .user-msg, .assistant-msg { font-size: 16px; max-width: 100%; } }
</style>
<script>
// Speech Recognition Setup with continuous mode
let recognition;
let isListening = false;
let silenceTimer;
let lastSpeechTime = Date.now();
let lastTranscript = '';
const SILENCE_THRESHOLD = 3000; // Submit after 3 seconds of silence
function setupSpeechRecognition() {
try {
window.SpeechRecognition = window.SpeechRecognition || window.webkitSpeechRecognition;
recognition = new SpeechRecognition();
recognition.lang = 'en-US';
recognition.interimResults = true;
recognition.continuous = true;
recognition.onstart = function() {
isListening = true;
document.getElementById('voice-indicator').classList.add('active');
document.getElementById('voice-status').textContent = 'Listening...';
};
recognition.onresult = function(event) {
lastSpeechTime = Date.now();
clearTimeout(silenceTimer);
// Get the latest transcript
let interimTranscript = '';
let finalTranscript = '';
for (let i = event.resultIndex; i < event.results.length; i++) {
const transcript = event.results[i][0].transcript;
if (event.results[i].isFinal) {
finalTranscript += transcript + ' ';
} else {
interimTranscript += transcript;
}
}
// Update the hidden input with the latest transcript
const speechResult = (finalTranscript || interimTranscript).trim();
if (speechResult && speechResult !== lastTranscript) {
document.getElementById('speech-result').value = speechResult;
document.getElementById('voice-status').textContent = 'I heard: ' + speechResult;
lastTranscript = speechResult;
// Set a timer to submit after silence
silenceTimer = setTimeout(() => {
if (speechResult) {
document.getElementById('submit-speech').click();
lastTranscript = '';
document.getElementById('speech-result').value = '';
}
}, SILENCE_THRESHOLD);
}
};
recognition.onerror = function(event) {
console.error('Speech recognition error:', event.error);
if (event.error === 'no-speech') {
// Just restart listening if there was no speech detected
restartRecognition();
} else {
isListening = false;
document.getElementById('voice-indicator').classList.remove('active');
document.getElementById('voice-status').textContent = 'Voice recognition paused. Click to restart.';
}
};
recognition.onend = function() {
// Auto restart if it ends unintentionally
if (isListening) {
restartRecognition();
} else {
document.getElementById('voice-indicator').classList.remove('active');
document.getElementById('voice-status').textContent = 'Voice recognition disabled.';
}
};
return true;
} catch (error) {
console.error('Speech recognition not supported:', error);
document.getElementById('voice-status').textContent = 'Voice recognition not supported in this browser.';
return false;
}
}
function toggleVoiceRecognition() {
if (!recognition) {
if (!setupSpeechRecognition()) {
return;
}
}
if (isListening) {
recognition.stop();
isListening = false;
document.getElementById('voice-indicator').classList.remove('active');
document.getElementById('voice-status').textContent = 'Voice recognition paused. Click to restart.';
} else {
startRecognition();
}
}
function startRecognition() {
try {
recognition.start();
document.getElementById('voice-status').textContent = 'Listening...';
} catch (e) {
console.error('Error starting recognition:', e);
setTimeout(startRecognition, 200);
}
}
function restartRecognition() {
try {
recognition.stop();
} catch (e) {}
setTimeout(startRecognition, 200);
}
// Text-to-Speech functionality
function speakText(text) {
const utterance = new SpeechSynthesisUtterance(text);
utterance.lang = 'en-US';
utterance.pitch = 1;
utterance.rate = 1;
window.speechSynthesis.speak(utterance);
}
// Auto speak the latest response
function autoSpeakLatest() {
const messages = document.querySelectorAll('.assistant-msg');
if (messages.length > 0) {
const latestMessage = messages[messages.length - 1];
const messageId = latestMessage.querySelector('span[id^="msg-"]').id;
speakText(document.getElementById(messageId).textContent);
}
}
// Initialize after the page loads
document.addEventListener('DOMContentLoaded', function() {
setupSpeechRecognition();
// Start listening automatically
setTimeout(startRecognition, 1000);
});
// Handle speech input submission
document.getElementById('submit-speech').addEventListener('click', function() {
const speechResult = document.getElementById('speech-result').value;
if (speechResult) {
// Update the Streamlit text input with the speech result
const textInputs = document.querySelectorAll('input[type="text"]');
if (textInputs.length > 0) {
const lastInput = textInputs[0];
lastInput.value = speechResult;
lastInput.dispatchEvent(new Event('input', { bubbles: true }));
// Find and click the submit button
setTimeout(() => {
const buttons = document.querySelectorAll('button[kind="primaryForm"]');
for (const button of buttons) {
if (button.textContent.includes('Submit')) {
button.click();
break;
}
}
}, 100);
}
}
});
// Auto-speak for newest message if enabled
function checkForNewMessages() {
const autoSpeakEnabled = document.querySelector('input[type="checkbox"][aria-label="Auto-speak responses"]').checked;
if (autoSpeakEnabled) {
const messages = document.querySelectorAll('.assistant-msg');
if (messages.length > 0) {
const latestMessage = messages[messages.length - 1];
const messageId = latestMessage.querySelector('span[id^="msg-"]').id;
// Only speak if this is a new message
if (!latestMessage.hasAttribute('data-spoken')) {
speakText(document.getElementById(messageId).textContent);
latestMessage.setAttribute('data-spoken', 'true');
}
}
}
}
// Check for new messages every second
setInterval(checkForNewMessages, 1000);
</script>
""", unsafe_allow_html=True)
# Add voice status indicator
st.markdown("""
<div class="status-bar">
<span id="voice-status">Initializing voice recognition...</span>
<span id="voice-indicator" class="voice-indicator" onclick="toggleVoiceRecognition()"></span>
</div>
""", unsafe_allow_html=True)

# Regular text input
user_input = st.chat_input("Type your message here or just speak...")


def _queue_speech_input():
    """Move a new speech transcript out of the widget so it is handled once."""
    # A text_input keeps its value across reruns. Without clearing it here,
    # the same transcript would be processed again after every st.rerun().
    # Widget state may be changed inside a callback, which runs before the
    # widget is instantiated in the next script run.
    st.session_state.pending_speech = st.session_state.speech_input
    st.session_state.speech_input = ""


# Hidden input for speech results
st.text_input(
    "Speech Result",
    key="speech_input",
    label_visibility="collapsed",
    on_change=_queue_speech_input,
)
# Consume the queued transcript (if any) exactly once.
speech_result = st.session_state.get("pending_speech", "")
st.session_state.pending_speech = ""

# Hidden button to submit speech
st.markdown('<button id="submit-speech" style="display:none;">Submit Speech</button>', unsafe_allow_html=True)

# Auto-speak toggle
st.checkbox(
    "Auto-speak responses",
    value=st.session_state.auto_speak,
    key="auto_speak_toggle",
    on_change=lambda: setattr(st.session_state, "auto_speak", st.session_state.auto_speak_toggle),
)

# Display chat history
for i, msg in enumerate(st.session_state.chat_history):
    # Messages come from the user and from the model, so they are untrusted:
    # escape them before embedding in raw HTML to prevent markup injection.
    safe_content = html.escape(msg["content"])
    if msg["role"] == "user":
        st.markdown(f'<div class="container"><div class="user-msg">You: {safe_content}</div></div>', unsafe_allow_html=True)
    else:
        msg_id = f"msg-{i}"
        st.markdown(f'<div class="container"><div class="assistant-msg">HAL: <span id="{msg_id}">{safe_content}</span> <button class="speak-button" onclick="speakText(document.getElementById(\'{msg_id}\').textContent)">🔊</button></div></div>', unsafe_allow_html=True)

# Display the NASA picture fetched for the most recent query. It is kept in
# session state because the input handler below ends with st.rerun(), which
# discards anything drawn during that same run.
latest_nasa_data = st.session_state.get("latest_nasa_data")
if latest_nasa_data:
    if latest_nasa_data["url"]:
        st.image(latest_nasa_data["url"], caption=f"{latest_nasa_data['title']} - {latest_nasa_data['date']}")
    st.write(latest_nasa_data["explanation"])

# Process user input
if user_input or speech_result:
    # Prioritize speech result if available
    query = speech_result if speech_result else user_input

    # Get NASA data if applicable (None clears any previously shown picture)
    st.session_state.latest_nasa_data = get_nasa_data(query)

    # Generate response
    system_message = "You are HAL, an AI assistant specialized in NASA and space knowledge. Provide concise, factual responses."
    response, st.session_state.chat_history = get_response(system_message, st.session_state.chat_history, query)

    # Force a rerun to update the chat display
    st.rerun()
# Inject a small script that scrolls the window to the bottom of the page when
# the load event fires, so the newest messages are in view.
_SCROLL_TO_BOTTOM_SNIPPET = """
<script>
// Scroll to bottom of page on load
window.onload = function() {
window.scrollTo(0, document.body.scrollHeight);
}
</script>
"""
st.markdown(_SCROLL_TO_BOTTOM_SNIPPET, unsafe_allow_html=True)