import streamlit as st
from openai import OpenAI
import time
import os
import uuid
import firebase_admin
from firebase_admin import credentials, firestore
# ------------------ Authentication ------------------
# Login allow-list: maps an account email to the password login() accepts.
# NOTE(review): the password is kept in plain text in the source file —
# consider loading credentials from a secret store instead.
VALID_USERS = {
    "andrew@lortechnologies.com": "Pass.123",
}
def login():
    """Render the login form and mark the session authenticated on success.

    Shows an email input, a password input and a "Login" button. When the
    button is pressed and the email/password pair matches an entry in
    ``VALID_USERS``, ``st.session_state.authenticated`` is set to ``True`` and
    the script is rerun; on a mismatch an error message is shown instead.

    Returns:
        None.
    """
    # Function-scope import: the file's top-level imports do not include hmac.
    import hmac

    st.title("🔐 Login Required")
    email = st.text_input("Email")
    password = st.text_input("Password", type="password")

    if not st.button("Login"):
        return

    expected = VALID_USERS.get(email)
    # Compare in constant time so the check does not reveal how many leading
    # characters matched. Both sides are encoded because compare_digest only
    # accepts str arguments when they are pure ASCII.
    credentials_ok = expected is not None and hmac.compare_digest(
        expected.encode("utf-8"), password.encode("utf-8")
    )
    if credentials_ok:
        st.session_state.authenticated = True
        st.rerun()
    else:
        st.error("❌ Incorrect email or password.")
# Authentication gate: the rest of the script only runs for logged-in sessions.
if "authenticated" not in st.session_state:
    # First run of a session: start logged out.
    st.session_state["authenticated"] = False

if not st.session_state["authenticated"]:
    login()
    st.stop()  # Halt this run so only the login form is shown.
# 🔐 Firebase setup
# The default Firebase app may already exist when this code runs again in the
# same process, and initialising it twice is an error. Probe for it through
# the public get_app() API rather than the private firebase_admin._apps
# registry.
try:
    firebase_admin.get_app()
except ValueError:
    # get_app() raises ValueError when the default app has not been created.
    cred = credentials.Certificate("firebase-service-account.json")
    firebase_admin.initialize_app(cred)

# Firestore client used by every persistence helper below.
db = firestore.client()
# 🔐 OpenAI setup
# Both settings are read from the process environment. Note that the API key
# variable name is lower-case ("openai_key"); either value is None when unset.
assistant_id = os.environ.get("ASSISTANT_ID")
openai_key = os.environ.get("openai_key")
client = OpenAI(api_key=openai_key)
# 🌐 Streamlit Config
st.set_page_config(
    page_title="LOR Technologies AI Assistant",
    layout="wide",
)

# 🎯 Session + User ID
# One random UUID per session, stored in session state so reruns reuse it.
# It is the document ID under the "users" collection in Firestore.
try:
    user_id = st.session_state["user_id"]
except KeyError:
    user_id = str(uuid.uuid4())
    st.session_state["user_id"] = user_id
# 🖼️ LOR Branding + Styling
# NOTE(review): this first call renders only a newline; presumably a CSS/HTML
# snippet was meant to sit inside the string (unsafe_allow_html is enabled but
# no HTML is present) — confirm.
st.markdown("""
""", unsafe_allow_html=True)
# Attribution line shown on the page.
st.markdown("""
Powered by LOR Technologies
""", unsafe_allow_html=True)
# 🔁 Get or create a thread ID
def get_or_create_thread_id():
    """Return the OpenAI thread ID for the current user, creating one if needed.

    Looks up the ``users/<user_id>`` document in Firestore. When it holds a
    non-empty ``thread_id`` that value is returned. Otherwise a new thread is
    created through the OpenAI client, stored on the document together with a
    server-side ``created_at`` timestamp, and its ID is returned.

    Returns:
        The thread ID.
    """
    doc_ref = db.collection("users").document(user_id)
    snapshot = doc_ref.get()

    if snapshot.exists:
        # A document that lacks a usable "thread_id" is handled like a missing
        # document instead of raising KeyError.
        stored_thread_id = (snapshot.to_dict() or {}).get("thread_id")
        if stored_thread_id:
            return stored_thread_id

    thread = client.beta.threads.create()
    # merge=True leaves any other fields of a pre-existing document untouched;
    # for a document that does not exist yet it behaves like a plain set().
    doc_ref.set(
        {"thread_id": thread.id, "created_at": firestore.SERVER_TIMESTAMP},
        merge=True,
    )
    return thread.id
# 💾 Save a message
def save_message(role, content):
    """Add one chat message to the current user's ``messages`` subcollection.

    Args:
        role: Speaker label stored with the message; callers in this file pass
            ``"user"`` or ``"assistant"``.
        content: The message text.

    Returns:
        None.
    """
    message_record = {
        "role": role,
        "content": content,
        # Server-side write time; display_chat_history() orders by this field.
        "timestamp": firestore.SERVER_TIMESTAMP,
    }
    messages_ref = db.collection("users").document(user_id).collection("messages")
    messages_ref.add(message_record)
# 💬 Display chat history
def display_chat_history():
    """Render the current user's stored chat messages, newest first.

    Streams the ``messages`` subcollection of ``users/<user_id>`` ordered by
    ``timestamp`` and writes one markdown element per message, labelled
    "You" for messages whose role is ``"user"`` and "LOR Assistant" for all
    others.

    Returns:
        None.
    """
    messages = (
        db.collection("users")
        .document(user_id)
        .collection("messages")
        .order_by("timestamp")
        .stream()
    )
    # Explicit "\n" escapes keep every literal on one physical line; a
    # single-quoted string cannot contain a raw line break.
    assistant_icon_html = "\n"

    # The query returns oldest-first; reverse so the newest message is on top.
    for msg in reversed(list(messages)):
        data = msg.to_dict()
        # NOTE(review): message content is interpolated unescaped while
        # unsafe_allow_html=True, so HTML inside a message is rendered as HTML.
        if data["role"] == "user":
            st.markdown(f"👤 You: {data['content']}\n", unsafe_allow_html=True)
        else:
            st.markdown(
                f"{assistant_icon_html} LOR Assistant: {data['content']}\n",
                unsafe_allow_html=True,
            )
# 🚀 Main Chat UI
# Wide column for the chat input, narrow column for the "Clear Chat" button.
input_col, clear_col = st.columns([9, 1])
with input_col:
    user_input = st.chat_input("Type your message here...")
with clear_col:
    if st.button("🗑️", key="clear-chat", help="Clear Chat"):
        try:
            user_doc_ref = db.collection("users").document(user_id)
            # Delete the stored messages one by one, then the user document
            # itself (which holds the thread ID).
            for msg in user_doc_ref.collection("messages").stream():
                msg.reference.delete()
            user_doc_ref.delete()
        except Exception as e:
            st.error(f"Failed to clear chat: {e}")
        else:
            # Reset the session so a fresh user_id (and therefore a fresh
            # thread) is created on the next run, but carry the login flag
            # over: clearing the chat must not log the user out.
            was_authenticated = st.session_state.get("authenticated", False)
            st.session_state.clear()
            st.session_state["authenticated"] = was_authenticated
            st.rerun()
# Run states in which the assistant is still working and polling continues.
# Every other state (completed, failed, cancelled, expired, incomplete,
# requires_action) ends the wait; this app submits no tool outputs, so
# requires_action cannot make progress either.
_PENDING_RUN_STATUSES = ("queued", "in_progress", "cancelling")

thread_id = get_or_create_thread_id()
display_chat_history()

if user_input:
    # Send user message to OpenAI thread
    client.beta.threads.messages.create(thread_id=thread_id, role="user", content=user_input)
    save_message("user", user_input)

    assistant_message = None
    failure_reason = None
    with st.spinner("Thinking and typing... 💭"):
        run = client.beta.threads.runs.create(thread_id=thread_id, assistant_id=assistant_id)

        # Poll once per second, but only while the run is still pending, so a
        # failed or expired run cannot leave the page spinning forever.
        run_status = run
        while run_status.status in _PENDING_RUN_STATUSES:
            time.sleep(1)
            run_status = client.beta.threads.runs.retrieve(thread_id=thread_id, run_id=run.id)

        if run_status.status == "completed":
            messages_response = client.beta.threads.messages.list(thread_id=thread_id)
            # Only consider assistant messages produced by this run; the newest
            # message in the thread could otherwise be the user's own.
            replies = [
                message
                for message in messages_response.data
                if message.role == "assistant" and message.run_id == run.id
            ]
            if replies:
                latest_response = max(replies, key=lambda message: message.created_at)
                # Take the first text part; non-text parts have no .text.
                assistant_message = next(
                    (part.text.value for part in latest_response.content if part.type == "text"),
                    None,
                )
            if assistant_message is None:
                failure_reason = "The assistant finished without a text reply."
        else:
            failure_reason = f"The assistant run ended with status '{run_status.status}'."
            last_error = getattr(run_status, "last_error", None)
            if last_error is not None:
                failure_reason += f" {last_error.message}"

    if assistant_message is None:
        # No rerun here, so the error stays visible to the user.
        st.error(failure_reason)
    else:
        save_message("assistant", assistant_message)
        time.sleep(0.5)
        # Rerun so display_chat_history() picks up the two new messages.
        st.rerun()