import os
import time
import uuid

import firebase_admin
import streamlit as st
from firebase_admin import credentials, firestore
from openai import OpenAI

# 🌐 Streamlit Config
# Issued before any other Streamlit call so the login screen gets the same
# title/layout and the call is always the first Streamlit command of the run.
st.set_page_config(page_title="LOR Technologies AI Assistant", layout="wide")

# ------------------ Authentication ------------------
# NOTE(review): credentials are stored in plaintext in source control; consider
# moving them to st.secrets / environment variables and storing a hash instead.
VALID_USERS = {
    "andrew@lortechnologies.com": "Pass.123",
}


def login():
    """Render the login form and mark the session authenticated on success.

    On a matching email/password pair the ``authenticated`` flag is set in
    ``st.session_state`` and the script is rerun; otherwise an error is shown.
    """
    st.title("🔐 Login Required")
    email = st.text_input("Email")
    password = st.text_input("Password", type="password")
    if st.button("Login"):
        if VALID_USERS.get(email) == password:
            st.session_state.authenticated = True
            st.rerun()
        else:
            st.error("❌ Incorrect email or password.")


if "authenticated" not in st.session_state:
    st.session_state.authenticated = False

if not st.session_state.authenticated:
    login()
    st.stop()

# 🔐 Firebase setup
# Streamlit re-executes this script on every interaction, so only initialise
# the default Firebase app when it does not exist yet. get_app() raises
# ValueError in that case (public API instead of the private `_apps` dict).
try:
    firebase_admin.get_app()
except ValueError:
    cred = credentials.Certificate("firebase-service-account.json")
    firebase_admin.initialize_app(cred)
db = firestore.client()

# 🔐 OpenAI setup
openai_key = os.getenv("openai_key")
assistant_id = os.getenv("ASSISTANT_ID")
if not assistant_id:
    # Fail early with a readable message instead of an opaque API error later.
    st.error("ASSISTANT_ID environment variable is not set.")
    st.stop()
client = OpenAI(api_key=openai_key)

# Run states from which polling can never reach "completed". "requires_action"
# is included because this app never submits tool outputs, so such a run would
# otherwise be polled until it expires.
_UNRECOVERABLE_RUN_STATUSES = frozenset(
    {"failed", "cancelled", "expired", "incomplete", "requires_action"}
)

# 🎯 Session + User ID
if "user_id" not in st.session_state:
    st.session_state["user_id"] = str(uuid.uuid4())
user_id = st.session_state["user_id"]

# 🖼️ LOR Branding + Styling
# NOTE(review): unsafe_allow_html=True suggests these strings (and the chat
# bubbles below) originally carried HTML/CSS markup that is not present in this
# copy of the file — confirm against the deployed version.
st.markdown(""" """, unsafe_allow_html=True)
st.markdown("""
Powered by LOR Technologies
""", unsafe_allow_html=True)


# 🔁 Get or create a thread ID
def get_or_create_thread_id():
    """Return the OpenAI thread id stored for the current user, creating one if needed.

    The id is read from the ``users/<user_id>`` Firestore document. When the
    document is missing, or exists without a ``thread_id`` field, a new thread
    is created and persisted together with a server-side creation timestamp.
    """
    doc_ref = db.collection("users").document(user_id)
    snapshot = doc_ref.get()
    if snapshot.exists:
        stored_id = (snapshot.to_dict() or {}).get("thread_id")
        if stored_id:
            return stored_id
    thread = client.beta.threads.create()
    doc_ref.set({"thread_id": thread.id, "created_at": firestore.SERVER_TIMESTAMP})
    return thread.id


# 💾 Save a message
def save_message(role, content):
    """Append one chat message to the current user's ``messages`` subcollection.

    Args:
        role: Stored as-is; this script writes "user" or "assistant".
        content: Message text.
    """
    db.collection("users").document(user_id).collection("messages").add({
        "role": role,
        "content": content,
        "timestamp": firestore.SERVER_TIMESTAMP,
    })


# 💬 Display chat history
def display_chat_history():
    """Render the stored messages for the current user, newest first."""
    messages = (
        db.collection("users")
        .document(user_id)
        .collection("messages")
        .order_by("timestamp")
        .stream()
    )
    assistant_icon_html = ""
    # The query is ordered oldest-first; reverse it so the latest message is on top.
    # NOTE(review): message content is interpolated unescaped into markdown that
    # is rendered with unsafe_allow_html=True — confirm this is acceptable for
    # user- and model-supplied text.
    for msg in reversed(list(messages)):
        data = msg.to_dict()
        if data["role"] == "user":
            st.markdown(f"\n👤 You: {data['content']}\n", unsafe_allow_html=True)
        else:
            st.markdown(
                f"\n{assistant_icon_html} LOR Assistant: {data['content']}\n",
                unsafe_allow_html=True,
            )


def wait_for_run(thread_id, run_id, poll_interval=1.0):
    """Poll an assistant run until it completes or can no longer complete.

    Args:
        thread_id: Thread the run belongs to.
        run_id: Id of the run to poll.
        poll_interval: Seconds to sleep between status checks.

    Returns:
        The final status string: "completed" or one of
        ``_UNRECOVERABLE_RUN_STATUSES``.
    """
    while True:
        run_status = client.beta.threads.runs.retrieve(thread_id=thread_id, run_id=run_id)
        status = run_status.status
        if status == "completed" or status in _UNRECOVERABLE_RUN_STATUSES:
            return status
        time.sleep(poll_interval)


def latest_assistant_text(thread_id, run_id):
    """Return the text of the newest assistant message produced by ``run_id``.

    Filtering on role and run id avoids picking up the user's own message when
    it shares a ``created_at`` second with the reply. Returns ``None`` when the
    run produced no assistant message containing a text part.
    """
    messages_response = client.beta.threads.messages.list(thread_id=thread_id)
    replies = [
        m for m in messages_response.data
        if m.role == "assistant" and m.run_id == run_id
    ]
    if not replies:
        return None
    newest = max(replies, key=lambda m: m.created_at)
    # The first content part is not guaranteed to be text (e.g. image output).
    return next((part.text.value for part in newest.content if part.type == "text"), None)


# 🚀 Main Chat UI
input_col, clear_col = st.columns([9, 1])
with input_col:
    user_input = st.chat_input("Type your message here...")
with clear_col:
    if st.button("🗑️", key="clear-chat", help="Clear Chat"):
        try:
            user_doc_ref = db.collection("users").document(user_id)
            # Deleting a Firestore document does not delete its subcollections,
            # so the messages are removed one by one first.
            for msg in user_doc_ref.collection("messages").stream():
                msg.reference.delete()
            user_doc_ref.delete()
        except Exception as e:
            st.error(f"Failed to clear chat: {e}")
        else:
            # Kept outside the try so the rerun control-flow exception can never
            # be mistaken for a failure.
            # NOTE(review): clear() also drops the `authenticated` flag, so the
            # user is sent back to the login screen — confirm this is intended.
            st.session_state.clear()
            st.rerun()

thread_id = get_or_create_thread_id()
display_chat_history()

if user_input:
    # Send user message to OpenAI thread
    client.beta.threads.messages.create(thread_id=thread_id, role="user", content=user_input)
    save_message("user", user_input)

    assistant_message = None
    with st.spinner("Thinking and typing... 💭"):
        run = client.beta.threads.runs.create(thread_id=thread_id, assistant_id=assistant_id)
        final_status = wait_for_run(thread_id, run.id)
        if final_status == "completed":
            assistant_message = latest_assistant_text(thread_id, run.id)

    if assistant_message is None:
        # Stop instead of rerunning so the error stays visible to the user.
        st.error(f"The assistant could not produce a reply (run status: {final_status}).")
        st.stop()

    save_message("assistant", assistant_message)
    time.sleep(0.5)
    st.rerun()