import streamlit as st
import asyncio
import edge_tts
import time
import os
import uuid
import firebase_admin
from firebase_admin import credentials, firestore
from openai import OpenAI
# Firebase setup
if not firebase_admin._apps:
    cred = credentials.Certificate("firebase-service-account.json")
    firebase_admin.initialize_app(cred)
db = firestore.client()
# OpenAI setup
openai_key = os.getenv("openai_key")
assistant_id = os.getenv("assistant_id")
client = OpenAI(api_key=openai_key)
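# Both values are read from environment variables; assistant_id identifies a
# pre-configured OpenAI Assistant used for all runs below.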
VOICE_OPTIONS = {
"Jenny (US, Female)": "en-US-JennyNeural",
"Aria (US, Female)": "en-US-AriaNeural",
"Ryan (UK, Male)": "en-GB-RyanNeural",
"Natasha (AU, Female)": "en-AU-NatashaNeural",
"William (AU, Male)": "en-AU-WilliamNeural",
"Libby (UK, Female)": "en-GB-LibbyNeural",
"Leah (SA, Female)": "en-ZA-LeahNeural",
"Luke (SA, Male)": "en-ZA-LukeNeural"
}
st.set_page_config(page_title="LOR Technologies AI Assistant", layout="wide")
# State
if "user_id" not in st.session_state:
st.session_state["user_id"] = str(uuid.uuid4())
user_id = st.session_state["user_id"]
if "mute_voice" not in st.session_state:
st.session_state["mute_voice"] = False
if "last_tts_text" not in st.session_state:
st.session_state["last_tts_text"] = ""
if "last_audio_path" not in st.session_state:
st.session_state["last_audio_path"] = ""
if "selected_voice" not in st.session_state:
st.session_state["selected_voice"] = "Jenny (US, Female)"
# CSS for floating bar illusion (margin for chat, input pinned visually)
st.markdown("""
<style>
.block-container {padding-top: 1rem;}
.stChatMessage { max-width: 85%; border-radius: 12px; padding: 8px; margin-bottom: 10px; }
.stChatMessage[data-testid="stChatMessage-user"] { background: #f0f0f0; color: #000000; }
.stChatMessage[data-testid="stChatMessage-assistant"] { background: #e3f2fd; color: #000000; }
.lor-logo { vertical-align: middle; }
.footer-fakebar {
    position: fixed;
    left: 0; bottom: 0;
    width: 100vw;
    background: #181a22;
    box-shadow: 0 -2px 8px rgba(0,0,0,0.10);
    padding: 1.2em 0.5em 0.8em 0.5em;
    z-index: 9999;
}
.footer-fakebar .element-container { flex: 1 1 auto; }
.footer-fakebar input { font-size: 1.15em !important; }
.footer-placeholder { height: 90px; }
</style>
""", unsafe_allow_html=True)
st.markdown("""
<div style='text-align: center; margin-top: 20px; margin-bottom: -10px;'>
<span style='display: inline-flex; align-items: center; gap: 8px;'>
<img src='https://lortechnologies.com/wp-content/uploads/2023/03/LOR-Online-Logo.svg' width='100' class='lor-logo'/>
<span style='font-size: 12px; color: gray;'>Powered by LOR Technologies</span>
</span>
</div>
""", unsafe_allow_html=True)
# Sidebar: audio/voice controls
with st.sidebar:
st.markdown("### Voice Settings & Controls")
selected_voice = st.selectbox(
"Select assistant voice", list(VOICE_OPTIONS.keys()),
index=list(VOICE_OPTIONS.keys()).index(st.session_state["selected_voice"])
)
st.session_state["selected_voice"] = selected_voice
last_audio = st.session_state.get("last_audio_path")
mute_voice = st.session_state.get("mute_voice", False)
if last_audio and os.path.exists(last_audio):
st.audio(last_audio, format="audio/mp3", autoplay=not mute_voice)
if st.button("πŸ” Replay Voice"):
st.audio(last_audio, format="audio/mp3", autoplay=True)
if not mute_voice:
if st.button("πŸ”‡ Mute Voice"):
st.session_state["mute_voice"] = True
st.rerun()
else:
if st.button("πŸ”Š Unmute Voice"):
st.session_state["mute_voice"] = False
st.rerun()
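# --- Firestore helpers ---
# Each anonymous session (user_id) is mapped to one persistent OpenAI Assistants
# thread, so the conversation keeps its context across Streamlit reruns.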
def get_or_create_thread_id():
doc_ref = db.collection("users").document(user_id)
doc = doc_ref.get()
if doc.exists:
return doc.to_dict()["thread_id"]
else:
thread = client.beta.threads.create()
doc_ref.set({"thread_id": thread.id, "created_at": firestore.SERVER_TIMESTAMP})
return thread.id
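# Persist a single chat message under users/{user_id}/messages in Firestore.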
def save_message(role, content):
db.collection("users").document(user_id).collection("messages").add({
"role": role,
"content": content,
"timestamp": firestore.SERVER_TIMESTAMP
})
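# Delete the user's stored messages and thread mapping, then reset the session.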
def clear_chat_history():
user_doc_ref = db.collection("users").document(user_id)
for msg in user_doc_ref.collection("messages").stream():
msg.reference.delete()
user_doc_ref.delete()
st.session_state.clear()
st.rerun()
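# Render the stored conversation as custom-styled chat bubbles; the HTML is
# built here so the CSS classes defined above apply to both roles.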
def display_chat_history():
messages = db.collection("users").document(user_id).collection("messages").order_by("timestamp").stream()
assistant_icon_html = "<img src='https://raw.githubusercontent.com/AndrewLORTech/lortechwebsite/main/lorain.jpg' width='24' style='vertical-align:middle; border-radius:50%;'/>"
chat_msgs = []
for msg in list(messages)[::-1]:
data = msg.to_dict()
if data["role"] == "user":
chat_msgs.append(
f"<div class='stChatMessage' data-testid='stChatMessage-user'>πŸ‘€ <strong>You:</strong> {data['content']}</div>"
)
else:
chat_msgs.append(
f"<div class='stChatMessage' data-testid='stChatMessage-assistant'>{assistant_icon_html} <strong>LORAIN:</strong> {data['content']}</div>"
)
st.markdown("".join(chat_msgs[::-1]), unsafe_allow_html=True)
# TTS
async def edge_tts_synthesize(text, voice, user_id):
out_path = f"output_{user_id}.mp3"
communicate = edge_tts.Communicate(text, voice)
await communicate.save(out_path)
return out_path
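# Wrapper around edge_tts_synthesize that caches the last synthesis: audio is
# only regenerated when the text or the selected voice changes.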
def synthesize_voice(text, voice_key, user_id):
    voice = VOICE_OPTIONS[voice_key]
    out_path = f"output_{user_id}.mp3"
    if st.session_state["last_tts_text"] != text or not os.path.exists(out_path) or st.session_state.get("last_voice") != voice:
        with st.spinner(f"Generating voice ({voice_key})..."):
            asyncio.run(edge_tts_synthesize(text, voice, user_id))
        st.session_state["last_tts_text"] = text
        st.session_state["last_audio_path"] = out_path
        st.session_state["last_voice"] = voice
    return out_path
# --- Chat history and spacer ---
display_chat_history()
st.markdown('<div class="footer-placeholder"></div>', unsafe_allow_html=True)
# --- "Floating" Chat Input and Clear Chat button ---
with st.container():
    st.markdown('<div class="footer-fakebar">', unsafe_allow_html=True)
    col1, col2 = st.columns([10, 1])
    user_input = col1.chat_input("Type your message here...")
    if col2.button("🗑️", help="Clear Chat", key="clear-chat-bottom"):
        clear_chat_history()
    st.markdown('</div>', unsafe_allow_html=True)
if user_input:
    # --- OpenAI Assistant Response ---
    thread_id = get_or_create_thread_id()
    client.beta.threads.messages.create(thread_id=thread_id, role="user", content=user_input)
    save_message("user", user_input)
    with st.spinner("Thinking and typing... 💭"):
        run = client.beta.threads.runs.create(thread_id=thread_id, assistant_id=assistant_id)
        # Poll until the run reaches a terminal state so a failed run cannot hang the UI
        while True:
            run_status = client.beta.threads.runs.retrieve(thread_id=thread_id, run_id=run.id)
            if run_status.status in ("completed", "failed", "cancelled", "expired"):
                break
            time.sleep(1)
    messages_response = client.beta.threads.messages.list(thread_id=thread_id)
    latest_response = sorted(messages_response.data, key=lambda x: x.created_at)[-1]
    assistant_message = latest_response.content[0].text.value
    save_message("assistant", assistant_message)
    mute_voice = st.session_state.get("mute_voice", False)
    audio_path = None
    if not mute_voice and assistant_message.strip():
        audio_path = synthesize_voice(assistant_message, st.session_state["selected_voice"], user_id)
    st.session_state["last_audio_path"] = audio_path
    time.sleep(0.2)
    st.rerun()