Spaces:
Sleeping
Sleeping
import streamlit as st | |
import asyncio | |
import websockets | |
import uuid | |
from datetime import datetime | |
import os | |
import random | |
import time | |
import hashlib | |
from PIL import Image | |
import glob | |
import base64 | |
import io | |
import streamlit.components.v1 as components | |
import edge_tts | |
from audio_recorder_streamlit import audio_recorder | |
import nest_asyncio | |
import re | |
import pytz | |
from gradio_client import Client | |
from streamlit_marquee import streamlit_marquee | |
from collections import defaultdict, Counter | |
# Patch asyncio for nesting | |
# Patch asyncio so event loops can be nested.
nest_asyncio.apply()

# Page-level Streamlit configuration: tab title, tab icon and wide layout.
st.set_page_config(
    page_title="Colorado Sim: Mountains, Memes, & Mayhem ๐๐๏ธ",
    page_icon="๐ฆ",
    layout="wide",
)
# Static Config with Character-Driven Sim | |
# Display name of the site and the name of the default chat room.
Site_Name = "Colorado Sim Chat ๐๏ธโจ"
START_ROOM = "Rocky Mountain Hub ๐ฒ"

# Sim characters mapped to the edge-tts voice each one speaks with.
FUN_USERNAMES = {
    # Adventurous hiker
    "Trailblazer Tim ๐": "en-US-GuyNeural",
    # Meme enthusiast
    "Meme Queen Mia ๐": "en-US-JennyNeural",
    # Wildlife lover
    "Elk Whisperer Eve ๐ฆ": "en-GB-SoniaNeural",
    # Tech-savvy coder
    "Tech Titan Tara ๐พ": "en-AU-NatashaNeural",
    # Skiing pro
    "Ski Guru Sam โท๏ธ": "en-CA-ClaraNeural",
    # Stargazing storyteller
    "Cosmic Camper Cal ๐ ": "en-US-AriaNeural",
    # Chill outdoorsman
    "Rasta Ranger Rick ๐": "en-GB-RyanNeural",
    # Rock-climbing dude
    "Boulder Bro Ben ๐ชจ": "en-AU-WilliamNeural",
}

# Distinct voices used by the characters above (order is not significant).
EDGE_TTS_VOICES = list(set(FUN_USERNAMES.values()))

# Emoji shown next to a download link, keyed by file extension.
FILE_EMOJIS = {
    "md": "๐",
    "mp3": "๐ต",
    "png": "๐ผ๏ธ",
    "mp4": "๐ฅ",
    "zip": "๐ฆ",
}
# Directories | |
# Working directories for chat transcripts and generated audio.
CHAT_DIR = "chat_logs"
AUDIO_DIR = "audio_logs"
AUDIO_CACHE_DIR = "audio_cache"

# Create the working directories up front; already-existing ones are fine.
for d in (CHAT_DIR, AUDIO_DIR, AUDIO_CACHE_DIR):
    os.makedirs(d, exist_ok=True)

# File that remembers the selected character, and the shared chat log.
STATE_FILE = "user_state.txt"
CHAT_FILE = os.path.join(CHAT_DIR, "sim_chat.md")
# Timestamp Helper | |
def format_timestamp_prefix(username=""):
    """Return a ``YYYYMMDD_HHMMSS-by-<username>`` prefix in US/Central time.

    Args:
        username: Text appended after ``-by-``; defaults to an empty string.
    """
    stamp = datetime.now(pytz.timezone('US/Central')).strftime('%Y%m%d_%H%M%S')
    return f"{stamp}-by-{username}"
# Session State Init | |
def init_session_state():
    """Seed ``st.session_state`` with defaults for every key that is not set yet.

    Keys already present in the session are left untouched.
    """
    marquee_defaults = {
        "background": "#1E1E1E",
        "color": "#FFFFFF",
        "font-size": "14px",
        "animationDuration": "20s",
        "width": "100%",
        "lineHeight": "35px",
    }
    defaults = {
        'server_running': False,
        'server_task': None,
        'active_connections': {},
        'last_chat_update': 0,
        'message_text': "",
        'audio_cache': {},
        'transcript_history': [],
        'last_transcript': "",
        'tts_voice': "en-US-AriaNeural",
        'chat_history': [],
        'marquee_settings': marquee_defaults,
        'username': None,
        'autosend': True,
        'last_message': "",
        'mp3_files': {},
        'timer_start': time.time(),
        'auto_refresh': True,
        'refresh_rate': 10,
    }
    for key, value in defaults.items():
        if key in st.session_state:
            continue
        st.session_state[key] = value
# Marquee Helpers | |
def update_marquee_settings_ui():
    """Render the sidebar marquee controls and store their values.

    Writes the chosen background, text colour, font size and animation
    duration into ``st.session_state['marquee_settings']``.
    """
    st.sidebar.markdown("### ๐ฏ Marquee Settings ๐จ")
    left, right = st.sidebar.columns(2)
    with left:
        background = st.color_picker("๐ Background", "#1E1E1E")
        st.session_state['marquee_settings']['background'] = background
        text_color = st.color_picker("โ๏ธ Text", "#FFFFFF")
        st.session_state['marquee_settings']['color'] = text_color
    with right:
        size = st.slider('๐ Size', 10, 24, 14)
        st.session_state['marquee_settings']['font-size'] = f"{size}px"
        speed = st.slider('โฑ๏ธ Speed', 1, 20, 20)
        st.session_state['marquee_settings']['animationDuration'] = f"{speed}s"
def display_marquee(text, settings, key_suffix=""):
    """Show ``text`` in a scrolling marquee, cut to 280 characters plus "...".

    Args:
        text: Content to scroll.
        settings: Keyword arguments forwarded to ``streamlit_marquee``.
        key_suffix: Appended to ``marquee_`` to form the widget key.
    """
    if len(text) > 280:
        shown = text[:280] + "..."
    else:
        shown = text
    streamlit_marquee(content=shown, **settings, key=f"marquee_{key_suffix}")
    # Empty write placed after the marquee, as in the original layout.
    st.write("")
# Text & File Helpers | |
def clean_text_for_tts(text):
    """Return ``text`` prepared for speech synthesis.

    Removes the markup characters ``# * ! [ ]``, collapses every run of
    whitespace to one space, and limits the result to 200 characters.

    Markup is removed *before* whitespace is normalised, so removing a
    symbol never leaves doubled, leading or trailing spaces behind, and
    input made only of markup and whitespace gets the fallback.

    Args:
        text: Raw message text; ``None`` is treated as empty.

    Returns:
        The cleaned text, or ``"No text"`` when nothing speakable remains.
    """
    without_markup = re.sub(r'[#*!\[\]]+', '', text or "")
    normalized = ' '.join(without_markup.split())
    # Truncation can land just after a space, so trim the tail again.
    return normalized[:200].rstrip() or "No text"
def clean_text_for_filename(text):
    """Turn ``text`` into a lowercase, underscore-separated slug of at most 50 chars.

    Characters other than word characters, whitespace and hyphens are
    dropped before the remaining words are joined with underscores.
    """
    kept = re.sub(r'[^\w\s-]', '', text.lower())
    words = kept.split()
    slug = '_'.join(words)
    return slug[:50]
def generate_filename(prompt, username, file_type="md"):
    """Build ``<timestamp prefix>-<8 hex chars>.<file_type>`` for a piece of content.

    The hex part is the first eight characters of the MD5 digest of
    ``prompt``; the prefix comes from ``format_timestamp_prefix(username)``.
    """
    prefix = format_timestamp_prefix(username)
    digest = hashlib.md5(prompt.encode()).hexdigest()
    short_digest = digest[:8]
    return f"{prefix}-{short_digest}.{file_type}"
def create_file(prompt, username, file_type="md"):
    """Write ``prompt`` to a newly named UTF-8 file and return that file name.

    The name is produced by ``generate_filename`` from the same arguments.
    """
    target = generate_filename(prompt, username, file_type)
    with open(target, 'w', encoding='utf-8') as handle:
        handle.write(prompt)
    return target
def get_download_link(file, file_type="mp3"):
    """Return an HTML ``<a>`` tag that downloads ``file`` via a base64 data URI.

    Args:
        file: Path of the file to embed.
        file_type: Key used to pick the MIME type and the emoji label;
            unknown types fall back to ``application/octet-stream``.

    Returns:
        The anchor markup as a string.
    """
    import html  # local import: only needed to escape the file name

    with open(file, "rb") as f:
        b64 = base64.b64encode(f.read()).decode()
    mime_types = {"mp3": "audio/mpeg", "png": "image/png", "md": "text/markdown"}
    mime = mime_types.get(file_type, "application/octet-stream")
    # File names embed the username, and callers render this markup with
    # unsafe_allow_html=True, so the name must be escaped before it is
    # placed in the attribute and in the link text.
    safe_name = html.escape(os.path.basename(file), quote=True)
    icon = FILE_EMOJIS.get(file_type, "๐ฅ")
    return f'<a href="data:{mime};base64,{b64}" download="{safe_name}">{icon} Download {safe_name}</a>'
def save_username(username):
    """Persist the selected character name to ``STATE_FILE``.

    The file is written as UTF-8 explicitly: character names contain
    emoji, which a platform default encoding may be unable to encode.
    """
    with open(STATE_FILE, 'w', encoding='utf-8') as f:
        f.write(username)
def load_username():
    """Return the character name stored in ``STATE_FILE``, or ``None`` if absent.

    The file is read as UTF-8 explicitly because character names contain
    emoji. Opening directly and catching ``FileNotFoundError`` avoids the
    gap between an existence check and the read.
    """
    try:
        with open(STATE_FILE, 'r', encoding='utf-8') as f:
            return f.read().strip()
    except FileNotFoundError:
        return None
def load_mp3_viewer():
    """Register MP3 files from the working directory in the session.

    Files are visited oldest-first by modification time. Each base name not
    yet in ``st.session_state['mp3_files']`` is stored with its 1-based
    position in that ordering and its path.
    """
    by_age = sorted(glob.glob("*.mp3"), key=os.path.getmtime)
    position = 0
    for path in by_age:
        position += 1
        name = os.path.basename(path)
        if name in st.session_state['mp3_files']:
            continue
        st.session_state['mp3_files'][name] = (position, path)
# Audio Processing | |
async def async_edge_tts_generate(text, voice, username):
    """Synthesize ``text`` with edge-tts and return the MP3 file name.

    The cache in ``st.session_state['audio_cache']`` is keyed on the digest
    of the cleaned text plus the voice, i.e. exactly what determines the
    audio. Keying on the first 100 raw characters would hand back the wrong
    clip for two messages that share a prefix.

    Args:
        text: Raw message text; cleaned with ``clean_text_for_tts``.
        voice: edge-tts voice name.
        username: Used for the generated file name prefix.

    Returns:
        The MP3 file name, or ``None`` when no non-empty file was produced.
    """
    spoken = clean_text_for_tts(text)
    digest = hashlib.md5(spoken.encode()).hexdigest()
    cache_key = f"{digest}_{voice}"
    cache = st.session_state['audio_cache']

    cached = cache.get(cache_key)
    # Only reuse a cached entry whose file still exists on disk.
    if cached and os.path.exists(cached):
        return cached

    filename = f"{format_timestamp_prefix(username)}-{digest[:8]}.mp3"
    communicate = edge_tts.Communicate(spoken, voice)
    await communicate.save(filename)
    if os.path.exists(filename) and os.path.getsize(filename) > 0:
        cache[cache_key] = filename
        return filename
    return None
def play_and_download_audio(file_path):
    """Render an audio player and a download link for ``file_path``.

    Nothing is rendered when the path is empty or the file does not exist.
    """
    if not file_path or not os.path.exists(file_path):
        return
    st.audio(file_path)
    link_html = get_download_link(file_path)
    st.markdown(link_html, unsafe_allow_html=True)
async def save_chat_entry(username, message, voice, is_markdown=False):
    """Record a chat message, synthesize its audio and broadcast it.

    Blank messages and a repeat of the previous message
    (``st.session_state.last_transcript``) are ignored.

    Args:
        username: Author shown in the log entry.
        message: Message body.
        voice: edge-tts voice used for the audio and shown in the entry.
        is_markdown: When true, the message is wrapped in a fenced
            ``markdown`` block inside the log entry.

    Returns:
        ``(md_file, audio_file)``, or ``(None, None)`` when the message was
        ignored. ``audio_file`` may be ``None`` if synthesis produced nothing.
    """
    if not message.strip() or message == st.session_state.last_transcript:
        return None, None

    central = pytz.timezone('US/Central')
    timestamp = datetime.now(central).strftime("%Y-%m-%d %H:%M:%S")
    if is_markdown:
        entry = f"[{timestamp}] {username} ({voice}):\n```markdown\n{message}\n```"
    else:
        entry = f"[{timestamp}] {username} ({voice}): {message}"

    md_file = create_file(entry, username, "md")
    # Entries contain emoji; write UTF-8 explicitly (as create_file does)
    # instead of relying on the platform default encoding.
    with open(CHAT_FILE, 'a', encoding='utf-8') as f:
        f.write(f"{entry}\n")

    audio_file = await async_edge_tts_generate(message, voice, username)
    await broadcast_message(f"{username}|{message}", "chat")

    st.session_state.last_chat_update = time.time()
    st.session_state.chat_history.append(entry)
    st.session_state.last_transcript = message
    return md_file, audio_file
async def load_chat():
    """Return the chat log as a list of non-blank lines without duplicates.

    Creates ``CHAT_FILE`` with a welcome header when it does not exist yet.
    Duplicate lines are dropped while the first-seen order is kept. The
    file is read and written as UTF-8 explicitly because entries contain
    emoji.
    """
    if not os.path.exists(CHAT_FILE):
        with open(CHAT_FILE, 'a', encoding='utf-8') as f:
            f.write(f"# {START_ROOM} Chat\n\nWelcome to the Colorado Sim! ๐๏ธ\n")
    with open(CHAT_FILE, 'r', encoding='utf-8') as f:
        content = f.read().strip()
    lines = content.split('\n')
    # dict.fromkeys de-duplicates while preserving first-seen order.
    return list(dict.fromkeys(line for line in lines if line.strip()))
# ArXiv Search with Gradio Client | |
async def perform_arxiv_search(query, username):
    """Query the ArXiv RAG Gradio app and post the answer to the chat.

    The first element of the prediction result is saved as a markdown chat
    entry under ``username``, using that character's voice (Aria when the
    name is not a known character).

    Returns:
        The ``(md_file, audio_file)`` pair from ``save_chat_entry``.
    """
    rag_client = Client("awacke1/Arxiv-Paper-Search-And-QA-RAG-Pattern")
    prediction = rag_client.predict(
        query,
        10,
        "Semantic Search",
        "mistralai/Mixtral-8x7B-Instruct-v0.1",
        api_name="/update_with_rag_md",
    )
    refs = prediction[0]
    result = f"๐ ArXiv Insights:\n\n{refs}"
    voice = FUN_USERNAMES.get(username, "en-US-AriaNeural")
    saved_md, saved_audio = await save_chat_entry(username, result, voice, True)
    return saved_md, saved_audio
# WebSocket Handling | |
async def websocket_handler(websocket, path=None):
    """Serve one websocket client in the shared "chat" room.

    Registers the connection, announces the join, then treats every
    incoming ``username|content`` message as a chat entry followed by an
    ArXiv search on its content. The connection is always unregistered
    when the loop ends.

    Args:
        websocket: The client connection.
        path: Request path. Unused; optional so the handler works both
            with websockets releases that pass ``(connection, path)`` and
            with those that pass only the connection.
    """
    client_id = str(uuid.uuid4())
    room_id = "chat"
    st.session_state.active_connections.setdefault(room_id, {})[client_id] = websocket

    username = st.session_state.get('username', random.choice(list(FUN_USERNAMES.keys())))
    await save_chat_entry("System ๐", f"{username} has joined {START_ROOM}!", "en-US-AriaNeural")
    try:
        async for message in websocket:
            if '|' in message:
                username, content = message.split('|', 1)
                voice = FUN_USERNAMES.get(username, "en-US-AriaNeural")
                await save_chat_entry(username, content, voice)
                await perform_arxiv_search(content, username)  # ArXiv response for every chat input
    except websockets.ConnectionClosed:
        # Abnormal closure: handled like a clean close below.
        pass
    finally:
        # Unregister first so the leave notice is not sent to this socket.
        st.session_state.active_connections.get(room_id, {}).pop(client_id, None)
    # A clean close ends the ``async for`` without raising, so the leave
    # notice is sent here rather than only in the exception handler.
    await save_chat_entry("System ๐", f"{username} has left {START_ROOM}!", "en-US-AriaNeural")
async def broadcast_message(message, room_id):
    """Send ``message`` to every connection registered for ``room_id``.

    Connections that raise ``websockets.ConnectionClosed`` while sending
    are removed from the room afterwards. Does nothing for an unknown room.
    """
    rooms = st.session_state.active_connections
    if room_id not in rooms:
        return

    disconnected = []
    # Iterate over a snapshot: each ``await`` lets other handlers add or
    # remove connections, which would otherwise raise "dictionary changed
    # size during iteration".
    for client_id, ws in list(rooms[room_id].items()):
        try:
            await ws.send(message)
        except websockets.ConnectionClosed:
            disconnected.append(client_id)

    for client_id in disconnected:
        rooms.get(room_id, {}).pop(client_id, None)
async def run_websocket_server():
    """Start the websocket server on 0.0.0.0:8765 unless one is flagged as running.

    Sets ``st.session_state['server_running']`` once the server is created,
    then waits until the server is closed.
    """
    already_running = st.session_state.get('server_running', False)
    if already_running:
        return
    ws_server = await websockets.serve(websocket_handler, '0.0.0.0', 8765)
    st.session_state['server_running'] = True
    await ws_server.wait_closed()
def start_websocket_server():
    """Run the websocket server to completion on a new event loop.

    The new loop is installed as the current event loop before
    ``run_websocket_server`` is driven on it.
    """
    server_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(server_loop)
    server_coro = run_websocket_server()
    server_loop.run_until_complete(server_coro)
# Sim-Oriented Mad Libs | |
def generate_mad_libs_story(username, inputs):
    """Fill the Colorado Mad Libs template with the player's words.

    Args:
        username: Name of the character the story is about.
        inputs: Mapping that provides every placeholder key used below
            (``plural_noun``, ``adjective1``, ... ``sound``).

    Returns:
        The nine-line story, lines separated by newlines.
    """
    w = inputs
    story_lines = [
        f"๐ In the wilds of Colorado, {username} stumbled upon {w['plural_noun']}.",
        f"๐ These {w['adjective1']} critters were {w['verb_ing']} all over the Rockies,",
        f"๐ค freaking out climbers like {w['celebrity']}.",
        f"๐ ๏ธ Holding a {w['object']}, {username} felt {w['emotion']} as a {w['animal']}",
        f"โณ {w['verb_past']} past, yelling {w['exclamation']} near {w['place']}.",
        f"๐ฝ๏ธ Later, munching {w['food']} with a {w['adjective2']} {w['body_part']},",
        f"๐ต {username} looped โ{w['song']}โ {w['number']} times,",
        f"๐ผ deciding to {w['verb_present']} as a {w['occupation']}",
        f"๐จ in a {w['color']} {w['noun']}โall ending with a {w['sound']}!",
    ]
    return "\n".join(story_lines).strip()
# Main Interface | |
def main(): | |
init_session_state() | |
load_mp3_viewer() | |
saved_username = load_username() | |
if saved_username and saved_username in FUN_USERNAMES: | |
st.session_state.username = saved_username | |
if not st.session_state.username: | |
st.session_state.username = random.choice(list(FUN_USERNAMES.keys())) | |
st.session_state.tts_voice = FUN_USERNAMES[st.session_state.username] | |
asyncio.run(save_chat_entry("System ๐", f"{st.session_state.username} has joined {START_ROOM}!", "en-US-AriaNeural")) | |
save_username(st.session_state.username) | |
st.title(f"๐ {Site_Name} - {st.session_state.username} ๐๏ธ") | |
st.subheader("๐ฒ Chat, Explore Audio, or Create a Sim Story! ๐ค") | |
update_marquee_settings_ui() | |
chat_text = " ".join([line.split(": ")[-1] for line in asyncio.run(load_chat()) if ": " in line]) | |
display_marquee(f"๐๏ธ {START_ROOM} | ๐๏ธ {st.session_state.username} | ๐ฌ {chat_text}", st.session_state['marquee_settings'], "welcome") | |
tab_main = st.radio("๐ฎ Action:", ["๐ค Chat & Voice", "๐ต Audio Gallery", "โจ Mad Libs Sim"], horizontal=True) | |
st.checkbox("๐ Autosend Chat", key="autosend") | |
if tab_main == "๐ค Chat & Voice": | |
st.subheader(f"๐ฌ {START_ROOM} Chat ๐") | |
chat_content = asyncio.run(load_chat()) | |
with st.container(): | |
st.code("\n".join(f"{i+1}. {line}" for i, line in enumerate(chat_content)), language="python") | |
message = st.text_input(f"๐ฌ Message as {st.session_state.username}", key="message_input", placeholder="Type your wild thoughts! ๐") | |
if message and message != st.session_state.last_message: | |
st.session_state.last_message = message | |
col_send = st.columns(1)[0] | |
with col_send: | |
if st.session_state.autosend or st.button("๐ Send Message ๐๏ธ"): | |
voice = FUN_USERNAMES.get(st.session_state.username, "en-US-AriaNeural") | |
md_file, audio_file = asyncio.run(save_chat_entry(st.session_state.username, message, voice, True)) | |
if audio_file: | |
play_and_download_audio(audio_file) | |
st.rerun() | |
elif tab_main == "๐ต Audio Gallery": | |
st.subheader("๐ต Audio Gallery ๐ง") | |
mp3_files = sorted(glob.glob("*.mp3"), key=os.path.getmtime) | |
if mp3_files: | |
for i, mp3 in enumerate(mp3_files): | |
with st.expander(f"๐ต {i+1}. {os.path.basename(mp3)}"): | |
play_and_download_audio(mp3) | |
else: | |
st.write("๐ถ No audio files yetโchat or create a story to generate some! ๐ค") | |
elif tab_main == "โจ Mad Libs Sim": | |
st.subheader("โจ Create Your Colorado Sim Story! ๐") | |
col1, col2 = st.columns(2) | |
inputs = {} | |
with col1: | |
inputs['plural_noun'] = st.text_input("๐ Plural Noun", placeholder="e.g., 'tacos'", key="plural_noun") | |
inputs['adjective1'] = st.text_input("๐จ Adjective", placeholder="e.g., 'spicy'", key="adjective1") | |
inputs['verb_ing'] = st.text_input("โณ Verb ending in -ing", placeholder="e.g., 'yeeting'", key="verb_ing") | |
inputs['celebrity'] = st.text_input("๐ค Celebrity Name", placeholder="e.g., 'Elon Musk'", key="celebrity") | |
inputs['object'] = st.text_input("๐ ๏ธ Random Object", placeholder="e.g., 'toaster'", key="object") | |
inputs['emotion'] = st.text_input("โค๏ธ Emotion", placeholder="e.g., 'salty'", key="emotion") | |
inputs['animal'] = st.text_input("๐ฆ Animal", placeholder="e.g., 'elk'", key="animal") | |
inputs['verb_past'] = st.text_input("โฎ๏ธ Verb (past tense)", placeholder="e.g., 'yeeted'", key="verb_past") | |
inputs['place'] = st.text_input("๐ Place", placeholder="e.g., 'Rockies'", key="place") | |
inputs['exclamation'] = st.text_input("โ Exclamation", placeholder="e.g., 'Bruh!'", key="exclamation") | |
with col2: | |
inputs['food'] = st.text_input("๐ฝ๏ธ Food Item", placeholder="e.g., 'pizza'", key="food") | |
inputs['adjective2'] = st.text_input("โจ Adjective", placeholder="e.g., 'cringe'", key="adjective2") | |
inputs['body_part'] = st.text_input("๐ฆด Body Part", placeholder="e.g., 'elbow'", key="body_part") | |
inputs['number'] = st.text_input("๐ข Number", placeholder="e.g., '69'", key="number") | |
inputs['song'] = st.text_input("๐ต Song Title", placeholder="e.g., 'Sweet Caroline'", key="song") | |
inputs['verb_present'] = st.text_input("โถ๏ธ Verb (present tense)", placeholder="e.g., 'slay'", key="verb_present") | |
inputs['occupation'] = st.text_input("๐ผ Occupation", placeholder="e.g., 'influencer'", key="occupation") | |
inputs['color'] = st.text_input("๐ Color", placeholder="e.g., 'teal'", key="color") | |
inputs['noun'] = st.text_input("๐ Noun", placeholder="e.g., 'chaos'", key="noun") | |
inputs['sound'] = st.text_input("๐ Silly Sound Effect", placeholder="e.g., 'Boop!'", key="sound") | |
if st.button("๐ Generate Sim Story! ๐ค"): | |
story = generate_mad_libs_story(st.session_state.username, inputs) | |
st.markdown("### ๐ Your Colorado Sim Adventure ๐๏ธ") | |
st.write(story) | |
voice = FUN_USERNAMES.get(st.session_state.username, "en-US-AriaNeural") | |
md_file, audio_file = asyncio.run(save_chat_entry(st.session_state.username, story, voice, True)) | |
if audio_file: | |
play_and_download_audio(audio_file) | |
# Sidebar | |
st.sidebar.subheader("๐ญ Character Settings") | |
new_username = st.sidebar.selectbox("๐ Switch Character", list(FUN_USERNAMES.keys()), index=list(FUN_USERNAMES.keys()).index(st.session_state.username)) | |
if new_username != st.session_state.username: | |
asyncio.run(save_chat_entry("System ๐", f"{st.session_state.username} switched to {new_username}", "en-US-AriaNeural")) | |
st.session_state.username = new_username | |
st.session_state.tts_voice = FUN_USERNAMES[new_username] | |
save_username(st.session_state.username) | |
st.rerun() | |
st.sidebar.subheader("๐ฌ Chat History") | |
chat_content = asyncio |