import streamlit as st
import asyncio
import edge_tts
import time
import os
import uuid
import firebase_admin
from firebase_admin import credentials, firestore
from openai import OpenAI
# ---- Firebase setup ----
# Streamlit re-executes this script on every interaction; initialise the
# default Firebase app only when none has been registered yet.
if not firebase_admin._apps:
    cred = credentials.Certificate("firebase-service-account.json")
    firebase_admin.initialize_app(cred)
db = firestore.client()

# ---- OpenAI setup ----
# Both values are read from the environment and are None when unset.
openai_key = os.environ.get("openai_key")
assistant_id = os.environ.get("assistant_id")
client = OpenAI(api_key=openai_key)
# ---- Edge TTS voices ----
# Display label shown in the sidebar -> Edge TTS voice identifier.
# Insertion order is the order of the entries in the voice selectbox.
VOICE_OPTIONS = {
    "Jenny (US, Female)": "en-US-JennyNeural",
    "Aria (US, Female)": "en-US-AriaNeural",
    "Ryan (UK, Male)": "en-GB-RyanNeural",
    "Natasha (AU, Female)": "en-AU-NatashaNeural",
    "William (AU, Male)": "en-AU-WilliamNeural",
    "Libby (UK, Female)": "en-GB-LibbyNeural",
    "Leah (SA, Female)": "en-ZA-LeahNeural",
    "Luke (SA, Male)": "en-ZA-LukeNeural",
}
# --- Streamlit Config ---
st.set_page_config(page_title="LOR Technologies AI Assistant", layout="wide")

# --- User/session state ---
# Each browser session gets a random id, generated only the first time the
# script runs for that session; it keys the Firestore documents used below.
if "user_id" not in st.session_state:
    st.session_state["user_id"] = str(uuid.uuid4())
user_id = st.session_state["user_id"]

# Seed the remaining keys once so later code can index them directly.
for state_key, initial_value in (
    ("mute_voice", False),
    ("last_tts_text", ""),
    ("last_audio_path", ""),
    ("selected_voice", "Jenny (US, Female)"),
):
    if state_key not in st.session_state:
        st.session_state[state_key] = initial_value
# --- Sidebar for Voice Selection ---
with st.sidebar:
    st.markdown("### Voice Settings")
    voice_labels = list(VOICE_OPTIONS)
    # Pre-select whichever voice the session currently has stored.
    current_voice_index = voice_labels.index(st.session_state["selected_voice"])
    selected_voice = st.selectbox(
        "Select assistant voice",
        voice_labels,
        index=current_voice_index,
    )
    st.session_state["selected_voice"] = selected_voice

# --- Branding & Styling ---
# NOTE(review): the first literal holds only a newline; if styling markup
# was intended here, restore it.
st.markdown("""
""", unsafe_allow_html=True)
st.markdown("""
Powered by LOR Technologies
""", unsafe_allow_html=True)
# --- Firestore helpers ---
def get_or_create_thread_id():
    """Return the OpenAI thread id stored for the current session's user.

    Reads ``users/<user_id>`` from Firestore. When that document does not
    exist, a new OpenAI thread is created, its id is stored on the document
    together with a server-side creation timestamp, and the new id is
    returned.
    """
    user_doc = db.collection("users").document(user_id)
    snapshot = user_doc.get()
    if snapshot.exists:
        return snapshot.to_dict()["thread_id"]

    # First visit for this user id: create the thread and remember it.
    new_thread = client.beta.threads.create()
    user_doc.set(
        {"thread_id": new_thread.id, "created_at": firestore.SERVER_TIMESTAMP}
    )
    return new_thread.id
def save_message(role, content):
    """Append one chat message to the current user's Firestore history.

    Args:
        role: Sender label stored with the message; this file passes
            ``"user"`` or ``"assistant"``.
        content: Message text.
    """
    record = {
        "role": role,
        "content": content,
        # Filled in by Firestore on write; display_chat_history orders by it.
        "timestamp": firestore.SERVER_TIMESTAMP,
    }
    messages = db.collection("users").document(user_id).collection("messages")
    messages.add(record)
def display_chat_history():
    """Render the current user's stored messages, latest timestamp first.

    Messages are read from ``users/<user_id>/messages`` ordered by
    ``timestamp`` and then emitted in reverse. Text typed by the user is
    HTML-escaped before being interpolated, because the markdown is rendered
    with ``unsafe_allow_html=True``.
    """
    import html  # local import: only this function needs it

    messages = (
        db.collection("users")
        .document(user_id)
        .collection("messages")
        .order_by("timestamp")
        .stream()
    )
    # NOTE(review): this literal holds only a newline; if icon markup was
    # intended here, restore it.
    assistant_icon_html = "\n"
    for msg in reversed(list(messages)):
        data = msg.to_dict()
        if data["role"] == "user":
            # quote=False: the text lands in element content, not in an
            # attribute, so only &, < and > need neutralising.
            safe_content = html.escape(str(data["content"]), quote=False)
            st.markdown(f"👤 You: {safe_content}\n", unsafe_allow_html=True)
        else:
            # NOTE(review): assistant output is interpolated unescaped so its
            # markdown renders as written; any raw HTML it contains will be
            # rendered too. Confirm that is acceptable.
            st.markdown(
                f"{assistant_icon_html} LORAIN: {data['content']}\n",
                unsafe_allow_html=True,
            )
# --- Edge TTS synth ---
async def edge_tts_synthesize(text, voice, user_id):
    """Synthesize ``text`` with Edge TTS and save it as an MP3 file.

    Args:
        text: Text to speak.
        voice: Edge TTS voice identifier (a value of ``VOICE_OPTIONS``).
        user_id: Used to build the output file name, ``output_<user_id>.mp3``.

    Returns:
        The path of the written file, relative to the working directory.
    """
    mp3_path = f"output_{user_id}.mp3"
    await edge_tts.Communicate(text, voice).save(mp3_path)
    return mp3_path
def synthesize_voice(text, voice_key, user_id):
    """Return the path of an MP3 speaking ``text``, synthesizing if needed.

    The existing ``output_<user_id>.mp3`` is reused when the text and voice
    match what the session last synthesized and the file is still on disk.
    Otherwise the audio is regenerated and the session state is updated.

    Args:
        text: Text to speak.
        voice_key: Display label; must be a key of ``VOICE_OPTIONS``.
        user_id: Used to build the output file name.

    Returns:
        Path to the MP3 file.
    """
    voice = VOICE_OPTIONS[voice_key]
    out_path = f"output_{user_id}.mp3"
    state = st.session_state

    audio_is_current = (
        state["last_tts_text"] == text
        and os.path.exists(out_path)
        and state.get("last_voice") == voice
    )
    if audio_is_current:
        return out_path

    with st.spinner(f"Generating voice ({voice_key})..."):
        asyncio.run(edge_tts_synthesize(text, voice, user_id))
    # Record what the file on disk now contains so the next call can reuse it.
    state["last_tts_text"] = text
    state["last_audio_path"] = out_path
    state["last_voice"] = voice
    return out_path
# --- Main Chat UI ---
input_col, clear_col = st.columns([9, 1])
with input_col:
    user_input = st.chat_input("Type your message here...")
with clear_col:
    clear_requested = st.button("🗑️", key="clear-chat", help="Clear Chat")
    if clear_requested:
        try:
            user_doc = db.collection("users").document(user_id)
            # Delete each message document explicitly before deleting the
            # parent user document.
            for stored_message in user_doc.collection("messages").stream():
                stored_message.reference.delete()
            user_doc.delete()
            # Clearing the session also drops user_id, so the rerun starts
            # with a freshly generated id.
            st.session_state.clear()
            st.rerun()
        except Exception as exc:
            st.error(f"Failed to clear chat: {exc}")

# Resolve the conversation thread, then draw the stored history.
thread_id = get_or_create_thread_id()
display_chat_history()
if user_input:
    # --- OpenAI Assistant Response ---
    client.beta.threads.messages.create(thread_id=thread_id, role="user", content=user_input)
    save_message("user", user_input)

    # Statuses at which polling must stop. Besides "completed" these are the
    # states a run does not leave on its own; "requires_action" is included
    # because this script never submits tool outputs, so waiting on it would
    # spin forever.
    stop_statuses = {
        "completed",
        "failed",
        "cancelled",
        "expired",
        "incomplete",
        "requires_action",
    }
    assistant_message = None
    with st.spinner("Thinking and typing... 💭"):
        run = client.beta.threads.runs.create(thread_id=thread_id, assistant_id=assistant_id)
        while True:
            run_status = client.beta.threads.runs.retrieve(thread_id=thread_id, run_id=run.id)
            if run_status.status in stop_statuses:
                break
            time.sleep(1)

        if run_status.status == "completed":
            messages_response = client.beta.threads.messages.list(thread_id=thread_id)
            # Consider assistant messages only: created_at has one-second
            # resolution, so the user's message can tie with the reply and
            # would otherwise be picked as "latest".
            assistant_replies = [m for m in messages_response.data if m.role == "assistant"]
            if assistant_replies:
                latest_response = max(assistant_replies, key=lambda m: m.created_at)
                # Take the first text part; other content parts have no .text.
                assistant_message = next(
                    (
                        part.text.value
                        for part in latest_response.content
                        if getattr(part, "type", None) == "text"
                    ),
                    None,
                )

    if assistant_message is None:
        # No rerun here, so the error stays on screen.
        st.error(
            f"The assistant did not produce a reply (run status: {run_status.status})."
        )
    else:
        save_message("assistant", assistant_message)

        # --- TTS: Speak unless muted ---
        mute_voice = st.session_state.get("mute_voice", False)
        audio_path = None
        if not mute_voice and assistant_message.strip():
            audio_path = synthesize_voice(assistant_message, st.session_state["selected_voice"], user_id)
            st.session_state["last_audio_path"] = audio_path
            st.audio(audio_path, format="audio/mp3", autoplay=True)
        elif mute_voice:
            st.info("🔇 Voice is muted. Click Unmute below to enable assistant speech.")

        # --- Controls (Mute/Unmute/Replay) ---
        col1, col2 = st.columns([1, 1])
        with col1:
            if not mute_voice and st.button("🔇 Mute Voice"):
                st.session_state["mute_voice"] = True
                st.rerun()
            elif mute_voice and st.button("🔊 Unmute Voice"):
                st.session_state["mute_voice"] = False
                st.rerun()
        with col2:
            # Replay button: available whenever the last audio file exists
            last_audio = st.session_state.get("last_audio_path")
            if last_audio and os.path.exists(last_audio):
                if st.button("🔁 Replay Voice"):
                    st.audio(last_audio, format="audio/mp3", autoplay=True)

        # The history above was drawn before these two messages were saved;
        # rerun so it includes them. The short pause is presumably there to
        # let the Firestore writes land first (TODO confirm).
        time.sleep(0.2)
        st.rerun()
else:
    # Idle run: show the last audio (without autoplay) when it exists and
    # voice is not muted.
    last_audio = st.session_state.get("last_audio_path")
    if last_audio and os.path.exists(last_audio) and not st.session_state["mute_voice"]:
        st.audio(last_audio, format="audio/mp3", autoplay=False)
        # Replay is offered only alongside an existing audio file.
        if st.button("🔁 Replay Last Voice"):
            st.audio(last_audio, format="audio/mp3", autoplay=True)

    # Show mute/unmute in idle state too
    if not st.session_state["mute_voice"]:
        if st.button("🔇 Mute Voice"):
            st.session_state["mute_voice"] = True
            st.rerun()
    else:
        if st.button("🔊 Unmute Voice"):
            st.session_state["mute_voice"] = False
            st.rerun()