IAMTFRMZA's picture
Update app.py
044981b verified
import streamlit as st
import asyncio
import edge_tts
import time
import os
import uuid
import re
import html
import firebase_admin
from firebase_admin import credentials, firestore
from openai import OpenAI
# ---- Firebase setup ----
if not firebase_admin._apps:
cred = credentials.Certificate("firebase-service-account.json")
firebase_admin.initialize_app(cred)
db = firestore.client()
# ---- OpenAI setup ----
openai_key = os.getenv("openai_key")
assistant_id = os.getenv("assistant_id")
client = OpenAI(api_key=openai_key)
VOICE_OPTIONS = {
"Jenny (US, Female)": "en-US-JennyNeural",
"Aria (US, Female)": "en-US-AriaNeural",
"Ryan (UK, Male)": "en-GB-RyanNeural",
"Natasha (AU, Female)": "en-AU-NatashaNeural",
"William (AU, Male)": "en-AU-WilliamNeural",
"Libby (UK, Female)": "en-GB-LibbyNeural",
"Leah (SA, Female)": "en-ZA-LeahNeural",
"Luke (SA, Male)": "en-ZA-LukeNeural"
}
st.set_page_config(page_title="GameOn AI Assistant", layout="wide")
# --- State setup
if "user_id" not in st.session_state:
st.session_state["user_id"] = str(uuid.uuid4())
user_id = st.session_state["user_id"]
if "mute_voice" not in st.session_state:
st.session_state["mute_voice"] = False
if "last_tts_text" not in st.session_state:
st.session_state["last_tts_text"] = ""
if "last_audio_path" not in st.session_state:
st.session_state["last_audio_path"] = ""
if "selected_voice" not in st.session_state:
st.session_state["selected_voice"] = "Jenny (US, Female)"
# --- CSS ---
st.markdown("""
<style>
.block-container {padding-top: 1rem;}
header {visibility: hidden;}
.logo-mini {
width: 75px !important;
margin: 0 auto 0.25em auto;
display: block;
}
.lor-brand-bar {
width: 100vw;
text-align: center;
background: none;
margin-bottom: 0.5em;
margin-top: 0.1em;
position: relative;
}
.clear-chat-btn-top {
position: absolute;
top: 10px;
right: 50px;
font-size: 1.4em;
color: #ccc;
background: none;
border: none;
cursor: pointer;
z-index: 1000;
transition: color 0.2s ease;
}
.clear-chat-btn-top:hover {
color: #fff;
}
.stChatMessage { max-width: 85%; border-radius: 12px; padding: 8px; margin-bottom: 10px; }
.stChatMessage[data-testid="stChatMessage-user"] { background: #f0f0f0; color: #000000; }
.stChatMessage[data-testid="stChatMessage-assistant"] { background: #e3f2fd; color: #000000; }
.chat-history-wrapper {
margin-top: 0.5em;
padding-bottom: 9em;
min-height: 60vh;
}
.input-bottom-bar {
position: fixed;
bottom: 3.5em;
width: 100%;
background: #191b22;
padding: 0.5em 0.6em;
border-top: 1px solid #22232c;
z-index: 999;
}
</style>
""", unsafe_allow_html=True)
# --- Top Branding + clear button ---
st.markdown("""
<div class="lor-brand-bar">
<img src="https://www.gameonmobile.co.za/images/GameON!-Mobile-Training-Button-logo-Medium.png" class="logo-mini" />
<div style="font-size: 13px; color: #888;">Powered by GameOn</div>
<button class="clear-chat-btn-top" onclick="window.location.href='?clear=1'">🗑️</button>
</div>
""", unsafe_allow_html=True)
# --- Sidebar: voice settings ---
with st.sidebar:
st.markdown("### Voice Settings & Controls")
selected_voice = st.selectbox(
"Select assistant voice", list(VOICE_OPTIONS.keys()),
index=list(VOICE_OPTIONS.keys()).index(st.session_state["selected_voice"])
)
st.session_state["selected_voice"] = selected_voice
last_audio = st.session_state.get("last_audio_path")
mute_voice = st.session_state.get("mute_voice", False)
if last_audio and os.path.exists(last_audio):
st.audio(last_audio, format="audio/mp3", autoplay=not mute_voice)
if st.button("🔁 Replay Voice"):
st.audio(last_audio, format="audio/mp3", autoplay=True)
if not mute_voice:
if st.button("🔇 Mute Voice"):
st.session_state["mute_voice"] = True
st.rerun()
else:
if st.button("🔊 Unmute Voice"):
st.session_state["mute_voice"] = False
st.rerun()
# --- Firestore helpers ---
def get_or_create_thread_id():
doc_ref = db.collection("users").document(user_id)
doc = doc_ref.get()
if doc.exists:
return doc.to_dict()["thread_id"]
else:
thread = client.beta.threads.create()
doc_ref.set({"thread_id": thread.id, "created_at": firestore.SERVER_TIMESTAMP})
return thread.id
def save_message(role, content):
db.collection("users").document(user_id).collection("messages").add({
"role": role,
"content": content,
"timestamp": firestore.SERVER_TIMESTAMP
})
def clear_chat_history():
user_doc_ref = db.collection("users").document(user_id)
for msg in user_doc_ref.collection("messages").stream():
msg.reference.delete()
user_doc_ref.delete()
st.session_state.clear()
st.rerun()
def display_chat_history():
messages = db.collection("users").document(user_id).collection("messages").order_by("timestamp").stream()
assistant_icon_html = "<img src='https://github.com/AndrewLORTech/gameonwebaiassistant/blob/main/GameGuy.jpg?raw=true' width='22' style='vertical-align:middle; border-radius:50%;'/>"
chat_msgs = []
for msg in list(messages)[::-1]:
data = msg.to_dict()
if data["role"] == "user":
chat_msgs.append(
f"<div class='stChatMessage' data-testid='stChatMessage-user'>👤 <strong>You:</strong> {data['content']}</div>"
)
else:
chat_msgs.append(
f"<div class='stChatMessage' data-testid='stChatMessage-assistant'>{assistant_icon_html} <strong>GameGuy:</strong> {data['content']}</div>"
)
st.markdown('<div class="chat-history-wrapper">' + "".join(chat_msgs) + '</div>', unsafe_allow_html=True)
st.markdown('<div id="chat-top-anchor"></div>', unsafe_allow_html=True)
# --- TTS sanitization ---
def sanitize_for_tts(text):
text = html.unescape(text)
text = re.sub(r'[^\x00-\x7F]+', ' ', text)
text = re.sub(r'\[([^\]]+)\]\([^\)]+\)', r'\1', text)
text = re.sub(r'(\*\*|__)(.*?)\1', r'\2', text)
text = re.sub(r'(\*|_)(.*?)\1', r'\2', text)
text = re.sub(r'^#{1,6}\s+', '', text, flags=re.MULTILINE)
text = re.sub(r'^\s*[-*+]\s+', ' • ', text, flags=re.MULTILINE)
text = re.sub(r'^\s*\d+\.\s+', ' • ', text, flags=re.MULTILINE)
text = re.sub(r'[!?]{2,}', '.', text)
text = re.sub(r'\.{3,}', '.', text)
text = re.sub(r'\n{2,}', '. ', text)
text = re.sub(r'\s+', ' ', text).strip()
return text
# --- Edge TTS synth ---
async def edge_tts_synthesize(text, voice, user_id):
out_path = f"output_{user_id}.mp3"
communicate = edge_tts.Communicate(text, voice)
await communicate.save(out_path)
return out_path
def synthesize_voice(text, voice_key, user_id):
voice = VOICE_OPTIONS[voice_key]
out_path = f"output_{user_id}.mp3"
if st.session_state["last_tts_text"] != text or not os.path.exists(out_path) or st.session_state.get("last_voice") != voice:
with st.spinner(f"Generating voice ({voice_key})..."):
asyncio.run(edge_tts_synthesize(text, voice, user_id))
st.session_state["last_tts_text"] = text
st.session_state["last_audio_path"] = out_path
st.session_state["last_voice"] = voice
return out_path
# --- CHAT DISPLAY ---
display_chat_history()
# --- Bottom chat input ---
with st.container():
st.markdown('<div class="input-bottom-bar">', unsafe_allow_html=True)
user_input = st.chat_input("Type your message here...")
st.markdown('</div>', unsafe_allow_html=True)
# --- JS auto-scroll ---
st.markdown("""
<script>
window.onload = function() {
var anchor = document.getElementById("chat-top-anchor");
if(anchor){ anchor.scrollIntoView({ behavior: "smooth", block: "start" }); }
};
</script>
""", unsafe_allow_html=True)
# --- Handle clear button ---
if st.query_params.get("clear") == "1":
clear_chat_history()
# --- Handle user input ---
if user_input:
thread_id = get_or_create_thread_id()
client.beta.threads.messages.create(thread_id=thread_id, role="user", content=user_input)
save_message("user", user_input)
with st.spinner("Thinking and typing... 💭"):
run = client.beta.threads.runs.create(thread_id=thread_id, assistant_id=assistant_id)
while True:
run_status = client.beta.threads.runs.retrieve(thread_id=thread_id, run_id=run.id)
if run_status.status == "completed":
break
time.sleep(1)
messages_response = client.beta.threads.messages.list(thread_id=thread_id)
latest_response = sorted(messages_response.data, key=lambda x: x.created_at)[-1]
assistant_message = latest_response.content[0].text.value
save_message("assistant", assistant_message)
mute_voice = st.session_state.get("mute_voice", False)
audio_path = None
if not mute_voice and assistant_message.strip():
clean_text = sanitize_for_tts(assistant_message)
audio_path = synthesize_voice(clean_text, st.session_state["selected_voice"], user_id)
st.session_state["last_audio_path"] = audio_path
time.sleep(0.2)
st.rerun()