NASA-AI-Voice / app.py
CCockrum's picture
Update app.py
fdad8e8 verified
raw
history blame
13.5 kB
import os
import re
import requests
import torch
import streamlit as st
from langchain_huggingface import HuggingFaceEndpoint
from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser
from transformers import pipeline
from langdetect import detect # Ensure this package is installed
# βœ… Check for GPU or Default to CPU
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"βœ… Using device: {device}") # Debugging info
# βœ… Environment Variables
HF_TOKEN = os.getenv("HF_TOKEN")
if HF_TOKEN is None:
raise ValueError("HF_TOKEN is not set. Please add it to your environment variables.")
NASA_API_KEY = os.getenv("NASA_API_KEY")
if NASA_API_KEY is None:
raise ValueError("NASA_API_KEY is not set. Please add it to your environment variables.")
# βœ… Set Up Streamlit
st.set_page_config(page_title="HAL - NASA ChatBot", page_icon="πŸš€")
# βœ… Initialize Session State Variables (Ensuring Chat History Persists)
if "chat_history" not in st.session_state:
st.session_state.chat_history = [{"role": "assistant", "content": "Hello! I'm HAL, your NASA AI Assistant. You can speak to me directly or type your questions. How can I help you today?"}]
if "auto_speak" not in st.session_state:
st.session_state.auto_speak = True
# βœ… Initialize Hugging Face Model (Explicitly Set to CPU/GPU)
def get_llm_hf_inference(model_id="meta-llama/Llama-2-7b-chat-hf", max_new_tokens=800, temperature=0.3):
return HuggingFaceEndpoint(
repo_id=model_id,
max_new_tokens=max_new_tokens,
temperature=temperature, # πŸ”₯ Lowered temperature for more factual and structured responses
token=HF_TOKEN,
task="text-generation",
device=-1 if device == "cpu" else 0 # βœ… Force CPU (-1) or GPU (0)
)
# βœ… Ensure English Responses
def ensure_english(text):
try:
detected_lang = detect(text)
if detected_lang != "en":
return "⚠️ Sorry, I only respond in English. Can you rephrase your question?"
except:
return "⚠️ Language detection failed. Please ask your question again."
return text
# βœ… Main Response Function (Fixing Repetition & Context)
def get_response(system_message, chat_history, user_text, max_new_tokens=800):
# βœ… Ensure conversation history is included correctly
filtered_history = "\n".join(
f"{msg['role'].capitalize()}: {msg['content']}"
for msg in chat_history[-5:] # βœ… Only keep the last 5 exchanges to prevent overflow
)
prompt = PromptTemplate.from_template(
"[INST] You are a highly knowledgeable AI assistant. Answer concisely, avoid repetition, and structure responses well."
"\n\nCONTEXT:\n{chat_history}\n"
"\nLATEST USER INPUT:\nUser: {user_text}\n"
"\n[END CONTEXT]\n"
"Assistant:"
)
# βœ… Invoke Hugging Face Model
hf = get_llm_hf_inference(max_new_tokens=max_new_tokens, temperature=0.3) # πŸ”₯ Lowered temperature
chat = prompt | hf.bind(skip_prompt=True) | StrOutputParser(output_key='content')
response = chat.invoke(input=dict(system_message=system_message, user_text=user_text, chat_history=filtered_history))
# Clean up the response - remove any "HAL:" prefix if present
response = response.split("HAL:")[-1].strip() if "HAL:" in response else response.strip()
response = ensure_english(response)
if not response:
response = "I'm sorry, but I couldn't generate a response. Can you rephrase your question?"
# βœ… Update conversation history
chat_history.append({'role': 'user', 'content': user_text})
chat_history.append({'role': 'assistant', 'content': response})
# βœ… Keep only last 10 exchanges to prevent unnecessary repetition
return response, chat_history[-10:]
# βœ… Streamlit UI
st.title("πŸš€ HAL - NASA AI Assistant")
# βœ… Add styles and speech recognition JavaScript
st.markdown("""
<style>
.user-msg, .assistant-msg {
padding: 11px;
border-radius: 10px;
margin-bottom: 5px;
width: fit-content;
max-width: 80%;
text-align: justify;
}
.user-msg { background-color: #696969; color: white; }
.assistant-msg { background-color: #333333; color: white; }
.container { display: flex; flex-direction: column; align-items: flex-start; }
.speak-button {
background-color: #2196F3;
border: none;
color: white;
padding: 5px 11px;
text-align: center;
text-decoration: none;
display: inline-block;
font-size: 12px;
margin: 2px 2px;
cursor: pointer;
border-radius: 12px;
}
.voice-indicator {
display: inline-block;
width: 12px;
height: 12px;
border-radius: 50%;
margin-left: 8px;
vertical-align: middle;
background-color: #ccc;
}
.voice-indicator.active {
background-color: #4CAF50;
animation: pulse 1.5s infinite;
}
.status-bar {
padding: 6px 12px;
border-radius: 5px;
background-color: #f1f1f1;
display: flex;
align-items: center;
margin-bottom: 10px;
font-size: 14px;
}
@keyframes pulse {
0% { opacity: 1; }
50% { opacity: 0.5; }
100% { opacity: 1; }
}
@media (max-width: 600px) { .user-msg, .assistant-msg { font-size: 16px; max-width: 100%; } }
</style>
<script>
// Speech Recognition Setup with continuous mode
let recognition;
let isListening = false;
let silenceTimer;
let lastSpeechTime = Date.now();
let lastTranscript = '';
const SILENCE_THRESHOLD = 3000; // Submit after 3 seconds of silence
function setupSpeechRecognition() {
try {
window.SpeechRecognition = window.SpeechRecognition || window.webkitSpeechRecognition;
recognition = new SpeechRecognition();
recognition.lang = 'en-US';
recognition.interimResults = true;
recognition.continuous = true;
recognition.onstart = function() {
isListening = true;
document.getElementById('voice-indicator').classList.add('active');
document.getElementById('voice-status').textContent = 'Listening...';
};
recognition.onresult = function(event) {
lastSpeechTime = Date.now();
clearTimeout(silenceTimer);
// Get the latest transcript
let interimTranscript = '';
let finalTranscript = '';
for (let i = event.resultIndex; i < event.results.length; i++) {
const transcript = event.results[i][0].transcript;
if (event.results[i].isFinal) {
finalTranscript += transcript + ' ';
} else {
interimTranscript += transcript;
}
}
// Update the hidden input with the latest transcript
const speechResult = (finalTranscript || interimTranscript).trim();
if (speechResult && speechResult !== lastTranscript) {
document.getElementById('speech-result').value = speechResult;
document.getElementById('voice-status').textContent = 'I heard: ' + speechResult;
lastTranscript = speechResult;
// Set a timer to submit after silence
silenceTimer = setTimeout(() => {
if (speechResult) {
document.getElementById('submit-speech').click();
lastTranscript = '';
document.getElementById('speech-result').value = '';
}
}, SILENCE_THRESHOLD);
}
};
recognition.onerror = function(event) {
console.error('Speech recognition error:', event.error);
if (event.error === 'no-speech') {
// Just restart listening if there was no speech detected
restartRecognition();
} else {
isListening = false;
document.getElementById('voice-indicator').classList.remove('active');
document.getElementById('voice-status').textContent = 'Voice recognition paused. Click to restart.';
}
};
recognition.onend = function() {
// Auto restart if it ends unintentionally
if (isListening) {
restartRecognition();
} else {
document.getElementById('voice-indicator').classList.remove('active');
document.getElementById('voice-status').textContent = 'Voice recognition disabled.';
}
};
return true;
} catch (error) {
console.error('Speech recognition not supported:', error);
document.getElementById('voice-status').textContent = 'Voice recognition not supported in this browser.';
return false;
}
}
function toggleVoiceRecognition() {
if (!recognition) {
if (!setupSpeechRecognition()) {
return;
}
}
if (isListening) {
recognition.stop();
isListening = false;
document.getElementById('voice-indicator').classList.remove('active');
document.getElementById('voice-status').textContent = 'Voice recognition paused. Click to restart.';
} else {
startRecognition();
}
}
function startRecognition() {
try {
recognition.start();
document.getElementById('voice-status').textContent = 'Listening...';
} catch (e) {
console.error('Error starting recognition:', e);
setTimeout(startRecognition, 200);
}
}
function restartRecognition() {
try {
recognition.stop();
} catch (e) {}
setTimeout(startRecognition, 200);
}
// Text-to-Speech functionality
function speakText(text) {
const utterance = new SpeechSynthesisUtterance(text);
utterance.lang = 'en-US';
utterance.pitch = 1;
utterance.rate = 1;
window.speechSynthesis.speak(utterance);
}
// Auto speak the latest response
function autoSpeakLatest() {
const messages = document.querySelectorAll('.assistant-msg');
if (messages.length > 0) {
const latestMessage = messages[messages.length - 1];
const messageId = latestMessage.querySelector('span[id^="msg-"]').id;
speakText(document.getElementById(messageId).textContent);
}
}
// Initialize after the page loads
document.addEventListener('DOMContentLoaded', function() {
setupSpeechRecognition();
// Start listening automatically
setTimeout(startRecognition, 1000);
});
// Handle speech input submission
document.getElementById('submit-speech').addEventListener('click', function() {
const speechResult = document.getElementById('speech-result').value;
if (speechResult) {
// Update the Streamlit text input with the speech result
const textInputs = document.querySelectorAll('input[type="text"]');
if (textInputs.length > 0) {
const lastInput = textInputs[0];
lastInput.value = speechResult;
lastInput.dispatchEvent(new Event('input', { bubbles: true }));
// Find and click the submit button
setTimeout(() => {
const buttons = document.querySelectorAll('button[kind="primaryForm"]');
for (const button of buttons) {
if (button.textContent.includes('Submit')) {
button.click();
break;
}
}
}, 100);
}
}
});
// Auto-speak for newest message if enabled
function checkForNewMessages() {
const autoSpeakEnabled = document.querySelector('input[type="checkbox"][aria-label="Auto-speak responses"]').checked;
if (autoSpeakEnabled) {
const messages = document.querySelectorAll('.assistant-msg');
if (messages.length > 0) {
const latestMessage = messages[messages.length - 1];
const messageId = latestMessage.querySelector('span[id^="msg-"]').id;
// Only speak if this is a new message
if (!latestMessage.hasAttribute('data-spoken')) {
speakText(document.getElementById(messageId).textContent);
latestMessage.setAttribute('data-spoken', 'true');
}
}
}
}
// Check for new messages every second
setInterval(checkForNewMessages, 1000);
</script>
""", unsafe_allow_html=True)
# Add voice status indicator
st.markdown("""
<div class="status