import streamlit as st
import asyncio
import edge_tts
import time
import os
import uuid
import firebase_admin
from firebase_admin import credentials, firestore
from openai import OpenAI
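# Runtime configuration used below:
#   openai_key / assistant_id      - environment variables for the OpenAI client
#   firebase-service-account.json  - Firebase Admin SDK credential file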
# ---- Firebase setup ----
if not firebase_admin._apps:
    cred = credentials.Certificate("firebase-service-account.json")
    firebase_admin.initialize_app(cred)
db = firestore.client()
# ---- OpenAI setup ----
openai_key = os.getenv("openai_key")
assistant_id = os.getenv("assistant_id")
client = OpenAI(api_key=openai_key)
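# The assistant is driven through the OpenAI Assistants API (client.beta.threads.*);
# one thread per user is created lazily and persisted in Firestore (see helpers below).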
# ---- Edge TTS voices ----
VOICE_OPTIONS = {
    "Jenny (US, Female)": "en-US-JennyNeural",
    "Aria (US, Female)": "en-US-AriaNeural",
    "Ryan (UK, Male)": "en-GB-RyanNeural",
    "Natasha (AU, Female)": "en-AU-NatashaNeural",
    "William (AU, Male)": "en-AU-WilliamNeural",
    "Libby (UK, Female)": "en-GB-LibbyNeural",
    "Leah (SA, Female)": "en-ZA-LeahNeural",
    "Luke (SA, Male)": "en-ZA-LukeNeural"
}
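# More Edge TTS voices can be added to the map above, e.g. (a hypothetical extra
# entry; "en-US-GuyNeural" is a standard Edge TTS voice id):
# "Guy (US, Male)": "en-US-GuyNeural",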
# --- Streamlit Config ---
st.set_page_config(page_title="LOR Technologies AI Assistant", layout="wide")
# --- User/session state ---
if "user_id" not in st.session_state:
st.session_state["user_id"] = str(uuid.uuid4())
user_id = st.session_state["user_id"]
if "mute_voice" not in st.session_state:
st.session_state["mute_voice"] = False
if "last_tts_text" not in st.session_state:
st.session_state["last_tts_text"] = ""
if "last_audio_path" not in st.session_state:
st.session_state["last_audio_path"] = ""
if "selected_voice" not in st.session_state:
st.session_state["selected_voice"] = "Jenny (US, Female)"
# --- Sidebar for Voice Selection ---
with st.sidebar:
    st.markdown("### Voice Settings")
    selected_voice = st.selectbox(
        "Select assistant voice",
        list(VOICE_OPTIONS.keys()),
        index=list(VOICE_OPTIONS.keys()).index(st.session_state["selected_voice"]),
    )
    st.session_state["selected_voice"] = selected_voice
# --- Branding & Styling ---
st.markdown("""
<style>
.block-container {padding-top: 1rem; padding-bottom: 0rem;}
header {visibility: hidden;}
.stChatMessage { max-width: 85%; border-radius: 12px; padding: 8px; margin-bottom: 10px; }
.stChatMessage[data-testid="stChatMessage-user"] { background: #f0f0f0; color: #000000; }
.stChatMessage[data-testid="stChatMessage-assistant"] { background: #e3f2fd; color: #000000; }
.lt-logo { vertical-align: middle; }
</style>
""", unsafe_allow_html=True)
st.markdown("""
<div style='text-align: center; margin-top: 20px; margin-bottom: -10px;'>
<span style='display: inline-flex; align-items: center; gap: 8px;'>
<img src='https://lortechnologies.com/wp-content/uploads/2023/03/LOR-Online-Logo.svg' width='100' class='lor-logo'/>
<span style='font-size: 12px; color: gray;'>Powered by LOR Technologies</span>
</span>
</div>
""", unsafe_allow_html=True)
# --- Firestore helpers ---
def get_or_create_thread_id():
    """Return this user's OpenAI thread id, creating and persisting one on first visit."""
    doc_ref = db.collection("users").document(user_id)
    doc = doc_ref.get()
    if doc.exists:
        return doc.to_dict()["thread_id"]
    else:
        thread = client.beta.threads.create()
        doc_ref.set({"thread_id": thread.id, "created_at": firestore.SERVER_TIMESTAMP})
        return thread.id
def save_message(role, content):
    db.collection("users").document(user_id).collection("messages").add({
        "role": role,
        "content": content,
        "timestamp": firestore.SERVER_TIMESTAMP
    })
def display_chat_history():
    messages = db.collection("users").document(user_id).collection("messages").order_by("timestamp").stream()
    assistant_icon_html = "<img src='https://lortechnologies.com/wp-content/uploads/2023/03/LOR-Online-Logo.svg' width='20' style='vertical-align:middle;'/>"
    # Reverse the ascending-timestamp stream so the newest messages render first
    for msg in list(messages)[::-1]:
        data = msg.to_dict()
        if data["role"] == "user":
            st.markdown(f"<div class='stChatMessage' data-testid='stChatMessage-user'>👤 <strong>You:</strong> {data['content']}</div>", unsafe_allow_html=True)
        else:
            st.markdown(f"<div class='stChatMessage' data-testid='stChatMessage-assistant'>{assistant_icon_html} <strong>LORAIN:</strong> {data['content']}</div>", unsafe_allow_html=True)
# --- Edge TTS synth ---
async def edge_tts_synthesize(text, voice, user_id):
    out_path = f"output_{user_id}.mp3"
    communicate = edge_tts.Communicate(text, voice)
    await communicate.save(out_path)
    return out_path
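# edge_tts is async-only, so the synchronous Streamlit flow bridges into it
# with asyncio.run() below. Minimal standalone usage (illustrative values):
#   asyncio.run(edge_tts_synthesize("Hello there", "en-US-JennyNeural", "demo"))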
def synthesize_voice(text, voice_key, user_id):
    voice = VOICE_OPTIONS[voice_key]
    out_path = f"output_{user_id}.mp3"
    # Only synthesize if the text changed, the file is missing, or the voice changed
    if st.session_state["last_tts_text"] != text or not os.path.exists(out_path) or st.session_state.get("last_voice") != voice:
        with st.spinner(f"Generating voice ({voice_key})..."):
            asyncio.run(edge_tts_synthesize(text, voice, user_id))
        st.session_state["last_tts_text"] = text
        st.session_state["last_audio_path"] = out_path
        st.session_state["last_voice"] = voice
    return out_path
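# Note: Streamlit reruns with an unchanged (text, voice) pair reuse the cached
# output_<user_id>.mp3 instead of calling Edge TTS again.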
# --- Main Chat UI ---
input_col, clear_col = st.columns([9, 1])
with input_col:
    user_input = st.chat_input("Type your message here...")
with clear_col:
    if st.button("🗑️", key="clear-chat", help="Clear Chat"):
        try:
            user_doc_ref = db.collection("users").document(user_id)
            for msg in user_doc_ref.collection("messages").stream():
                msg.reference.delete()
            user_doc_ref.delete()
            st.session_state.clear()
            st.rerun()
        except Exception as e:
            st.error(f"Failed to clear chat: {e}")
thread_id = get_or_create_thread_id()
display_chat_history()
if user_input:
    # --- OpenAI Assistant Response ---
    client.beta.threads.messages.create(thread_id=thread_id, role="user", content=user_input)
    save_message("user", user_input)
    with st.spinner("Thinking and typing... 🤖"):
        run = client.beta.threads.runs.create(thread_id=thread_id, assistant_id=assistant_id)
        while True:
            run_status = client.beta.threads.runs.retrieve(thread_id=thread_id, run_id=run.id)
            if run_status.status == "completed":
                break
            # Bail out instead of polling forever if the run hits a terminal failure state
            if run_status.status in ("failed", "cancelled", "expired"):
                st.error(f"Assistant run ended with status: {run_status.status}")
                st.stop()
            time.sleep(1)
    messages_response = client.beta.threads.messages.list(thread_id=thread_id)
    latest_response = sorted(messages_response.data, key=lambda x: x.created_at)[-1]
    assistant_message = latest_response.content[0].text.value
    save_message("assistant", assistant_message)
    # --- TTS: Speak unless muted ---
    mute_voice = st.session_state.get("mute_voice", False)
    audio_path = None
    if not mute_voice and assistant_message.strip():
        audio_path = synthesize_voice(assistant_message, st.session_state["selected_voice"], user_id)
        st.session_state["last_audio_path"] = audio_path
        st.audio(audio_path, format="audio/mp3", autoplay=True)
    elif mute_voice:
        st.info("🔇 Voice is muted. Click Unmute below to enable assistant speech.")
    # --- Controls (Mute/Unmute/Replay) ---
    col1, col2 = st.columns([1, 1])
    with col1:
        if not mute_voice and st.button("🔇 Mute Voice"):
            st.session_state["mute_voice"] = True
            st.rerun()
        elif mute_voice and st.button("🔊 Unmute Voice"):
            st.session_state["mute_voice"] = False
            st.rerun()
    with col2:
        # Replay button: always available if last_audio_path exists
        if st.session_state.get("last_audio_path") and os.path.exists(st.session_state["last_audio_path"]):
            if st.button("🔁 Replay Voice"):
                st.audio(st.session_state["last_audio_path"], format="audio/mp3", autoplay=True)
                time.sleep(0.2)
                st.rerun()
else:
    # Always show last audio with replay if available
    if st.session_state.get("last_audio_path") and os.path.exists(st.session_state["last_audio_path"]) and not st.session_state["mute_voice"]:
        st.audio(st.session_state["last_audio_path"], format="audio/mp3", autoplay=False)
        # Controls: only show Replay when idle
        if st.button("🔁 Replay Last Voice"):
            st.audio(st.session_state["last_audio_path"], format="audio/mp3", autoplay=True)
    # Show mute/unmute in idle state too
    if not st.session_state["mute_voice"]:
        if st.button("🔇 Mute Voice"):
            st.session_state["mute_voice"] = True
            st.rerun()
    else:
        if st.button("🔊 Unmute Voice"):
            st.session_state["mute_voice"] = False
            st.rerun()