import streamlit as st
import asyncio
import edge_tts
import time
import os
import uuid
import firebase_admin
from firebase_admin import credentials, firestore
from openai import OpenAI
# ---- Firebase setup ----
if not firebase_admin._apps:
    cred = credentials.Certificate("firebase-service-account.json")
    firebase_admin.initialize_app(cred)
db = firestore.client()
# ---- OpenAI setup ----
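# openai_key and assistant_id are supplied via environment variables (e.g. Space secrets).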
openai_key = os.getenv("openai_key")
assistant_id = os.getenv("assistant_id")
client = OpenAI(api_key=openai_key)
# ---- Edge TTS voices ----
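# Short names for Microsoft Edge neural voices, as accepted by edge-tts.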
VOICE_OPTIONS = {
    "Jenny (US, Female)": "en-US-JennyNeural",
    "Aria (US, Female)": "en-US-AriaNeural",
    "Ryan (UK, Male)": "en-GB-RyanNeural",
    "Natasha (AU, Female)": "en-AU-NatashaNeural",
    "William (AU, Male)": "en-AU-WilliamNeural",
    "Libby (UK, Female)": "en-GB-LibbyNeural",
    "Leah (SA, Female)": "en-ZA-LeahNeural",
    "Luke (SA, Male)": "en-ZA-LukeNeural",
}
st.set_page_config(page_title="LOR Technologies AI Assistant", layout="wide")
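# A random UUID identifies this browser session; it keys the Firestore user document and the per-user TTS output file.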
if "user_id" not in st.session_state:
st.session_state["user_id"] = str(uuid.uuid4())
user_id = st.session_state["user_id"]
if "mute_voice" not in st.session_state:
st.session_state["mute_voice"] = False
if "last_tts_text" not in st.session_state:
st.session_state["last_tts_text"] = ""
if "last_audio_path" not in st.session_state:
st.session_state["last_audio_path"] = ""
if "selected_voice" not in st.session_state:
st.session_state["selected_voice"] = "Jenny (US, Female)"
# --- Branding & Styling ---
st.markdown("""
<style>
.block-container {padding-top: 1rem; padding-bottom: 0rem;}
header {visibility: hidden;}
.stChatMessage { max-width: 85%; border-radius: 12px; padding: 8px; margin-bottom: 10px; }
.stChatMessage[data-testid="stChatMessage-user"] { background: #f0f0f0; color: #000000; }
.stChatMessage[data-testid="stChatMessage-assistant"] { background: #e3f2fd; color: #000000; }
.lor-logo { vertical-align: middle; }
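/* The selector below targets an auto-generated Streamlit class; it may break after a Streamlit upgrade. */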
.st-emotion-cache-1avcm0n { justify-content: flex-end !important; }
.stChatInputContainer { position: fixed !important; bottom: 0; width: 80vw; z-index: 100; left: 10vw; background: #11131a; }
</style>
""", unsafe_allow_html=True)
st.markdown("""
<div style='text-align: center; margin-top: 20px; margin-bottom: -10px;'>
<span style='display: inline-flex; align-items: center; gap: 8px;'>
<img src='https://lortechnologies.com/wp-content/uploads/2023/03/LOR-Online-Logo.svg' width='100' class='lor-logo'/>
<span style='font-size: 12px; color: gray;'>Powered by LOR Technologies</span>
</span>
</div>
""", unsafe_allow_html=True)
# --- Sidebar: All audio/controls here ---
with st.sidebar:
    st.markdown("### Voice Settings & Controls")
    selected_voice = st.selectbox(
        "Select assistant voice",
        list(VOICE_OPTIONS.keys()),
        index=list(VOICE_OPTIONS.keys()).index(st.session_state["selected_voice"]),
    )
    st.session_state["selected_voice"] = selected_voice

    # Audio player, always present if we have an mp3
    last_audio = st.session_state.get("last_audio_path")
    mute_voice = st.session_state.get("mute_voice", False)

    # Replay button and audio player
    if last_audio and os.path.exists(last_audio):
        # Autoplay if this was just generated, else manual play
        st.audio(last_audio, format="audio/mp3", autoplay=not mute_voice)
        if st.button("🔁 Replay Voice"):
            st.audio(last_audio, format="audio/mp3", autoplay=True)

    # Mute/Unmute
    if not mute_voice:
        if st.button("🔇 Mute Voice"):
            st.session_state["mute_voice"] = True
            st.rerun()
    else:
        if st.button("🔊 Unmute Voice"):
            st.session_state["mute_voice"] = False
            st.rerun()
# --- Firestore helpers ---
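# Each session's user document stores the OpenAI Assistants thread ID so the same thread is reused across reruns.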
def get_or_create_thread_id():
    doc_ref = db.collection("users").document(user_id)
    doc = doc_ref.get()
    if doc.exists:
        return doc.to_dict()["thread_id"]
    else:
        thread = client.beta.threads.create()
        doc_ref.set({"thread_id": thread.id, "created_at": firestore.SERVER_TIMESTAMP})
        return thread.id
def save_message(role, content):
    db.collection("users").document(user_id).collection("messages").add({
        "role": role,
        "content": content,
        "timestamp": firestore.SERVER_TIMESTAMP,
    })
def display_chat_history():
    messages = db.collection("users").document(user_id).collection("messages").order_by("timestamp").stream()
    assistant_icon_html = "<img src='https://raw.githubusercontent.com/AndrewLORTech/lortechwebsite/main/lorain.jpg' width='24' style='vertical-align:middle; border-radius:50%;'/>"
    # The stream is ascending by timestamp; reversing it renders the newest message first
    for msg in list(messages)[::-1]:
        data = msg.to_dict()
        if data["role"] == "user":
            st.markdown(f"<div class='stChatMessage' data-testid='stChatMessage-user'>👤 <strong>You:</strong> {data['content']}</div>", unsafe_allow_html=True)
        else:
            st.markdown(f"<div class='stChatMessage' data-testid='stChatMessage-assistant'>{assistant_icon_html} <strong>LORAIN:</strong> {data['content']}</div>", unsafe_allow_html=True)
# --- Edge TTS synth ---
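# edge-tts exposes an async-only API; synthesize_voice below bridges it into Streamlit's synchronous flow via asyncio.run().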
async def edge_tts_synthesize(text, voice, user_id):
    out_path = f"output_{user_id}.mp3"
    communicate = edge_tts.Communicate(text, voice)
    await communicate.save(out_path)
    return out_path
def synthesize_voice(text, voice_key, user_id):
    voice = VOICE_OPTIONS[voice_key]
    out_path = f"output_{user_id}.mp3"
    # Only synthesize if the text changed, the file is missing, or the voice changed
    if st.session_state["last_tts_text"] != text or not os.path.exists(out_path) or st.session_state.get("last_voice") != voice:
        with st.spinner(f"Generating voice ({voice_key})..."):
            asyncio.run(edge_tts_synthesize(text, voice, user_id))
        st.session_state["last_tts_text"] = text
        st.session_state["last_audio_path"] = out_path
        st.session_state["last_voice"] = voice
    return out_path
# --- Main Chat UI (text only!) ---
thread_id = get_or_create_thread_id()
display_chat_history()
# --- Static Chat Input at Bottom ---
user_input = st.chat_input("Type your message here...")
if user_input:
    # --- OpenAI Assistant Response ---
    client.beta.threads.messages.create(thread_id=thread_id, role="user", content=user_input)
    save_message("user", user_input)

    with st.spinner("Thinking and typing... 💬"):
        run = client.beta.threads.runs.create(thread_id=thread_id, assistant_id=assistant_id)
        # Poll until the run reaches a terminal state; bail out on failure instead of looping forever
        while True:
            run_status = client.beta.threads.runs.retrieve(thread_id=thread_id, run_id=run.id)
            if run_status.status == "completed":
                break
            if run_status.status in ("failed", "cancelled", "expired"):
                st.error(f"Assistant run {run_status.status}.")
                st.stop()
            time.sleep(1)

    messages_response = client.beta.threads.messages.list(thread_id=thread_id)
    latest_response = sorted(messages_response.data, key=lambda x: x.created_at)[-1]
    assistant_message = latest_response.content[0].text.value
    save_message("assistant", assistant_message)

    # --- TTS: Speak unless muted ---
    mute_voice = st.session_state.get("mute_voice", False)
    if not mute_voice and assistant_message.strip():
        audio_path = synthesize_voice(assistant_message, st.session_state["selected_voice"], user_id)
        st.session_state["last_audio_path"] = audio_path

    # No audio in main area! The sidebar autoplays the latest file on the next rerun.
    time.sleep(0.2)
    st.rerun()