Spaces:
Running
Running
File size: 6,173 Bytes
59d778e 0e1b8a9 98e1cd9 0e1b8a9 acc6be8 0e1b8a9 9cf31e7 0e1b8a9 9cf31e7 0e1b8a9 6cff19d acc6be8 0e1b8a9 b3b692e 0e1b8a9 e98df8c 0e1b8a9 b3b692e 0e1b8a9 59d778e b3b692e 59d778e b3b692e 59d778e b3b692e 59d778e 0e1b8a9 05eed53 0fc17c0 b3b692e 59d778e b3b692e 59d778e 05eed53 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 |
import streamlit as st
from openai import OpenAI
import time
import os
import uuid
import firebase_admin
from firebase_admin import credentials, firestore
# Firebase setup: initialise the default Firebase app once, then open a
# Firestore client. The guard keeps a repeated execution of this script in the
# same process from calling initialize_app() a second time.
try:
    # Public API check; get_app() raises ValueError when the default app has
    # not been initialised yet (replaces reading the private `_apps` registry).
    firebase_admin.get_app()
except ValueError:
    cred = credentials.Certificate("firebase-service-account.json")
    firebase_admin.initialize_app(cred)
db = firestore.client()
# OpenAI setup: the API key and the assistant id are read from the
# environment variables "openai_key" and "assistant_id" (None when unset).
openai_key = os.environ.get("openai_key")
assistant_id = os.environ.get("assistant_id")
client = OpenAI(api_key=openai_key)
# Streamlit page configuration.
st.set_page_config(page_title="LOR Technologies AI Assistant", layout="wide")
# Session + user ID: each browser session gets a random UUID, generated on
# first run and kept in st.session_state so later runs reuse it.
user_id = st.session_state.get("user_id")
if user_id is None:
    user_id = str(uuid.uuid4())
    st.session_state["user_id"] = user_id
# LORTech branding + styling.
# First markdown call: page-wide CSS (tighter padding, hidden Streamlit header,
# chat-bubble colours keyed on the data-testid values that
# display_chat_history() writes, and the logo alignment rule).
# The logo rule is named `.lor-logo` so that it matches the class attribute on
# the <img> tag in the header below.
st.markdown("""
<style>
.block-container {padding-top: 1rem; padding-bottom: 0rem;}
header {visibility: hidden;}
.stChatMessage { max-width: 85%; border-radius: 12px; padding: 8px; margin-bottom: 10px; }
.stChatMessage[data-testid="stChatMessage-user"] { background: #f0f0f0; color: #000000; }
.stChatMessage[data-testid="stChatMessage-assistant"] { background: #e3f2fd; color: #000000; }
.lor-logo { vertical-align: middle; }
</style>
""", unsafe_allow_html=True)
# Second markdown call: centred logo with the "Powered by" caption.
st.markdown("""
<div style='text-align: center; margin-top: 20px; margin-bottom: -10px;'>
<span style='display: inline-flex; align-items: center; gap: 8px;'>
<img src='https://lortechnologies.com/wp-content/uploads/2023/03/LOR-Online-Logo.svg' width='100' class='lor-logo'/>
<span style='font-size: 12px; color: gray;'>Powered by LOR Technologies</span>
</span>
</div>
""", unsafe_allow_html=True)
# Get or create a thread ID
def get_or_create_thread_id():
    """Return the OpenAI thread id associated with the current user.

    Looks up the Firestore document ``users/<user_id>``. When it exists, the
    value of its ``thread_id`` field is returned. Otherwise a new thread is
    created through ``client.beta.threads.create()``, its id is stored in that
    document together with a server-side ``created_at`` timestamp, and the new
    id is returned.
    """
    user_doc = db.collection("users").document(user_id)
    snapshot = user_doc.get()
    if snapshot.exists:
        return snapshot.to_dict()["thread_id"]

    # First visit for this user id: create the thread and remember it.
    new_thread = client.beta.threads.create()
    user_doc.set({"thread_id": new_thread.id, "created_at": firestore.SERVER_TIMESTAMP})
    return new_thread.id
# Save a message
def save_message(role, content):
    """Append one chat message to ``users/<user_id>/messages`` in Firestore.

    Args:
        role: Stored under the ``role`` key (the callers in this file pass
            ``"user"`` or ``"assistant"``).
        content: Message text, stored under the ``content`` key.

    The document also receives a server-side ``timestamp``, which
    ``display_chat_history`` orders by.
    """
    message_log = db.collection("users").document(user_id).collection("messages")
    record = {
        "role": role,
        "content": content,
        "timestamp": firestore.SERVER_TIMESTAMP,
    }
    message_log.add(record)
# Display chat history
def display_chat_history():
    """Render the current user's stored messages, newest first.

    Streams every document of ``users/<user_id>/messages`` ordered by
    ``timestamp`` and emits one styled ``<div>`` per message through
    ``st.markdown``. Messages whose ``role`` is ``"user"`` get the user bubble;
    every other role is rendered as the assistant ("LORAIN").

    The message text is HTML-escaped before interpolation: the markup is
    rendered with ``unsafe_allow_html=True`` and the text originates from user
    input and model output, so unescaped text could inject arbitrary HTML.
    """
    import html  # local import: only this function needs it

    messages = db.collection("users").document(user_id).collection("messages").order_by("timestamp").stream()
    assistant_icon_html = "<img src='https://lortechnologies.com/wp-content/uploads/2023/03/LOR-Online-Logo.svg' width='20' style='vertical-align:middle;'/>"
    # The query is ascending by timestamp; reverse it so the latest message is
    # rendered at the top.
    for msg in reversed(list(messages)):
        data = msg.to_dict()
        # quote=False: the text lands in element content, not in an attribute,
        # so only &, < and > need escaping.
        safe_content = html.escape(str(data["content"]), quote=False)
        if data["role"] == "user":
            st.markdown(f"<div class='stChatMessage' data-testid='stChatMessage-user'>π€ <strong>You:</strong> {safe_content}</div>", unsafe_allow_html=True)
        else:
            st.markdown(f"<div class='stChatMessage' data-testid='stChatMessage-assistant'>{assistant_icon_html} <strong>LORAIN:</strong> {safe_content}</div>", unsafe_allow_html=True)
# --- Main Chat UI ---
# Layout: a wide column holding the chat input next to a narrow column holding
# the clear-chat button.
input_col, clear_col = st.columns([9, 1])
with input_col:
    user_input = st.chat_input("Type your message here...")
with clear_col:
    clear_requested = st.button("ποΈ", key="clear-chat", help="Clear Chat")
    if clear_requested:
        try:
            # Delete each stored message, then the user document itself, then
            # drop the whole session state (including user_id) and restart.
            user_doc_ref = db.collection("users").document(user_id)
            stored_messages = user_doc_ref.collection("messages")
            for stored_msg in stored_messages.stream():
                stored_msg.reference.delete()
            user_doc_ref.delete()
            st.session_state.clear()
            st.rerun()
        except Exception as e:
            st.error(f"Failed to clear chat: {e}")

# Resolve the conversation thread and draw the stored history on every run.
thread_id = get_or_create_thread_id()
display_chat_history()

# Session defaults read by the voice features further down.
for state_key, default_value in (("mute_voice", False), ("last_tts_text", "")):
    if state_key not in st.session_state:
        st.session_state[state_key] = default_value
def synthesize_voice(text, user_id):
    """Return the path of an MP3 file containing ``text`` as speech.

    The file is named ``output_<user_id>.mp3`` (relative to the working
    directory). Synthesis is skipped when ``text`` equals the value stored in
    ``st.session_state["last_tts_text"]`` and the file already exists;
    otherwise the audio is requested from ``client.audio.speech.create``
    (model ``tts-1``, voice ``nova``), written to the file, and
    ``last_tts_text`` is updated.

    Args:
        text: The text to speak.
        user_id: Used only to build the output file name.

    Returns:
        The audio file path as a string.
    """
    audio_path = f"output_{user_id}.mp3"

    # Reuse the existing file when nothing changed since the last synthesis.
    already_synthesized = (
        st.session_state["last_tts_text"] == text and os.path.exists(audio_path)
    )
    if already_synthesized:
        return audio_path

    with st.spinner("Synthesizing voice with GPT-4o..."):
        speech_response = client.audio.speech.create(
            model="tts-1",
            voice="nova",
            input=text,
            response_format="mp3",
        )
        with open(audio_path, "wb") as audio_file:
            audio_file.write(speech_response.content)
    st.session_state["last_tts_text"] = text
    return audio_path
# Handle a newly submitted chat message: forward it to the assistant thread,
# wait for the run to finish, persist both sides of the exchange, optionally
# speak the reply, then rerun so the history above includes the new messages.
if user_input:
    # Send user message to OpenAI thread
    client.beta.threads.messages.create(thread_id=thread_id, role="user", content=user_input)
    save_message("user", user_input)

    with st.spinner("Thinking and typing... π"):
        run = client.beta.threads.runs.create(thread_id=thread_id, assistant_id=assistant_id)
        # Poll once per second until the run reaches a terminal state.
        # Anything other than "completed" that can no longer progress is
        # reported and stops this script run instead of looping forever.
        while True:
            run_status = client.beta.threads.runs.retrieve(thread_id=thread_id, run_id=run.id)
            if run_status.status == "completed":
                break
            if run_status.status in ("failed", "cancelled", "expired", "incomplete"):
                st.error(f"The assistant could not finish this request (status: {run_status.status}). Please try again.")
                st.stop()
            time.sleep(1)

    messages_response = client.beta.threads.messages.list(thread_id=thread_id)
    # Only assistant messages are candidates for the reply; otherwise a
    # created_at tie (or a run that produced no message) could select the
    # user's own message. max() keeps the first of equal timestamps.
    assistant_messages = [m for m in messages_response.data if m.role == "assistant"]
    if not assistant_messages:
        st.error("The assistant did not return a reply. Please try again.")
        st.stop()
    latest_response = max(assistant_messages, key=lambda m: m.created_at)
    assistant_message = latest_response.content[0].text.value
    save_message("assistant", assistant_message)

    # Voice autoplay unless muted
    mute_voice = st.session_state.get("mute_voice", False)
    audio_path = synthesize_voice(assistant_message, user_id) if not mute_voice else None
    if not mute_voice and audio_path:
        st.audio(audio_path, format="audio/mp3", autoplay=True)
    elif mute_voice:
        st.info("π Voice is muted. To enable assistant speech, click 'Unmute Voice' below and ask another question.")

    # Single mute/unmute button
    if not mute_voice:
        if st.button("π Mute Voice"):
            st.session_state["mute_voice"] = True
            st.rerun()
    else:
        if st.button("π Unmute Voice"):
            st.session_state["mute_voice"] = False
            st.rerun()

    # NOTE(review): this rerun fires half a second after the audio player and
    # the mute toggle are drawn - confirm both remain usable after the rerun.
    time.sleep(0.5)
    st.rerun()
|