"""Streamlit chat front-end for the LOR Technologies assistant (LORAIN).

Each browser session gets a random user id. Firestore stores, per user, the
OpenAI Assistants thread id and the chat transcript. A submitted message is
sent to the thread, the run is polled until it finishes, and the reply is
saved and (unless muted) spoken through OpenAI text-to-speech.
"""

import html
import os
import time
import uuid

import firebase_admin
import streamlit as st
from firebase_admin import credentials, firestore
from openai import OpenAI

# Seconds between two status checks of an assistant run.
RUN_POLL_INTERVAL_SECONDS = 1
# Upper bound on how long one run is polled before the request is abandoned.
RUN_TIMEOUT_SECONDS = 120
# Statuses from which a run cannot reach "completed" without outside help.
# "requires_action" is included because this script never submits tool outputs.
RUN_DEAD_END_STATUSES = frozenset(
    {"failed", "cancelled", "expired", "incomplete", "requires_action"}
)

# 🔐 Firebase setup (Streamlit re-executes the script on every interaction,
# so the app must only be initialised once per process).
if not firebase_admin._apps:
    cred = credentials.Certificate("firebase-service-account.json")
    firebase_admin.initialize_app(cred)
db = firestore.client()

# 🔐 OpenAI setup
openai_key = os.getenv("openai_key")
assistant_id = os.getenv("assistant_id")
client = OpenAI(api_key=openai_key)

# 🌐 Streamlit Config
st.set_page_config(page_title="LOR Technologies AI Assistant", layout="wide")

# 🎯 Session + User ID
if "user_id" not in st.session_state:
    st.session_state["user_id"] = str(uuid.uuid4())
user_id = st.session_state["user_id"]

# 🖼️ LORTech Branding + Styling
# NOTE(review): the payload of these two calls looks like stripped CSS/HTML;
# restore the original markup here if it exists elsewhere.
st.markdown(""" """, unsafe_allow_html=True)
st.markdown("""
Powered by LOR Technologies
""", unsafe_allow_html=True)


# 🔁 Get or create a thread ID
def get_or_create_thread_id():
    """Return the OpenAI thread id stored for the current user.

    A new thread is created and persisted when the user document is missing
    or exists without a ``thread_id`` field.

    Returns:
        The thread id string.
    """
    doc_ref = db.collection("users").document(user_id)
    snapshot = doc_ref.get()
    if snapshot.exists:
        stored_thread_id = (snapshot.to_dict() or {}).get("thread_id")
        if stored_thread_id:
            return stored_thread_id

    thread = client.beta.threads.create()
    # merge=True keeps any other fields of an existing (thread-less) document.
    doc_ref.set(
        {"thread_id": thread.id, "created_at": firestore.SERVER_TIMESTAMP},
        merge=True,
    )
    return thread.id


# 💾 Save a message
def save_message(role, content):
    """Append one chat message to the current user's Firestore transcript.

    Args:
        role: Value stored in the ``role`` field; the display code treats
            ``"user"`` specially and everything else as the assistant.
        content: Message text.
    """
    db.collection("users").document(user_id).collection("messages").add({
        "role": role,
        "content": content,
        "timestamp": firestore.SERVER_TIMESTAMP,
    })


# 💬 Display chat history
def display_chat_history():
    """Render the stored transcript, newest message first."""
    messages = (
        db.collection("users")
        .document(user_id)
        .collection("messages")
        .order_by("timestamp")
        .stream()
    )
    assistant_icon_html = ""
    for msg in reversed(list(messages)):
        data = msg.to_dict()
        if data["role"] == "user":
            # The markdown is rendered with raw HTML enabled, so text typed
            # by the user must be escaped before interpolation.
            safe_content = html.escape(data["content"])
            st.markdown(f"\n👤 You: {safe_content}\n", unsafe_allow_html=True)
        else:
            st.markdown(
                f"\n{assistant_icon_html} LORAIN: {data['content']}\n",
                unsafe_allow_html=True,
            )


def _wait_for_run(thread_id, run_id):
    """Poll a run until it completes, dead-ends, or the timeout elapses.

    Returns:
        The final run status, or ``"timed_out"`` when the deadline passed
        while the run was still progressing.
    """
    deadline = time.monotonic() + RUN_TIMEOUT_SECONDS
    while True:
        run_status = client.beta.threads.runs.retrieve(
            thread_id=thread_id, run_id=run_id
        )
        status = run_status.status
        if status == "completed" or status in RUN_DEAD_END_STATUSES:
            return status
        if time.monotonic() >= deadline:
            return "timed_out"
        time.sleep(RUN_POLL_INTERVAL_SECONDS)


def _latest_assistant_text(thread_id, run_id):
    """Return the text of the newest assistant message produced by ``run_id``.

    Returns:
        The joined text parts of that message, or ``None`` when the run left
        no assistant message with text content.
    """
    messages_response = client.beta.threads.messages.list(thread_id=thread_id)
    replies = [
        message
        for message in messages_response.data
        if message.role == "assistant"
        and getattr(message, "run_id", None) == run_id
    ]
    if not replies:
        return None
    newest = max(replies, key=lambda message: message.created_at)
    # A message may mix text with other part types; only text can be shown.
    text_parts = [
        part.text.value
        for part in newest.content
        if getattr(part, "type", None) == "text"
    ]
    return "\n".join(text_parts) or None


# --- Main Chat UI ---
input_col, clear_col = st.columns([9, 1])
with input_col:
    user_input = st.chat_input("Type your message here...")
with clear_col:
    if st.button("🗑️", key="clear-chat", help="Clear Chat"):
        try:
            user_doc_ref = db.collection("users").document(user_id)
            for msg in user_doc_ref.collection("messages").stream():
                msg.reference.delete()
            user_doc_ref.delete()
        except Exception as e:
            st.error(f"Failed to clear chat: {e}")
        else:
            # Kept outside the try body so Streamlit's rerun control-flow
            # exception can never be mistaken for a failure.
            st.session_state.clear()
            st.rerun()

thread_id = get_or_create_thread_id()
display_chat_history()

if "mute_voice" not in st.session_state:
    st.session_state["mute_voice"] = False
if "last_tts_text" not in st.session_state:
    st.session_state["last_tts_text"] = ""


def synthesize_voice(text, user_id):
    """Synthesize ``text`` to an mp3 file and return the file's path.

    The file is regenerated only when the text differs from the last
    synthesized text or the file is missing.

    Args:
        text: Text to speak.
        user_id: Used to build the per-user output file name.

    Returns:
        Path of the mp3 file, relative to the working directory.
    """
    audio_path = f"output_{user_id}.mp3"
    # Only synthesize if text changed or file doesn't exist
    if st.session_state["last_tts_text"] != text or not os.path.exists(audio_path):
        with st.spinner("Synthesizing voice with GPT-4o..."):
            speech_response = client.audio.speech.create(
                model="tts-1",
                voice="nova",
                input=text,
                response_format="mp3",
            )
            with open(audio_path, "wb") as f:
                f.write(speech_response.content)
        st.session_state["last_tts_text"] = text
    return audio_path


def render_voice_controls():
    """Play any pending reply audio and render the mute/unmute toggle.

    This runs on every script execution. A button that only exists inside
    the ``if user_input:`` branch is never re-created on the rerun its own
    click triggers, so the click would be lost.
    """
    # Audio is queued by the message handler and played on the following
    # rerun; popping it ensures it plays only once.
    pending_audio = st.session_state.pop("pending_audio_path", None)

    if st.session_state["mute_voice"]:
        st.info("🔇 Voice is muted. To enable assistant speech, click 'Unmute Voice' below and ask another question.")
        if st.button("🔊 Unmute Voice"):
            st.session_state["mute_voice"] = False
            st.rerun()
        return

    if pending_audio and os.path.exists(pending_audio):
        st.audio(pending_audio, format="audio/mp3", autoplay=True)
    if st.button("🔇 Mute Voice"):
        st.session_state["mute_voice"] = True
        st.rerun()


render_voice_controls()

if user_input:
    # Send user message to OpenAI thread
    client.beta.threads.messages.create(
        thread_id=thread_id, role="user", content=user_input
    )
    save_message("user", user_input)

    with st.spinner("Thinking and typing... 💭"):
        run = client.beta.threads.runs.create(
            thread_id=thread_id, assistant_id=assistant_id
        )
        final_status = _wait_for_run(thread_id, run.id)
        if final_status == "completed":
            assistant_message = _latest_assistant_text(thread_id, run.id)
        else:
            assistant_message = None
            if final_status in ("timed_out", "requires_action"):
                # A still-active run blocks new messages on the thread, so
                # try to cancel it; failure here is not fatal.
                try:
                    client.beta.threads.runs.cancel(
                        thread_id=thread_id, run_id=run.id
                    )
                except Exception as e:
                    st.warning(f"Could not cancel the unfinished run: {e}")

    if assistant_message is None:
        # No rerun here, so the error stays on screen.
        st.error(
            f"The assistant did not return a reply (run status: {final_status}). "
            "Please try again."
        )
    else:
        save_message("assistant", assistant_message)

        # Voice autoplay unless muted. The player itself is rendered after
        # the rerun below; an element created here would be discarded by it.
        if not st.session_state.get("mute_voice", False):
            st.session_state["pending_audio_path"] = synthesize_voice(
                assistant_message, user_id
            )

        time.sleep(0.5)
        # Rerun so the transcript above includes the new messages.
        st.rerun()