import os
import requests
import torch
import streamlit as st
from langchain_huggingface import HuggingFaceEndpoint
from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langdetect import detect  # Ensure this package is installed
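# Runtime dependencies (pip package names for the imports above):
#   pip install streamlit requests torch langchain-huggingface langchain-core langdetect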
# ✅ Check for GPU or default to CPU
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"✅ Using device: {device}")  # Debugging info

# ✅ Environment variables
HF_TOKEN = os.getenv("HF_TOKEN")
if HF_TOKEN is None:
    raise ValueError("HF_TOKEN is not set. Please add it to your environment variables.")

NASA_API_KEY = os.getenv("NASA_API_KEY")
if NASA_API_KEY is None:
    raise ValueError("NASA_API_KEY is not set. Please add it to your environment variables.")
# ✅ Set up Streamlit
st.set_page_config(page_title="HAL - NASA ChatBot", page_icon="🚀")

# ✅ Initialize session state variables (ensures chat history persists)
if "chat_history" not in st.session_state:
    st.session_state.chat_history = [{"role": "assistant", "content": "Hello! I'm HAL, your NASA AI Assistant. You can speak to me directly or type your questions. How can I help you today?"}]
if "auto_speak" not in st.session_state:
    st.session_state.auto_speak = True
# ✅ Initialize Hugging Face model
# Note: HuggingFaceEndpoint calls the hosted Inference API, so the model runs
# remotely; it takes no device parameter, and the local `device` value above
# only matters if local pipelines are added later.
def get_llm_hf_inference(model_id="meta-llama/Llama-2-7b-chat-hf", max_new_tokens=800, temperature=0.3):
    return HuggingFaceEndpoint(
        repo_id=model_id,
        max_new_tokens=max_new_tokens,
        temperature=temperature,  # 🔥 Lowered temperature for more factual, structured responses
        huggingfacehub_api_token=HF_TOKEN,
        task="text-generation",
    )
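# Usage sketch: the endpoint is a LangChain runnable, so it can be invoked
# directly (assumes HF_TOKEN has access to the gated meta-llama repo):
#   llm = get_llm_hf_inference(max_new_tokens=64)
#   print(llm.invoke("Name one instrument aboard the James Webb Space Telescope."))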
# ✅ Ensure English responses
def ensure_english(text):
    try:
        detected_lang = detect(text)
        if detected_lang != "en":
            return "⚠️ Sorry, I only respond in English. Can you rephrase your question?"
    except Exception:
        return "⚠️ Language detection failed. Please ask your question again."
    return text
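# Note: langdetect returns ISO 639-1 codes (e.g. "en", "fr") and raises
# LangDetectException on empty or featureless text, which the except branch
# above converts into a retry prompt for the user.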
# ✅ Main response function (reduces repetition, keeps context)
def get_response(system_message, chat_history, user_text, max_new_tokens=800):
    # ✅ Include recent conversation history (last 5 messages) to prevent prompt overflow
    filtered_history = "\n".join(
        f"{msg['role'].capitalize()}: {msg['content']}"
        for msg in chat_history[-5:]
    )

    prompt = PromptTemplate.from_template(
        "[INST] {system_message} Answer concisely, avoid repetition, and structure responses well."
        "\n\nCONTEXT:\n{chat_history}\n"
        "\nLATEST USER INPUT:\nUser: {user_text}\n"
        "\n[END CONTEXT]\n"
        "Assistant:"
    )

    # ✅ Invoke the Hugging Face model
    hf = get_llm_hf_inference(max_new_tokens=max_new_tokens, temperature=0.3)  # 🔥 Lowered temperature
    chat = prompt | hf | StrOutputParser()
    response = chat.invoke(input=dict(system_message=system_message, user_text=user_text, chat_history=filtered_history))

    # Clean up the response - remove any "HAL:" prefix if present
    response = response.split("HAL:")[-1].strip() if "HAL:" in response else response.strip()
    response = ensure_english(response)
    if not response:
        response = "I'm sorry, but I couldn't generate a response. Can you rephrase your question?"

    # ✅ Update conversation history
    chat_history.append({'role': 'user', 'content': user_text})
    chat_history.append({'role': 'assistant', 'content': response})

    # ✅ Keep only the last 10 messages to curb repetition
    return response, chat_history[-10:]
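# The LCEL pipe above (prompt | llm | parser) renders the template, calls the
# endpoint, and coerces the model output to a plain string in one invoke().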
# ✅ NASA API function to fetch space data
def get_nasa_data(query):
    try:
        if "apod" in query.lower() or "picture of the day" in query.lower():
            response = requests.get(
                f"https://api.nasa.gov/planetary/apod?api_key={NASA_API_KEY}",
                timeout=10,  # avoid hanging the app on a slow API response
            )
            if response.status_code == 200:
                data = response.json()
                return {
                    "title": data.get("title", "NASA Image"),
                    "date": data.get("date", ""),
                    "explanation": data.get("explanation", ""),
                    "url": data.get("url", "")
                }
        return None
    except Exception as e:
        print(f"Error fetching NASA data: {e}")
        return None
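# The APOD endpoint also returns a "media_type" field ("image" or "video");
# on video days the url points at a video that st.image cannot render, so
# checking media_type before displaying would be a reasonable hardening step.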
# ✅ Streamlit UI
st.title("🚀 HAL - NASA AI Assistant")

# ✅ Add styles and speech-recognition JavaScript
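# Caveat: st.markdown injects but does not execute <script> tags; for the
# speech-recognition script below to actually run, it would need to be served
# through st.components.v1.html (which renders inside an iframe), so treat
# this block as a sketch of the intended browser-side behavior.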
st.markdown(""" | |
<style> | |
.user-msg, .assistant-msg { | |
padding: 11px; | |
border-radius: 10px; | |
margin-bottom: 5px; | |
width: fit-content; | |
max-width: 80%; | |
text-align: justify; | |
} | |
.user-msg { background-color: #696969; color: white; } | |
.assistant-msg { background-color: #333333; color: white; } | |
.container { display: flex; flex-direction: column; align-items: flex-start; } | |
.speak-button { | |
background-color: #2196F3; | |
border: none; | |
color: grey; | |
padding: 5px 10px; | |
text-align: center; | |
text-decoration: none; | |
display: inline-block; | |
font-size: 12px; | |
margin: 2px 2px; | |
cursor: pointer; | |
border-radius: 12px; | |
} | |
.voice-indicator { | |
display: inline-block; | |
width: 12px; | |
height: 12px; | |
border-radius: 50%; | |
margin-left: 8px; | |
vertical-align: middle; | |
background-color: #ccc; | |
} | |
.voice-indicator.active { | |
background-color: #4CAF50; | |
animation: pulse 1.5s infinite; | |
} | |
.status-bar { | |
padding: 6px 12px; | |
border-radius: 5px; | |
background-color: #f1f1f1; | |
display: flex; | |
align-items: center; | |
margin-bottom: 10px; | |
font-size: 14px; | |
} | |
@keyframes pulse { | |
0% { opacity: 1; } | |
50% { opacity: 0.5; } | |
100% { opacity: 1; } | |
} | |
@media (max-width: 600px) { .user-msg, .assistant-msg { font-size: 16px; max-width: 100%; } } | |
</style> | |
<script>
// Speech recognition setup with continuous mode
let recognition;
let isListening = false;
let silenceTimer;
let lastSpeechTime = Date.now();
let lastTranscript = '';
const SILENCE_THRESHOLD = 3000; // Submit after 3 seconds of silence
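// Design note: interim results stream in while the user is still talking, so
// the transcript is only submitted once SILENCE_THRESHOLD ms elapse with no
// new onresult event - a simple end-of-utterance heuristic.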
function setupSpeechRecognition() {
    try {
        window.SpeechRecognition = window.SpeechRecognition || window.webkitSpeechRecognition;
        recognition = new SpeechRecognition();
        recognition.lang = 'en-US';
        recognition.interimResults = true;
        recognition.continuous = true;

        recognition.onstart = function() {
            isListening = true;
            document.getElementById('voice-indicator').classList.add('active');
            document.getElementById('voice-status').textContent = 'Listening...';
        };

        recognition.onresult = function(event) {
            lastSpeechTime = Date.now();
            clearTimeout(silenceTimer);
            // Get the latest transcript
            let interimTranscript = '';
            let finalTranscript = '';
            for (let i = event.resultIndex; i < event.results.length; i++) {
                const transcript = event.results[i][0].transcript;
                if (event.results[i].isFinal) {
                    finalTranscript += transcript + ' ';
                } else {
                    interimTranscript += transcript;
                }
            }
            // Update the hidden input with the latest transcript
            const speechResult = (finalTranscript || interimTranscript).trim();
            if (speechResult && speechResult !== lastTranscript) {
                document.getElementById('speech-result').value = speechResult;
                document.getElementById('voice-status').textContent = 'I heard: ' + speechResult;
                lastTranscript = speechResult;
                // Set a timer to submit after silence
                silenceTimer = setTimeout(() => {
                    if (speechResult) {
                        document.getElementById('submit-speech').click();
                        lastTranscript = '';
                        document.getElementById('speech-result').value = '';
                    }
                }, SILENCE_THRESHOLD);
            }
        };

        recognition.onerror = function(event) {
            console.error('Speech recognition error:', event.error);
            if (event.error === 'no-speech') {
                // Just restart listening if no speech was detected
                restartRecognition();
            } else {
                isListening = false;
                document.getElementById('voice-indicator').classList.remove('active');
                document.getElementById('voice-status').textContent = 'Voice recognition paused. Click to restart.';
            }
        };

        recognition.onend = function() {
            // Auto-restart if recognition ends unintentionally
            if (isListening) {
                restartRecognition();
            } else {
                document.getElementById('voice-indicator').classList.remove('active');
                document.getElementById('voice-status').textContent = 'Voice recognition disabled.';
            }
        };
        return true;
    } catch (error) {
        console.error('Speech recognition not supported:', error);
        document.getElementById('voice-status').textContent = 'Voice recognition not supported in this browser.';
        return false;
    }
}
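// Note: the Web Speech API's SpeechRecognition is exposed webkit-prefixed and
// is unavailable in some browsers (notably Firefox); there the constructor is
// undefined and the catch branch above reports the unsupported state.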
function toggleVoiceRecognition() {
    if (!recognition) {
        if (!setupSpeechRecognition()) {
            return;
        }
    }
    if (isListening) {
        recognition.stop();
        isListening = false;
        document.getElementById('voice-indicator').classList.remove('active');
        document.getElementById('voice-status').textContent = 'Voice recognition paused. Click to restart.';
    } else {
        startRecognition();
    }
}

function startRecognition() {
    try {
        recognition.start();
        document.getElementById('voice-status').textContent = 'Listening...';
    } catch (e) {
        console.error('Error starting recognition:', e);
        setTimeout(startRecognition, 200);
    }
}

function restartRecognition() {
    try {
        recognition.stop();
    } catch (e) {}
    setTimeout(startRecognition, 200);
}
// Text-to-speech functionality
function speakText(text) {
    const utterance = new SpeechSynthesisUtterance(text);
    utterance.lang = 'en-US';
    utterance.pitch = 1;
    utterance.rate = 1;
    window.speechSynthesis.speak(utterance);
}
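// speechSynthesis.speak() queues utterances rather than interrupting; calling
// window.speechSynthesis.cancel() first would cut off any in-progress speech.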
// Auto-speak the latest response
function autoSpeakLatest() {
    const messages = document.querySelectorAll('.assistant-msg');
    if (messages.length > 0) {
        const latestMessage = messages[messages.length - 1];
        const messageId = latestMessage.querySelector('span[id^="msg-"]').id;
        speakText(document.getElementById(messageId).textContent);
    }
}
// Initialize after the page loads
document.addEventListener('DOMContentLoaded', function() {
    setupSpeechRecognition();
    // Start listening automatically
    setTimeout(startRecognition, 1000);
});
// Handle speech input submission (bound on DOMContentLoaded so the hidden
// #submit-speech button exists before the listener is attached)
document.addEventListener('DOMContentLoaded', function() {
    document.getElementById('submit-speech').addEventListener('click', function() {
        const speechResult = document.getElementById('speech-result').value;
        if (speechResult) {
            // Copy the transcript into the Streamlit text input
            const textInputs = document.querySelectorAll('input[type="text"]');
            if (textInputs.length > 0) {
                const lastInput = textInputs[0];
                lastInput.value = speechResult;
                lastInput.dispatchEvent(new Event('input', { bubbles: true }));
                // Find and click the submit button
                setTimeout(() => {
                    const buttons = document.querySelectorAll('button[kind="primaryForm"]');
                    for (const button of buttons) {
                        if (button.textContent.includes('Submit')) {
                            button.click();
                            break;
                        }
                    }
                }, 100);
            }
        }
    });
});
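// Caveat: this wiring leans on Streamlit's rendered DOM (generic text-input
// selectors, button "kind" attributes) and on a #speech-result element that
// the Python side must render; none of these are stable, documented hooks,
// so expect breakage across Streamlit versions.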
// Auto-speak the newest message if the toggle is enabled
function checkForNewMessages() {
    const toggle = document.querySelector('input[type="checkbox"][aria-label="Auto-speak responses"]');
    if (toggle && toggle.checked) {
        const messages = document.querySelectorAll('.assistant-msg');
        if (messages.length > 0) {
            const latestMessage = messages[messages.length - 1];
            const messageId = latestMessage.querySelector('span[id^="msg-"]').id;
            // Only speak if this is a new message
            if (!latestMessage.hasAttribute('data-spoken')) {
                speakText(document.getElementById(messageId).textContent);
                latestMessage.setAttribute('data-spoken', 'true');
            }
        }
    }
}
// Check for new messages every second
setInterval(checkForNewMessages, 1000);
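// The data-spoken attribute marks messages that have already been read aloud,
// so the 1-second polling loop does not re-speak existing chat history.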
</script>
""", unsafe_allow_html=True)
# Add voice status indicator
st.markdown("""
<div class="status-bar">
    <span id="voice-status">Initializing voice recognition...</span>
    <span id="voice-indicator" class="voice-indicator" onclick="toggleVoiceRecognition()"></span>
</div>
""", unsafe_allow_html=True)

# Regular text input
user_input = st.chat_input("Type your message here or just speak...")

# Hidden input for speech results
speech_result = st.text_input("Speech Result", key="speech_input", label_visibility="collapsed")

# Hidden button to submit speech
st.markdown('<button id="submit-speech" style="display:none;">Submit Speech</button>', unsafe_allow_html=True)

# Auto-speak toggle
st.checkbox("Auto-speak responses", value=st.session_state.auto_speak, key="auto_speak_toggle",
            on_change=lambda: setattr(st.session_state, "auto_speak", st.session_state.auto_speak_toggle))
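# The on_change lambda mirrors the widget's value into a stable session key
# (st.session_state.auto_speak) so the preference survives widget re-creation
# across reruns.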
# Display chat history
for i, msg in enumerate(st.session_state.chat_history):
    if msg["role"] == "user":
        st.markdown(f'<div class="container"><div class="user-msg">You: {msg["content"]}</div></div>', unsafe_allow_html=True)
    else:
        msg_id = f"msg-{i}"
        st.markdown(f'<div class="container"><div class="assistant-msg">HAL: <span id="{msg_id}">{msg["content"]}</span> <button class="speak-button" onclick="speakText(document.getElementById(\'{msg_id}\').textContent)">🔊</button></div></div>', unsafe_allow_html=True)
# Process user input
if user_input or speech_result:
    # Prioritize the speech result if available
    query = speech_result if speech_result else user_input

    # Guard against reprocessing the same speech transcript after the rerun
    # (the speech text_input keeps its value across reruns, unlike chat_input)
    if "last_query" not in st.session_state:
        st.session_state.last_query = ""
    if query and query != st.session_state.last_query:
        st.session_state.last_query = query

        # Get NASA data if applicable
        nasa_data = get_nasa_data(query)

        # Generate response
        system_message = "You are HAL, an AI assistant specialized in NASA and space knowledge. Provide concise, factual responses."
        response, st.session_state.chat_history = get_response(system_message, st.session_state.chat_history, query)

        # Display NASA image if available (note: anything drawn here is not
        # persisted once st.rerun() re-executes the script below)
        if nasa_data:
            st.image(nasa_data["url"], caption=f"{nasa_data['title']} - {nasa_data['date']}")
            st.write(nasa_data["explanation"])

        # Force a rerun so the updated chat history renders
        st.rerun()
# Add JavaScript to scroll the page to the bottom on new messages
st.markdown("""
<script>
// Scroll to the bottom of the page on load
window.onload = function() {
    window.scrollTo(0, document.body.scrollHeight);
}
</script>
""", unsafe_allow_html=True)