import streamlit as st import asyncio import edge_tts import time import os import uuid import firebase_admin from firebase_admin import credentials, firestore from openai import OpenAI # ---- Firebase setup ---- if not firebase_admin._apps: cred = credentials.Certificate("firebase-service-account.json") firebase_admin.initialize_app(cred) db = firestore.client() # ---- OpenAI setup ---- openai_key = os.getenv("openai_key") assistant_id = os.getenv("assistant_id") client = OpenAI(api_key=openai_key) # ---- Edge TTS voices ---- VOICE_OPTIONS = { "Jenny (US, Female)": "en-US-JennyNeural", "Aria (US, Female)": "en-US-AriaNeural", "Ryan (UK, Male)": "en-GB-RyanNeural", "Natasha (AU, Female)": "en-AU-NatashaNeural", "William (AU, Male)": "en-AU-WilliamNeural", "Libby (UK, Female)": "en-GB-LibbyNeural", "Leah (SA, Female)": "en-ZA-LeahNeural", "Luke (SA, Male)": "en-ZA-LukeNeural" } # --- Streamlit Config --- st.set_page_config(page_title="LOR Technologies AI Assistant", layout="wide") # --- User/session state --- if "user_id" not in st.session_state: st.session_state["user_id"] = str(uuid.uuid4()) user_id = st.session_state["user_id"] if "mute_voice" not in st.session_state: st.session_state["mute_voice"] = False if "last_tts_text" not in st.session_state: st.session_state["last_tts_text"] = "" if "last_audio_path" not in st.session_state: st.session_state["last_audio_path"] = "" if "selected_voice" not in st.session_state: st.session_state["selected_voice"] = "Jenny (US, Female)" # --- Sidebar for Voice Selection --- with st.sidebar: st.markdown("### Voice Settings") selected_voice = st.selectbox("Select assistant voice", list(VOICE_OPTIONS.keys()), index=list(VOICE_OPTIONS.keys()).index(st.session_state["selected_voice"])) st.session_state["selected_voice"] = selected_voice # --- Branding & Styling --- st.markdown(""" """, unsafe_allow_html=True) st.markdown("""
Powered by LOR Technologies
""", unsafe_allow_html=True) # --- Firestore helpers --- def get_or_create_thread_id(): doc_ref = db.collection("users").document(user_id) doc = doc_ref.get() if doc.exists: return doc.to_dict()["thread_id"] else: thread = client.beta.threads.create() doc_ref.set({"thread_id": thread.id, "created_at": firestore.SERVER_TIMESTAMP}) return thread.id def save_message(role, content): db.collection("users").document(user_id).collection("messages").add({ "role": role, "content": content, "timestamp": firestore.SERVER_TIMESTAMP }) def display_chat_history(): messages = db.collection("users").document(user_id).collection("messages").order_by("timestamp").stream() assistant_icon_html = "" for msg in list(messages)[::-1]: data = msg.to_dict() if data["role"] == "user": st.markdown(f"
👤 You: {data['content']}
", unsafe_allow_html=True) else: st.markdown(f"
{assistant_icon_html} LORAIN: {data['content']}
", unsafe_allow_html=True) # --- Edge TTS synth --- async def edge_tts_synthesize(text, voice, user_id): out_path = f"output_{user_id}.mp3" communicate = edge_tts.Communicate(text, voice) await communicate.save(out_path) return out_path def synthesize_voice(text, voice_key, user_id): voice = VOICE_OPTIONS[voice_key] out_path = f"output_{user_id}.mp3" # Only synthesize if text changed or file missing or voice changed if st.session_state["last_tts_text"] != text or not os.path.exists(out_path) or st.session_state.get("last_voice") != voice: with st.spinner(f"Generating voice ({voice_key})..."): asyncio.run(edge_tts_synthesize(text, voice, user_id)) st.session_state["last_tts_text"] = text st.session_state["last_audio_path"] = out_path st.session_state["last_voice"] = voice return out_path # --- Main Chat UI --- input_col, clear_col = st.columns([9, 1]) with input_col: user_input = st.chat_input("Type your message here...") with clear_col: if st.button("🗑️", key="clear-chat", help="Clear Chat"): try: user_doc_ref = db.collection("users").document(user_id) for msg in user_doc_ref.collection("messages").stream(): msg.reference.delete() user_doc_ref.delete() st.session_state.clear() st.rerun() except Exception as e: st.error(f"Failed to clear chat: {e}") thread_id = get_or_create_thread_id() display_chat_history() if user_input: # --- OpenAI Assistant Response --- client.beta.threads.messages.create(thread_id=thread_id, role="user", content=user_input) save_message("user", user_input) with st.spinner("Thinking and typing... 💭"): run = client.beta.threads.runs.create(thread_id=thread_id, assistant_id=assistant_id) while True: run_status = client.beta.threads.runs.retrieve(thread_id=thread_id, run_id=run.id) if run_status.status == "completed": break time.sleep(1) messages_response = client.beta.threads.messages.list(thread_id=thread_id) latest_response = sorted(messages_response.data, key=lambda x: x.created_at)[-1] assistant_message = latest_response.content[0].text.value save_message("assistant", assistant_message) # --- TTS: Speak unless muted --- mute_voice = st.session_state.get("mute_voice", False) audio_path = None if not mute_voice and assistant_message.strip(): audio_path = synthesize_voice(assistant_message, st.session_state["selected_voice"], user_id) st.session_state["last_audio_path"] = audio_path st.audio(audio_path, format="audio/mp3", autoplay=True) elif mute_voice: st.info("🔇 Voice is muted. Click Unmute below to enable assistant speech.") # --- Controls (Mute/Unmute/Replay) --- col1, col2 = st.columns([1, 1]) with col1: if not mute_voice and st.button("🔇 Mute Voice"): st.session_state["mute_voice"] = True st.rerun() elif mute_voice and st.button("🔊 Unmute Voice"): st.session_state["mute_voice"] = False st.rerun() with col2: # Replay button: Always available if last_audio_path exists if st.session_state.get("last_audio_path") and os.path.exists(st.session_state["last_audio_path"]): if st.button("🔁 Replay Voice"): st.audio(st.session_state["last_audio_path"], format="audio/mp3", autoplay=True) time.sleep(0.2) st.rerun() else: # Always show last audio with replay if available if st.session_state.get("last_audio_path") and os.path.exists(st.session_state["last_audio_path"]) and not st.session_state["mute_voice"]: st.audio(st.session_state["last_audio_path"], format="audio/mp3", autoplay=False) # Controls: Only show Replay when idle if st.button("🔁 Replay Last Voice"): st.audio(st.session_state["last_audio_path"], format="audio/mp3", autoplay=True) # Show mute/unmute in idle state too if not st.session_state["mute_voice"]: if st.button("🔇 Mute Voice"): st.session_state["mute_voice"] = True st.rerun() else: if st.button("🔊 Unmute Voice"): st.session_state["mute_voice"] = False st.rerun()