import os
import requests
import torch
import streamlit as st
from langchain_huggingface import HuggingFaceEndpoint
from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langdetect import detect  # Ensure this package is installed

# ✅ Detect GPU (informational only: HuggingFaceEndpoint runs generation remotely)
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"✅ Using device: {device}")  # Debugging info
# ✅ Environment Variables
HF_TOKEN = os.getenv("HF_TOKEN")
if HF_TOKEN is None:
    raise ValueError("HF_TOKEN is not set. Please add it to your environment variables.")
NASA_API_KEY = os.getenv("NASA_API_KEY")
if NASA_API_KEY is None:
    raise ValueError("NASA_API_KEY is not set. Please add it to your environment variables.")
# ✅ Set Up Streamlit
st.set_page_config(page_title="HAL - NASA ChatBot", page_icon="🚀")

# ✅ Initialize Session State Variables (Ensuring Chat History Persists)
if "chat_history" not in st.session_state:
    st.session_state.chat_history = [{"role": "assistant", "content": "Hello! I'm HAL, your NASA AI Assistant. You can speak to me directly or type your questions. How can I help you today?"}]
if "auto_speak" not in st.session_state:
    st.session_state.auto_speak = True
# ✅ Initialize Hugging Face Model (hosted Inference API; a local CPU/GPU flag does not apply here)
def get_llm_hf_inference(model_id="meta-llama/Llama-2-7b-chat-hf", max_new_tokens=800, temperature=0.3):
    return HuggingFaceEndpoint(
        repo_id=model_id,
        max_new_tokens=max_new_tokens,
        temperature=temperature,  # 🔥 Lowered temperature for more factual and structured responses
        huggingfacehub_api_token=HF_TOKEN,
        task="text-generation",
    )
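# Usage sketch (untested here; assumes HF_TOKEN can access the gated
# meta-llama/Llama-2-7b-chat-hf repo on the hosted Inference API):
#   llm = get_llm_hf_inference(max_new_tokens=64)
#   print(llm.invoke("Name the first person to walk on the Moon."))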
# ✅ Ensure English Responses
def ensure_english(text):
    try:
        detected_lang = detect(text)
        if detected_lang != "en":
            return "⚠️ Sorry, I only respond in English. Can you rephrase your question?"
    except Exception:  # langdetect raises on empty or ambiguous input
        return "⚠️ Language detection failed. Please ask your question again."
    return text
# ✅ Main Response Function (keeps context, avoids repetition)
def get_response(system_message, chat_history, user_text, max_new_tokens=800):
    # ✅ Include recent conversation history in the prompt
    filtered_history = "\n".join(
        f"{msg['role'].capitalize()}: {msg['content']}"
        for msg in chat_history[-5:]  # ✅ Only keep the last 5 messages to prevent prompt overflow
    )

    prompt = PromptTemplate.from_template(
        "[INST] {system_message}"
        " Answer concisely, avoid repetition, and structure responses well."
        "\n\nCONTEXT:\n{chat_history}\n"
        "\nLATEST USER INPUT:\nUser: {user_text}\n"
        "\n[END CONTEXT]\n"
        "Assistant:"
    )

    # ✅ Invoke Hugging Face Model
    hf = get_llm_hf_inference(max_new_tokens=max_new_tokens, temperature=0.3)  # 🔥 Lowered temperature
    chat = prompt | hf | StrOutputParser()
    response = chat.invoke(input=dict(system_message=system_message, user_text=user_text, chat_history=filtered_history))

    # Clean up the response - remove any "HAL:" prefix if present
    response = response.split("HAL:")[-1].strip() if "HAL:" in response else response.strip()
    response = ensure_english(response)
    if not response:
        response = "I'm sorry, but I couldn't generate a response. Can you rephrase your question?"

    # ✅ Update conversation history
    chat_history.append({'role': 'user', 'content': user_text})
    chat_history.append({'role': 'assistant', 'content': response})
    # ✅ Keep only the last 10 messages (5 exchanges) to bound the context
    return response, chat_history[-10:]
# ✅ NASA API Function to get space data
def get_nasa_data(query):
    try:
        if "apod" in query.lower() or "picture of the day" in query.lower():
            response = requests.get(f"https://api.nasa.gov/planetary/apod?api_key={NASA_API_KEY}", timeout=10)
            if response.status_code == 200:
                data = response.json()
                return {
                    "title": data.get("title", "NASA Image"),
                    "date": data.get("date", ""),
                    "explanation": data.get("explanation", ""),
                    "url": data.get("url", "")
                }
        return None
    except Exception as e:
        print(f"Error fetching NASA data: {e}")
        return None
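# Usage sketch: only APOD-style queries return data; anything else yields None.
#   data = get_nasa_data("show me the picture of the day")
#   if data:
#       print(data["title"], data["url"])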
# ✅ Streamlit UI
st.title("🚀 HAL - NASA AI Assistant")

# ✅ Add styles and speech recognition JavaScript
st.markdown("""
| <style> | |
| .user-msg, .assistant-msg { | |
| padding: 11px; | |
| border-radius: 10px; | |
| margin-bottom: 5px; | |
| width: fit-content; | |
| max-width: 80%; | |
| text-align: justify; | |
| } | |
| .user-msg { background-color: #696969; color: white; } | |
| .assistant-msg { background-color: #333333; color: white; } | |
| .container { display: flex; flex-direction: column; align-items: flex-start; } | |
| .speak-button { | |
| background-color: #2196F3; | |
| border: none; | |
| color: white; | |
| padding: 5px 10px; | |
| text-align: center; | |
| text-decoration: none; | |
| display: inline-block; | |
| font-size: 12px; | |
| margin: 2px 2px; | |
| cursor: pointer; | |
| border-radius: 12px; | |
| } | |
| .voice-indicator { | |
| display: inline-block; | |
| width: 12px; | |
| height: 12px; | |
| border-radius: 50%; | |
| margin-left: 8px; | |
| vertical-align: middle; | |
| background-color: #ccc; | |
| } | |
| .voice-indicator.active { | |
| background-color: #4CAF50; | |
| animation: pulse 1.5s infinite; | |
| } | |
| .status-bar { | |
| padding: 6px 12px; | |
| border-radius: 5px; | |
| background-color: #f1f1f1; | |
| display: flex; | |
| align-items: center; | |
| margin-bottom: 10px; | |
| font-size: 14px; | |
| } | |
| @keyframes pulse { | |
| 0% { opacity: 1; } | |
| 50% { opacity: 0.5; } | |
| 100% { opacity: 1; } | |
| } | |
| @media (max-width: 600px) { .user-msg, .assistant-msg { font-size: 16px; max-width: 100%; } } | |
| </style> | |
| <script> | |
| // Speech Recognition Setup with continuous mode | |
| let recognition; | |
| let isListening = false; | |
| let silenceTimer; | |
| let lastSpeechTime = Date.now(); | |
| let lastTranscript = ''; | |
| const SILENCE_THRESHOLD = 3000; // Submit after 3 seconds of silence | |
| function setupSpeechRecognition() { | |
| try { | |
| window.SpeechRecognition = window.SpeechRecognition || window.webkitSpeechRecognition; | |
| recognition = new SpeechRecognition(); | |
| recognition.lang = 'en-US'; | |
| recognition.interimResults = true; | |
| recognition.continuous = true; | |
| recognition.onstart = function() { | |
| isListening = true; | |
| document.getElementById('voice-indicator').classList.add('active'); | |
| document.getElementById('voice-status').textContent = 'Listening...'; | |
| }; | |
| recognition.onresult = function(event) { | |
| lastSpeechTime = Date.now(); | |
| clearTimeout(silenceTimer); | |
| // Get the latest transcript | |
| let interimTranscript = ''; | |
| let finalTranscript = ''; | |
| for (let i = event.resultIndex; i < event.results.length; i++) { | |
| const transcript = event.results[i][0].transcript; | |
| if (event.results[i].isFinal) { | |
| finalTranscript += transcript + ' '; | |
| } else { | |
| interimTranscript += transcript; | |
| } | |
| } | |
| // Update the hidden input with the latest transcript | |
| const speechResult = (finalTranscript || interimTranscript).trim(); | |
| if (speechResult && speechResult !== lastTranscript) { | |
| document.getElementById('speech-result').value = speechResult; | |
| document.getElementById('voice-status').textContent = 'I heard: ' + speechResult; | |
| lastTranscript = speechResult; | |
| // Set a timer to submit after silence | |
| silenceTimer = setTimeout(() => { | |
| if (speechResult) { | |
| document.getElementById('submit-speech').click(); | |
| lastTranscript = ''; | |
| document.getElementById('speech-result').value = ''; | |
| } | |
| }, SILENCE_THRESHOLD); | |
| } | |
| }; | |
| recognition.onerror = function(event) { | |
| console.error('Speech recognition error:', event.error); | |
| if (event.error === 'no-speech') { | |
| // Just restart listening if there was no speech detected | |
| restartRecognition(); | |
| } else { | |
| isListening = false; | |
| document.getElementById('voice-indicator').classList.remove('active'); | |
| document.getElementById('voice-status').textContent = 'Voice recognition paused. Click to restart.'; | |
| } | |
| }; | |
| recognition.onend = function() { | |
| // Auto restart if it ends unintentionally | |
| if (isListening) { | |
| restartRecognition(); | |
| } else { | |
| document.getElementById('voice-indicator').classList.remove('active'); | |
| document.getElementById('voice-status').textContent = 'Voice recognition disabled.'; | |
| } | |
| }; | |
| return true; | |
| } catch (error) { | |
| console.error('Speech recognition not supported:', error); | |
| document.getElementById('voice-status').textContent = 'Voice recognition not supported in this browser.'; | |
| return false; | |
| } | |
| } | |
| function toggleVoiceRecognition() { | |
| if (!recognition) { | |
| if (!setupSpeechRecognition()) { | |
| return; | |
| } | |
| } | |
| if (isListening) { | |
| recognition.stop(); | |
| isListening = false; | |
| document.getElementById('voice-indicator').classList.remove('active'); | |
| document.getElementById('voice-status').textContent = 'Voice recognition paused. Click to restart.'; | |
| } else { | |
| startRecognition(); | |
| } | |
| } | |
| function startRecognition() { | |
| try { | |
| recognition.start(); | |
| document.getElementById('voice-status').textContent = 'Listening...'; | |
| } catch (e) { | |
| console.error('Error starting recognition:', e); | |
| setTimeout(startRecognition, 200); | |
| } | |
| } | |
| function restartRecognition() { | |
| try { | |
| recognition.stop(); | |
| } catch (e) {} | |
| setTimeout(startRecognition, 200); | |
| } | |
| // Text-to-Speech functionality | |
| function speakText(text) { | |
| const utterance = new SpeechSynthesisUtterance(text); | |
| utterance.lang = 'en-US'; | |
| utterance.pitch = 1; | |
| utterance.rate = 1; | |
| window.speechSynthesis.speak(utterance); | |
| } | |
| // Auto speak the latest response | |
| function autoSpeakLatest() { | |
| const messages = document.querySelectorAll('.assistant-msg'); | |
| if (messages.length > 0) { | |
| const latestMessage = messages[messages.length - 1]; | |
| const messageId = latestMessage.querySelector('span[id^="msg-"]').id; | |
| speakText(document.getElementById(messageId).textContent); | |
| } | |
| } | |
| // Initialize after the page loads | |
| document.addEventListener('DOMContentLoaded', function() { | |
| setupSpeechRecognition(); | |
| // Start listening automatically | |
| setTimeout(startRecognition, 1000); | |
| }); | |
| // Handle speech input submission | |
// Handle speech input submission (delegated, because #submit-speech is rendered
// by a later st.markdown call and may not exist when this script is parsed)
document.addEventListener('click', function(e) {
    if (!e.target || e.target.id !== 'submit-speech') return;
    const speechResult = document.getElementById('speech-result').value;
    if (speechResult) {
        // Update the Streamlit text input with the speech result
        const textInputs = document.querySelectorAll('input[type="text"]');
        if (textInputs.length > 0) {
            const targetInput = textInputs[0];
            targetInput.value = speechResult;
            targetInput.dispatchEvent(new Event('input', { bubbles: true }));
            // Find and click the submit button
            setTimeout(() => {
                const buttons = document.querySelectorAll('button[kind="primaryForm"]');
                for (const button of buttons) {
                    if (button.textContent.includes('Submit')) {
                        button.click();
                        break;
                    }
                }
            }, 100);
        }
    }
});
// Auto-speak for newest message if enabled
function checkForNewMessages() {
    const autoSpeakToggle = document.querySelector('input[type="checkbox"][aria-label="Auto-speak responses"]');
    if (autoSpeakToggle && autoSpeakToggle.checked) {
        const messages = document.querySelectorAll('.assistant-msg');
        if (messages.length > 0) {
            const latestMessage = messages[messages.length - 1];
            const messageId = latestMessage.querySelector('span[id^="msg-"]').id;
            // Only speak if this is a new message
            if (!latestMessage.hasAttribute('data-spoken')) {
                speakText(document.getElementById(messageId).textContent);
                latestMessage.setAttribute('data-spoken', 'true');
            }
        }
    }
}
// Check for new messages every second
setInterval(checkForNewMessages, 1000);
</script>
""", unsafe_allow_html=True)
# Add voice status indicator
st.markdown("""
<div class="status-bar">
    <span id="voice-status">Initializing voice recognition...</span>
    <span id="voice-indicator" class="voice-indicator" onclick="toggleVoiceRecognition()"></span>
</div>
""", unsafe_allow_html=True)
# Regular text input
user_input = st.chat_input("Type your message here or just speak...")

# Label-collapsed input the speech-recognition JS writes into
speech_result = st.text_input("Speech Result", key="speech_input", label_visibility="collapsed")

# Hidden button the JS clicks to submit speech
st.markdown('<button id="submit-speech" style="display:none;">Submit Speech</button>', unsafe_allow_html=True)

# Auto-speak toggle
st.checkbox("Auto-speak responses", value=st.session_state.auto_speak, key="auto_speak_toggle",
            on_change=lambda: setattr(st.session_state, "auto_speak", st.session_state.auto_speak_toggle))
# Display chat history
for i, msg in enumerate(st.session_state.chat_history):
    if msg["role"] == "user":
        st.markdown(f'<div class="container"><div class="user-msg">You: {msg["content"]}</div></div>', unsafe_allow_html=True)
    else:
        msg_id = f"msg-{i}"
        st.markdown(f'<div class="container"><div class="assistant-msg">HAL: <span id="{msg_id}">{msg["content"]}</span> <button class="speak-button" onclick="speakText(document.getElementById(\'{msg_id}\').textContent)">🔊</button></div></div>', unsafe_allow_html=True)
# Process user input
if user_input or speech_result:
    # Prioritize speech result if available
    query = speech_result if speech_result else user_input
    # Get NASA data if applicable; stash it so it survives the rerun below
    nasa_data = get_nasa_data(query)
    if nasa_data:
        st.session_state.nasa_data = nasa_data
    # Generate response
    system_message = "You are HAL, an AI assistant specialized in NASA and space knowledge. Provide concise, factual responses."
    response, st.session_state.chat_history = get_response(system_message, st.session_state.chat_history, query)
    # Force a rerun to update the chat display
    st.rerun()

# Display NASA image fetched on the previous run (anything rendered before st.rerun() is lost)
if st.session_state.get("nasa_data"):
    nasa_data = st.session_state.pop("nasa_data")
    st.image(nasa_data["url"], caption=f"{nasa_data['title']} - {nasa_data['date']}")
    st.write(nasa_data["explanation"])
# Add JavaScript to ensure the page scrolls to the bottom on new messages
st.markdown("""
<script>
// Scroll to bottom of page on load
window.onload = function() {
    window.scrollTo(0, document.body.scrollHeight);
}
</script>
""", unsafe_allow_html=True)