import streamlit as st
import os
import json
import time
import hashlib
import glob
import base64
import io
import streamlit.components.v1 as components
import edge_tts
import nest_asyncio
import re
import pytz
import random
import asyncio
from datetime import datetime
from PyPDF2 import PdfReader
import threading
import pandas as pd
from PIL import Image
from streamlit_javascript import st_javascript
# ==============================================================================
# Configuration & Constants
# ==============================================================================
# Patch asyncio via nest_asyncio before anything else runs; code further down
# in this file calls asyncio.run() from the UI layer.
nest_asyncio.apply()
# Streamlit page chrome (title, icon, wide layout, sidebar open by default).
st.set_page_config(
    page_title="๐ค๐๏ธ Shared World Builder ๐",
    page_icon="๐๏ธ",
    layout="wide",
    initial_sidebar_state="expanded"
)
icons = '๐ค๐๏ธ๐ฃ๏ธ๐พ'
Site_Name = '๐ค๐๏ธ Shared World Builder ๐ฃ๏ธ'
START_ROOM = "World Lobby ๐"
# Directory for generated files and zip archives (the working directory).
MEDIA_DIR = "."
# File in which the chosen username is persisted between runs.
STATE_FILE = "user_state.txt"
# Selectable display names, each mapped to the Edge TTS voice used for it.
FUN_USERNAMES = {
    "BuilderBot ๐ค": "en-US-AriaNeural", "WorldWeaver ๐ธ๏ธ": "en-US-JennyNeural",
    "Terraformer ๐ฑ": "en-GB-SoniaNeural", "SkyArchitect โ๏ธ": "en-AU-NatashaNeural",
    "PixelPainter ๐จ": "en-CA-ClaraNeural", "VoxelVortex ๐ช๏ธ": "en-US-GuyNeural",
    "CosmicCrafter โจ": "en-GB-RyanNeural", "GeoGuru ๐บ๏ธ": "en-AU-WilliamNeural",
    "BlockBard ๐งฑ": "en-CA-LiamNeural", "SoundSculptor ๐": "en-US-AnaNeural",
}
# De-duplicated list of the voices referenced above (order is not defined,
# because it passes through a set).
EDGE_TTS_VOICES = list(set(FUN_USERNAMES.values()))
# Output directories; all of them are created at import time below.
CHAT_DIR = "chat_logs"
AUDIO_CACHE_DIR = "audio_cache"
AUDIO_DIR = "audio_logs"
SAVED_WORLDS_DIR = "saved_worlds"
HISTORY_LOG_DIR = "history_logs"
# Plot dimensions; update_player_state() uses half of each as the default
# x/z position for a new player.
PLOT_WIDTH = 50.0
PLOT_DEPTH = 50.0
# Single JSON file holding the shared objects, players and action history.
WORLD_STATE_FILE = os.path.join(SAVED_WORLDS_DIR, "history.json")
PLAYER_TIMEOUT_SECONDS = 15 * 60 # 15 minutes
# Icon shown for each file extension when rendering download links.
FILE_EMOJIS = {"md": "๐", "mp3": "๐ต", "png": "๐ผ๏ธ", "mp4": "๐ฅ", "zip": "๐ฆ", "json": "๐"}
# Toolbar emoji -> object type name. persist_world_objects() only accepts
# "place" actions whose type is one of these values.
PRIMITIVE_MAP = {
    "๐ณ": "Tree", "๐ฟ": "Rock", "๐๏ธ": "Simple House", "๐ฒ": "Pine Tree", "๐งฑ": "Brick Wall",
    "๐ต": "Sphere", "๐ฆ": "Cube", "๐งด": "Cylinder", "๐ฆ": "Cone", "๐ฉ": "Torus",
    "๐": "Mushroom", "๐ต": "Cactus", "๐ฅ": "Campfire", "โญ": "Star", "๐": "Gem",
    "๐ผ": "Tower", "๐ง": "Barrier", "โฒ": "Fountain", "๐ฎ": "Lantern", "๐ชง": "Sign Post"
}
# Make sure every output directory exists before any handler writes to it.
for d in [CHAT_DIR, AUDIO_DIR, AUDIO_CACHE_DIR, SAVED_WORLDS_DIR, HISTORY_LOG_DIR]:
    os.makedirs(d, exist_ok=True)
# Held by write_history_file() while it writes WORLD_STATE_FILE.
state_lock = threading.Lock()
# ==============================================================================
# Utility Functions
# ==============================================================================
def get_current_time_str(tz='UTC'):
    """Return the current time in zone *tz* formatted as ``YYYYMMDD_HHMMSS``.

    A zone name that pytz does not recognise falls back to UTC.
    """
    try:
        zone = pytz.timezone(tz)
    except pytz.UnknownTimeZoneError:
        zone = pytz.utc
    return datetime.now(zone).strftime('%Y%m%d_%H%M%S')
def clean_filename_part(text, max_len=30):
    """Turn *text* into a fragment that is safe to embed in a file name.

    Whitespace runs become a single underscore, every character other than
    word characters, ``-`` and ``.`` is dropped, and the result is cut to
    *max_len* characters. A non-string input is treated as ``"invalid_name"``.
    """
    candidate = text if isinstance(text, str) else "invalid_name"
    underscored = re.sub(r'\s+', '_', candidate)
    stripped = re.sub(r'[^\w\-.]', '', underscored)
    return stripped[:max_len]
def ensure_dir(dir_path):
    """Create *dir_path* (and any missing parents) if it does not exist.

    An empty or ``None`` path is a no-op. ``os.path.dirname()`` returns ``""``
    for a bare file name, and ``os.makedirs("")`` would raise, so callers that
    pass ``os.path.dirname(some_path)`` are safe with paths in the current
    directory.
    """
    if not dir_path:
        return
    os.makedirs(dir_path, exist_ok=True)
def generate_filename(content, username, extension):
    """Build ``<user>_<timestamp>_<hash>.<extension>`` for a piece of content.

    The hash is the first six hex digits of the MD5 of the first 150
    characters of *content*; the user part is sanitised for file-system use.
    """
    stamp = get_current_time_str()
    digest = hashlib.md5(content[:150].encode()).hexdigest()
    short_digest = digest[:6]
    safe_user = clean_filename_part(username)
    return f"{safe_user}_{stamp}_{short_digest}.{extension}"
# ==============================================================================
# History File Management
# ==============================================================================
def initialize_history_file():
    """Create the world state file with an empty world if it is missing."""
    ensure_dir(SAVED_WORLDS_DIR)
    if os.path.exists(WORLD_STATE_FILE):
        return
    empty_world = {"objects": {}, "players": {}, "action_history": []}
    with open(WORLD_STATE_FILE, 'w', encoding='utf-8') as handle:
        json.dump(empty_world, handle, indent=2)
def read_history_file():
    """Load the shared world state, creating the file first if needed.

    Returns a dict that always contains ``objects``, ``players`` and
    ``action_history``. Any failure while reading or parsing is printed and
    an empty world is returned instead.
    """
    initialize_history_file()
    try:
        with open(WORLD_STATE_FILE, 'r', encoding='utf-8') as handle:
            loaded = json.load(handle)
        # Fill in any top-level key that an older or partial file lacks.
        for key, empty in (("objects", {}), ("players", {}), ("action_history", [])):
            loaded.setdefault(key, empty)
        return loaded
    except Exception as e:
        print(f"Error reading history file: {e}")
        return {"objects": {}, "players": {}, "action_history": []}
def write_history_file(state):
    """Persist *state* to the world state file.

    The state is first written to a sibling temporary file and then moved
    over the real file with ``os.replace``. Writing in place would truncate
    the file before the new content is complete, so a failed ``json.dump``
    or a reader arriving mid-write would see corrupt JSON, which
    ``read_history_file`` turns into an empty world.

    Errors are printed rather than raised, as before.
    """
    tmp_path = f"{WORLD_STATE_FILE}.tmp"
    with state_lock:
        try:
            with open(tmp_path, 'w', encoding='utf-8') as f:
                json.dump(state, f, indent=2)
            os.replace(tmp_path, WORLD_STATE_FILE)
        except Exception as e:
            print(f"Error writing history file: {e}")
            # Best-effort cleanup of a partial temp file; the previous state
            # file is left untouched.
            try:
                os.remove(tmp_path)
            except OSError:
                pass
def prune_inactive_players(state):
    """Drop players whose last action is older than the timeout.

    Replaces ``state["players"]`` with the filtered mapping and returns the
    same *state* object. A player without ``last_action_timestamp`` is treated
    as last active at time 0.
    """
    now = time.time()
    state["players"] = {
        name: info
        for name, info in state.get("players", {}).items()
        if now - info.get("last_action_timestamp", 0) <= PLAYER_TIMEOUT_SECONDS
    }
    return state
def update_player_state(username, position=None):
    """Mark *username* as active now and optionally record a new position.

    Reads the shared state, prunes timed-out players, creates or refreshes the
    player's entry, and writes the state back. When *position* is given a
    ``move`` action is also appended to the history. Returns the updated state.
    """
    state = prune_inactive_players(read_history_file())
    players = state.get("players", {})
    if username in players:
        entry = players[username]
        if position:
            entry["position"] = position
        entry["last_action_timestamp"] = time.time()
    else:
        # New players start at the centre of the plot unless a position is given.
        spawn = {"x": PLOT_WIDTH / 2, "y": 0.5, "z": PLOT_DEPTH / 2}
        players[username] = {"position": position or spawn, "last_action_timestamp": time.time()}
    state["players"] = players
    write_history_file(state)
    if position:
        update_action_history(username, "move", {"position": position}, state)
    return state
# Update Action History: records actions in session state and in the history file
def update_action_history(username, action_type, data, state):
    """Append one action record to the session history and to *state*.

    The record holds a timestamp, the username, the action name and its data.
    *state* is then written to the world state file.
    """
    record = dict(
        timestamp=get_current_time_str(),
        username=username,
        action=action_type,
        data=data,
    )
    if 'action_history' not in st.session_state:
        st.session_state.action_history = []
    st.session_state.action_history.append(record)
    state["action_history"].append(record)
    write_history_file(state)
# Persist World Objects: validates and saves object data to history.json
def persist_world_objects(obj_data, username, action_type):
    """Validate and apply a ``place`` or ``delete`` action to the shared world.

    Args:
        obj_data: Dict describing the object. ``obj_id`` is always required;
            ``place`` additionally requires a ``type`` that is one of the
            ``PRIMITIVE_MAP`` values and a dict ``position``.
        username: Player performing the action.
        action_type: ``"place"`` or ``"delete"``; anything else only rewrites
            the pruned state and logs the action.

    Returns:
        The world state after the action. Invalid input returns the state
        unchanged.
    """
    if not obj_data or not isinstance(obj_data, dict) or 'obj_id' not in obj_data:
        print(f"Invalid object data for {action_type}: {obj_data}")
        return read_history_file()
    state = read_history_file()
    state = prune_inactive_players(state)
    obj_id = obj_data['obj_id']
    if action_type == "place":
        if obj_data.get('type') not in PRIMITIVE_MAP.values():
            print(f"Invalid object type: {obj_data.get('type', 'None')}")
            return state
        # The sidebar calls position.get('x'/'y'/'z') on every history entry,
        # so a missing or non-dict position must never be persisted.
        position = obj_data.get('position')
        if not isinstance(position, dict):
            print(f"Invalid object position for {obj_id}: {position}")
            return state
        state["objects"][obj_id] = obj_data
        update_action_history(username, "place", {
            "obj_id": obj_id,
            "type": obj_data['type'],
            "position": position
        }, state)
    elif action_type == "delete":
        obj_info = state["objects"].pop(obj_id, None)
        if obj_info is not None:
            # Stored objects may predate validation; do not assume their keys.
            update_action_history(username, "delete", {
                "obj_id": obj_id,
                "type": obj_info.get('type'),
                "position": obj_info.get('position')
            }, state)
    write_history_file(state)
    log_action(username, action_type, obj_data)
    return state
def log_action(username, action_type, data):
    """Append an action record to the shared log and to the player's own log.

    Both logs are JSON-lines files in the history log directory. A failure on
    either file is printed and does not stop the other write.
    """
    log_entry = {
        "timestamp": get_current_time_str(),
        "username": username,
        "action": action_type,
        "data": data
    }

    def append_entry(path, failure_prefix):
        # Serialisation happens inside the try so that a non-serialisable
        # payload is reported the same way as an I/O error.
        try:
            with open(path, 'a', encoding='utf-8') as sink:
                sink.write(json.dumps(log_entry) + '\n')
        except Exception as e:
            print(f"{failure_prefix}: {e}")

    shared_log_file = os.path.join(HISTORY_LOG_DIR, "shared_history.jsonl")
    append_entry(shared_log_file, "Error writing to shared history log")
    safe_user = clean_filename_part(username)
    player_log_file = os.path.join(HISTORY_LOG_DIR, f"{safe_user}_history.jsonl")
    append_entry(player_log_file, f"Error writing to player history log {player_log_file}")
# Save and Log Chat: saves the chat message to chat_logs and records it in the action history
async def save_and_log_chat(username, message, voice):
    """Store a chat message, optionally synthesise speech for it, and log it.

    Returns ``(markdown_path, audio_path)``. ``audio_path`` is ``None`` when
    audio is disabled, the message has no speakable text, or synthesis fails.
    ``(None, None)`` is returned for a blank message or when the markdown file
    cannot be written.
    """
    if not message.strip():
        print("Empty chat message, skipping save.")
        return None, None
    stamp = get_current_time_str()
    entry = f"[{stamp}] {username} ({voice}): {message}"
    md_file_path = os.path.join(CHAT_DIR, generate_filename(message, username, "md"))
    try:
        ensure_dir(CHAT_DIR)
        with open(md_file_path, 'w', encoding='utf-8') as out:
            out.write(entry)
    except Exception as e:
        print(f"Error saving chat to {md_file_path}: {e}")
        return None, None
    if 'chat_history' not in st.session_state:
        st.session_state.chat_history = []
    st.session_state.chat_history.append(entry)

    audio_file = None
    # Only clean the text when audio is enabled; None skips synthesis.
    spoken = clean_text_for_tts(message) if st.session_state.get('enable_audio', True) else None
    if spoken and spoken != "No text":
        save_path = os.path.join(AUDIO_DIR, generate_filename(spoken, username, "mp3"))
        ensure_dir(AUDIO_DIR)
        try:
            await edge_tts.Communicate(spoken, voice).save(save_path)
            if os.path.exists(save_path) and os.path.getsize(save_path) > 0:
                audio_file = save_path
            else:
                print(f"Audio file {save_path} failed generation.")
        except Exception as e:
            print(f"Edge TTS Error: {e}")

    update_action_history(username, "chat", {"message": message}, read_history_file())
    return md_file_path, audio_file
# Save World State: saves the current state to a named file
def save_world_state(world_name):
    """Snapshot the current shared state into a timestamped world file.

    Returns ``True`` on success. A blank name or a write failure is reported
    in the UI and returns ``False``.
    """
    if not world_name.strip():
        st.error("World name cannot be empty.")
        return False
    filename = f"world_{clean_filename_part(world_name)}_{get_current_time_str()}.json"
    save_path = os.path.join(SAVED_WORLDS_DIR, filename)
    snapshot = read_history_file()
    try:
        with open(save_path, 'w', encoding='utf-8') as out:
            json.dump(snapshot, out, indent=2)
    except Exception as e:
        print(f"Error saving world state to {save_path}: {e}")
        st.error(f"Failed to save world: {e}")
        return False
    print(f"Saved world state to {save_path}")
    st.success(f"Saved world as {filename}")
    return True
# Load World State: loads a saved state from a file
def load_world_state(filename):
    """Replace the shared world with the contents of a saved world file.

    On success the state is written to the world state file, mirrored into the
    session, and the script is rerun. A missing file or any load error is
    reported in the UI and returns ``False``.
    """
    load_path = os.path.join(SAVED_WORLDS_DIR, filename)
    if not os.path.exists(load_path):
        st.error(f"World file not found: {filename}")
        return False
    try:
        with open(load_path, 'r', encoding='utf-8') as src:
            loaded = json.load(src)
        # Older saves may lack some top-level sections.
        for key, empty in (("objects", {}), ("players", {}), ("action_history", [])):
            loaded.setdefault(key, empty)
        write_history_file(loaded)
        st.session_state.world_state = loaded
        st.session_state.action_history = loaded["action_history"]
        print(f"Loaded world state from {load_path}")
        st.success(f"Loaded world {filename}")
        st.rerun()
        return True
    except Exception as e:
        print(f"Error loading world state from {load_path}: {e}")
        st.error(f"Failed to load world: {e}")
        return False
# ==============================================================================
# JavaScript Message Handling
# ==============================================================================
def handle_js_messages():
    """Poll the browser for one message and apply it to the shared world.

    A message is expected to be a JSON string with a ``type`` field
    (``place_object``, ``delete_object`` or ``move_player``) and an optional
    ``payload`` dict. Each recognised action updates the persisted state,
    stores the result in ``st.session_state.world_state`` and reruns the
    script. Malformed JSON and any other handler error are printed, not raised.
    """
    # NOTE(review): the snippet registers a one-shot 'message' listener, but its
    # own final statement is `return null`; the value returned inside the
    # listener does not appear to reach Python. Confirm against
    # streamlit_javascript's behaviour.
    message = st_javascript("""
window.addEventListener('message', (event) => {
return JSON.stringify(event.data);
}, {once: true});
return null;
""")
    if message:
        try:
            data = json.loads(message)
            action = data.get("type")
            payload = data.get("payload", {})
            # The username comes from the payload when present, otherwise from
            # this session.
            username = payload.get("username", st.session_state.username)
            if action == "place_object":
                state = persist_world_objects(payload["object_data"], username, "place")
                st.session_state.world_state = state
                st.rerun()
            elif action == "delete_object":
                state = persist_world_objects({"obj_id": payload["obj_id"]}, username, "delete")
                st.session_state.world_state = state
                st.rerun()
            elif action == "move_player":
                state = update_player_state(username, payload["position"])
                st.session_state.world_state = state
                st.rerun()
        except json.JSONDecodeError:
            print(f"Invalid JS message: {message}")
        except Exception as e:
            # Covers missing payload keys and non-dict payloads.
            print(f"Error handling JS message: {e}")
# ==============================================================================
# User State & Session Init
# ==============================================================================
def save_username(username):
    """Persist *username* to the state file as UTF-8.

    The encoding is explicit because the selectable usernames contain emoji,
    which the platform default encoding cannot always represent. Failures are
    printed, not raised.
    """
    try:
        with open(STATE_FILE, 'w', encoding='utf-8') as f:
            f.write(username)
    except Exception as e:
        print(f"Failed save username: {e}")


def load_username():
    """Return the persisted username, or ``None`` if it cannot be read.

    The file is read as UTF-8 to match ``save_username``. A file written in
    another encoding that fails to decode is reported and treated as missing.
    """
    if os.path.exists(STATE_FILE):
        try:
            with open(STATE_FILE, 'r', encoding='utf-8') as f:
                return f.read().strip()
        except Exception as e:
            print(f"Failed load username: {e}")
    return None
def init_session_state():
    """Populate session defaults, pick a username and load the shared world.

    Runs on every script execution: missing session keys get defaults,
    container-typed entries are reset if they hold the wrong type, a username
    is restored from disk or chosen at random, and the persisted world state
    is mirrored into the session before the player is marked active.
    """
    initial_values = {
        'message_counter': 0,
        'audio_cache': {},
        'tts_voice': "en-US-AriaNeural",
        'chat_history': [],
        'action_history': [],
        'enable_audio': True,
        'download_link_cache': {},
        'username': None,
        'autosend': False,
        'last_message': "",
        'selected_object': 'None',
        'paste_image_base64': "",
        'new_world_name': "MyWorld",
        'world_state': {"objects": {}, "players": {}, "action_history": []}
    }
    for key, value in initial_values.items():
        if key not in st.session_state:
            st.session_state[key] = value

    # Repair container entries that were overwritten with a different type.
    for key, container_type in (
        ('audio_cache', dict),
        ('download_link_cache', dict),
        ('action_history', list),
    ):
        if not isinstance(st.session_state[key], container_type):
            st.session_state[key] = container_type()

    if 'username' not in st.session_state or not st.session_state.username:
        st.session_state.username = load_username() or random.choice(list(FUN_USERNAMES.keys()))
        save_username(st.session_state.username)

    # Load history on startup
    persisted = read_history_file()
    st.session_state.world_state = persisted
    st.session_state.action_history = persisted.get("action_history", [])
    update_player_state(st.session_state.username)
# ==============================================================================
# Audio / TTS / Chat / File Handling Helpers
# ==============================================================================
def clean_text_for_tts(text):
    """Reduce markdown-ish *text* to plain words suitable for speech.

    Markdown links keep only their label, the characters ``# * _ ` !`` are
    removed, whitespace is collapsed, and the result is capped at 250
    characters. Non-string or empty results yield ``"No text"``.
    """
    if not isinstance(text, str):
        return "No text"
    without_links = re.sub(r'\[([^\]]+)\]\([^\)]+\)', r'\1', text)
    without_markup = re.sub(r'[#*_`!]', '', without_links)
    collapsed = ' '.join(without_markup.split())
    clipped = collapsed[:250]
    if clipped:
        return clipped
    return "No text"
def create_file(content, username, file_type="md", save_path=None):
    """Write *content* to a text file and return its path, or ``None`` on error.

    When *save_path* is not given, a name is generated from the content,
    username and *file_type* inside the media directory.
    """
    target = save_path or os.path.join(MEDIA_DIR, generate_filename(content, username, file_type))
    ensure_dir(os.path.dirname(target))
    try:
        with open(target, 'w', encoding='utf-8') as out:
            out.write(content)
    except Exception as e:
        print(f"Error creating file {target}: {e}")
        return None
    return target
def get_download_link(file_path, file_type="md"):
    """Return an HTML download link for *file_path*, cached per file version.

    The link is an ``<a>`` element whose ``href`` is a base64 ``data:`` URI of
    the file and whose visible text is the emoji for *file_type*. Callers
    render it with ``unsafe_allow_html=True``. Previously the file was read
    and encoded but only the bare emoji was returned, so nothing could be
    downloaded.

    Args:
        file_path: Path of the file to offer.
        file_type: Extension key used for the MIME type and the emoji.

    Returns:
        The anchor HTML, ``"Not found: <name>"`` if the file is missing, or
        ``"Err"`` if it cannot be read.
    """
    import html  # local import: not part of the module's import block

    if not file_path or not os.path.exists(file_path):
        basename = os.path.basename(file_path) if file_path else "N/A"
        return f"Not found: {basename}"
    try:
        mtime = os.path.getmtime(file_path)
    except OSError:
        mtime = 0
    # The modification time is part of the key, so a rewritten file gets a
    # fresh link.
    cache_key = f"dl_{file_path}_{mtime}"
    if 'download_link_cache' not in st.session_state:
        st.session_state.download_link_cache = {}
    if cache_key not in st.session_state.download_link_cache:
        try:
            with open(file_path, "rb") as f:
                b64 = base64.b64encode(f.read()).decode()
            mime_types = {"md": "text/markdown", "mp3": "audio/mpeg", "png": "image/png", "mp4": "video/mp4", "zip": "application/zip", "json": "application/json"}
            mime = mime_types.get(file_type, "application/octet-stream")
            # File names are derived from user text; escape them before they
            # go into attribute values.
            safe_name = html.escape(os.path.basename(file_path), quote=True)
            link_html = (
                f'<a href="data:{mime};base64,{b64}" download="{safe_name}" '
                f'title="Download {safe_name}">{FILE_EMOJIS.get(file_type, "๐")}</a>'
            )
            st.session_state.download_link_cache[cache_key] = link_html
        except Exception as e:
            print(f"Error generating DL link for {file_path}: {e}")
            return f"Err"
    return st.session_state.download_link_cache.get(cache_key, "CacheErr")
def play_and_download_audio(file_path):
    """Render an audio player and a download link for an existing audio file.

    Does nothing when *file_path* is empty or the file does not exist. Display
    errors are shown in the UI.
    """
    if not (file_path and os.path.exists(file_path)):
        return
    try:
        st.audio(file_path)
        extension = file_path.rsplit('.', 1)[-1]
        st.markdown(get_download_link(file_path, extension), unsafe_allow_html=True)
    except Exception as e:
        st.error(f"Audio display error for {os.path.basename(file_path)}: {e}")
async def load_chat_history():
    """Return the session's chat history, loading it from disk when empty.

    When the session holds no entries, every ``*.md`` file in the chat
    directory is read in modification-time order. Unreadable files are
    reported and skipped.
    """
    if 'chat_history' not in st.session_state:
        st.session_state.chat_history = []
    if st.session_state.chat_history:
        return st.session_state.chat_history

    ensure_dir(CHAT_DIR)
    print("Loading chat history from files...")
    entries = []
    for path in sorted(glob.glob(os.path.join(CHAT_DIR, "*.md")), key=os.path.getmtime):
        try:
            with open(path, 'r', encoding='utf-8') as src:
                entries.append(src.read().strip())
        except Exception as e:
            print(f"Err read chat {path}: {e}")
    st.session_state.chat_history = entries
    print(f"Loaded {len(entries)} chat entries from files.")
    return st.session_state.chat_history
def create_zip_of_files(files_to_zip, prefix="Archive"):
    """Bundle the given files into ``<prefix>_<timestamp>.zip``.

    The archive is written to the current working directory, with each file
    stored under its base name. Missing files are skipped.

    Args:
        files_to_zip: Paths to include.
        prefix: Leading part of the archive file name.

    Returns:
        The archive file name, or ``None`` if no files were given or creation
        failed.
    """
    # The module never imported zipfile, so every call used to fail with a
    # NameError that the handler below reported as "Zip failed".
    import zipfile

    if not files_to_zip:
        st.warning("No files provided to zip.")
        return None
    timestamp = get_current_time_str()
    zip_name = f"{prefix}_{timestamp}.zip"
    try:
        print(f"Creating zip: {zip_name}...")
        with zipfile.ZipFile(zip_name, 'w', zipfile.ZIP_DEFLATED) as z:
            for f in files_to_zip:
                if os.path.exists(f):
                    z.write(f, os.path.basename(f))
                else:
                    print(f"Skip zip missing: {f}")
        print("Zip success.")
        st.success(f"Created {zip_name}")
        return zip_name
    except Exception as e:
        print(f"Zip failed: {e}")
        st.error(f"Zip failed: {e}")
        return None
def delete_files(file_patterns, exclude_files=None):
    """Delete files matching the glob patterns, sparing protected files.

    Protection is by base name. ``WORLD_STATE_FILE`` is a path that includes
    its directory, so all protected entries are reduced to base names before
    comparison; previously the world state file was never matched and could
    be deleted.

    Args:
        file_patterns: Iterable of glob patterns.
        exclude_files: Extra file names or paths to protect.

    Side effects:
        Shows a summary message in the UI and clears the download-link and
        audio caches in the session.
    """
    protected_paths = [STATE_FILE, "app.py", "index.html", "requirements.txt", "README.md", WORLD_STATE_FILE]
    if exclude_files:
        protected_paths.extend(exclude_files)
    protected = {os.path.basename(p) for p in protected_paths}
    deleted_count = 0
    errors = 0
    for pattern in file_patterns:
        print(f"Attempting to delete files matching: {pattern}")
        try:
            files_to_delete = glob.glob(pattern)
            if not files_to_delete:
                print(f"No files found for pattern: {pattern}")
                continue
            for f_path in files_to_delete:
                basename = os.path.basename(f_path)
                if os.path.isfile(f_path) and basename not in protected:
                    try:
                        os.remove(f_path)
                        print(f"Deleted: {f_path}")
                        deleted_count += 1
                    except Exception as e:
                        print(f"Failed delete {f_path}: {e}")
                        errors += 1
                elif os.path.isdir(f_path):
                    print(f"Skipping directory: {f_path}")
        except Exception as glob_e:
            print(f"Error matching pattern {pattern}: {glob_e}")
            errors += 1
    msg = f"Deleted {deleted_count} files."
    if errors > 0:
        msg += f" Encountered {errors} errors."
        st.warning(msg)
    elif deleted_count > 0:
        st.success(msg)
    else:
        st.info("No matching files found to delete.")
    # Cached links and audio may point at files that no longer exist.
    st.session_state['download_link_cache'] = {}
    st.session_state['audio_cache'] = {}
async def save_pasted_image(image, username):
    """Save a pasted image as a PNG in the media directory.

    The file name combines a timestamp with the first eight hex digits of the
    MD5 of the image bytes. Returns the saved path, or ``None`` when there is
    no image or saving fails. *username* is accepted but not used.
    """
    if not image:
        return None
    try:
        digest = hashlib.md5(image.tobytes()).hexdigest()[:8]
        filepath = os.path.join(MEDIA_DIR, f"{get_current_time_str()}_pasted_{digest}.png")
        image.save(filepath, "PNG")
    except Exception as e:
        print(f"Failed image save: {e}")
        return None
    print(f"Pasted image saved: {filepath}")
    return filepath
def paste_image_component():
    """Render a text area for a pasted image data URI and decode it on demand.

    When the button is pressed and the text starts with ``data:image``, the
    base64 payload is decoded, previewed, and stored in
    ``st.session_state.paste_image_base64``. Returns the decoded image, or
    ``None`` when nothing was processed.
    """
    decoded_image = None
    raw_value = st.text_area("Paste Image Data Here", key="paste_input_area", height=50, value="")
    if not st.button("Process Pasted Image ๐", key="paste_form_button"):
        return decoded_image
    if not (raw_value and raw_value.startswith('data:image')):
        st.warning("No valid image data pasted.")
        st.session_state.paste_image_base64 = ""
        return decoded_image
    try:
        mime_type = raw_value.split(';')[0].split(':')[1]
        base64_str = raw_value.split(',')[1]
        decoded_image = Image.open(io.BytesIO(base64.b64decode(base64_str)))
        subtype = mime_type.split('/')[1]
        st.image(decoded_image, caption=f"Pasted ({subtype.upper()})", width=150)
        st.session_state.paste_image_base64 = base64_str
    except ImportError:
        st.error("Pillow library needed for image pasting.")
    except Exception as e:
        st.error(f"Img decode err: {e}")
        st.session_state.paste_image_base64 = ""
    return decoded_image
class AudioProcessor:
    """Text-to-speech helper with an on-disk cache of generated MP3 files.

    Cached audio lives in the audio cache directory as ``<key>.mp3``, and
    ``metadata.json`` in the same directory records which keys were generated.
    """

    def __init__(self):
        self.cache_dir = AUDIO_CACHE_DIR
        ensure_dir(self.cache_dir)
        self.metadata = self._load_metadata()

    def _metadata_path(self):
        """Return the path of the cache's metadata file."""
        return os.path.join(self.cache_dir, "metadata.json")

    def _load_metadata(self):
        """Read the metadata file; a missing or unusable file gives ``{}``.

        The file is opened in a ``with`` block so the handle is closed, and a
        corrupt file no longer stops the processor from being constructed.
        """
        path = self._metadata_path()
        if not os.path.exists(path):
            return {}
        try:
            with open(path, 'r', encoding='utf-8') as f:
                loaded = json.load(f)
        except (OSError, ValueError) as e:
            print(f"Failed metadata load: {e}")
            return {}
        return loaded if isinstance(loaded, dict) else {}

    def _save_metadata(self):
        """Write the metadata file; failures are printed, not raised."""
        try:
            with open(self._metadata_path(), 'w', encoding='utf-8') as f:
                json.dump(self.metadata, f, indent=2)
        except Exception as e:
            print(f"Failed metadata save: {e}")

    async def create_audio(self, text, voice='en-US-AriaNeural'):
        """Return the path of an MP3 speaking *text* in *voice*, or ``None``.

        The cache key is derived from the cleaned text that is actually
        synthesised. It used to be the first 150 raw characters, so two texts
        sharing a long prefix were served the same audio. Text that cleans
        down to nothing (reported as ``"No text"``) is skipped rather than
        spoken.
        """
        text_cleaned = clean_text_for_tts(text)
        if not text_cleaned or text_cleaned == "No text":
            return None
        cache_key = hashlib.md5(f"{text_cleaned}:{voice}".encode()).hexdigest()
        cache_path = os.path.join(self.cache_dir, f"{cache_key}.mp3")
        if cache_key in self.metadata and os.path.exists(cache_path):
            return cache_path
        ensure_dir(os.path.dirname(cache_path))
        try:
            communicate = edge_tts.Communicate(text_cleaned, voice)
            await communicate.save(cache_path)
            if os.path.exists(cache_path) and os.path.getsize(cache_path) > 0:
                self.metadata[cache_key] = {'timestamp': datetime.now().isoformat(), 'text_length': len(text_cleaned), 'voice': voice}
                self._save_metadata()
                return cache_path
            return None
        except Exception as e:
            print(f"TTS Create Audio Error: {e}")
            return None
def process_pdf_tab(pdf_file, max_pages, voice):
    """Extract text from the first pages of a PDF and synthesise audio per page.

    Args:
        pdf_file: Uploaded PDF; it is passed to ``PdfReader`` and its ``name``
            attribute is shown in the UI. ``None`` shows a hint and returns.
        max_pages: Upper bound on the number of pages processed.
        voice: Voice name passed to ``AudioProcessor.create_audio``.

    One thread is started per page that has text. The function polls those
    threads to update a progress bar, then renders one expander per page with
    the text and, if available, the audio.
    """
    st.subheader("PDF Processing Results")
    if pdf_file is None:
        st.info("Upload a PDF file and click 'Process PDF' to begin.")
        return
    audio_processor = AudioProcessor()
    try:
        reader = PdfReader(pdf_file)
        if reader.is_encrypted:
            st.warning("PDF is encrypted.")
            return
        total_pages = min(len(reader.pages), max_pages)
        st.write(f"Processing first {total_pages} pages of '{pdf_file.name}'...")
        # page index -> extracted text / generated audio path
        texts, audios = {}, {}
        page_threads = []
        # Guards writes to `audios` from the worker threads.
        results_lock = threading.Lock()
        def process_page_sync(page_num, page_text):
            # Thread target: runs the async TTS call to completion in this thread.
            async def run_async_audio():
                return await audio_processor.create_audio(page_text, voice)
            try:
                audio_path = asyncio.run(run_async_audio())
                if audio_path:
                    with results_lock:
                        audios[page_num] = audio_path
            except Exception as page_e:
                print(f"Err process page {page_num+1}: {page_e}")
        for i in range(total_pages):
            try:
                page = reader.pages[i]
                text = page.extract_text()
                if text and text.strip():
                    texts[i] = text
                    thread = threading.Thread(target=process_page_sync, args=(i, text))
                    page_threads.append(thread)
                    thread.start()
                else:
                    texts[i] = "[No text extracted]"
            except Exception as extract_e:
                texts[i] = f"[Error extract: {extract_e}]"
                print(f"Error page {i+1} extract: {extract_e}")
        progress_bar = st.progress(0.0, text="Processing pages...")
        total_threads = len(page_threads)
        start_join_time = time.time()
        # Poll every 0.5 s until all workers finish or 600 s have elapsed.
        while any(t.is_alive() for t in page_threads):
            completed_threads = total_threads - sum(t.is_alive() for t in page_threads)
            progress = completed_threads / total_threads if total_threads > 0 else 1.0
            progress_bar.progress(min(progress, 1.0), text=f"Processed {completed_threads}/{total_threads} pages...")
            if time.time() - start_join_time > 600:
                # Workers still running are not stopped; any audio they have
                # not produced yet is simply absent from `audios` below.
                print("PDF processing timed out.")
                break
            time.sleep(0.5)
        progress_bar.progress(1.0, text="Processing complete.")
        for i in range(total_pages):
            with st.expander(f"Page {i+1}"):
                st.markdown(texts.get(i, "[Error getting text]"))
                audio_file = audios.get(i)
                if audio_file:
                    play_and_download_audio(audio_file)
                else:
                    st.caption("Audio generation failed or was skipped.")
    except Exception as pdf_e:
        st.error(f"Err read PDF: {pdf_e}")
        st.exception(pdf_e)
# ==============================================================================
# Streamlit UI Layout Functions
# ==============================================================================
def render_sidebar():
    """Render the sidebar: players, action history, save/load, tools, user.

    Sections, in order: active players with minutes since their last action,
    a table of place/delete/move/chat actions from the session history, world
    save and load controls, the build-tool palette, and the username/voice
    selector with the TTS toggle.
    """
    with st.sidebar:
        st.header("๐พ World State")
        st.caption("Manage the shared world state.")
        # Active Players
        state = read_history_file()
        players = state.get("players", {})
        st.subheader("Active Players")
        current_time = time.time()
        for username, data in players.items():
            last_action = data.get("last_action_timestamp", 0)
            minutes_ago = (current_time - last_action) / 60
            st.write(f"{username}: Last active {minutes_ago:.1f} minutes ago")
        # Action History Dataset
        st.subheader("Action History")
        history_data = []
        for entry in st.session_state.action_history:
            data = entry["data"]
            if entry["action"] in ["place", "delete"]:
                pos = data.get("position", {})
                position_str = f"({pos.get('x', 0):.1f}, {pos.get('y', 0):.1f}, {pos.get('z', 0):.1f})"
                history_data.append({
                    "Time": entry["timestamp"],
                    "Player": entry["username"],
                    "Action": entry["action"].capitalize(),
                    "Object Type": data.get("type", "N/A"),
                    "Position": position_str
                })
            elif entry["action"] == "move":
                pos = data.get("position", {})
                position_str = f"({pos.get('x', 0):.1f}, {pos.get('y', 0):.1f}, {pos.get('z', 0):.1f})"
                history_data.append({
                    "Time": entry["timestamp"],
                    "Player": entry["username"],
                    "Action": "Move",
                    "Object Type": "Player",
                    "Position": position_str
                })
            elif entry["action"] == "chat":
                # Chat rows reuse the Position column for a message preview.
                history_data.append({
                    "Time": entry["timestamp"],
                    "Player": entry["username"],
                    "Action": "Chat",
                    "Object Type": "Message",
                    "Position": data.get("message", "N/A")[:50]
                })
        if history_data:
            st.dataframe(pd.DataFrame(history_data), height=200, use_container_width=True)
        else:
            st.caption("No actions recorded yet.")
        # Save World
        st.subheader("Save World")
        world_name = st.text_input("World Name", value="MyWorld", key="save_world_name")
        if st.button("๐พ Save World", key="save_world"):
            save_world_state(world_name)
        # Load World
        st.subheader("Load World")
        saved_worlds = [os.path.basename(f) for f in glob.glob(os.path.join(SAVED_WORLDS_DIR, "world_*.json"))]
        selected_world = st.selectbox("Select Saved World", ["None"] + saved_worlds, key="load_world_select")
        if selected_world != "None" and st.button("๐ Load World", key="load_world"):
            load_world_state(selected_world)
        st.markdown("---")
        st.header("๐๏ธ Build Tools")
        st.caption("Select an object to place.")
        # CSS for tool buttons
        # NOTE(review): this markdown body is only a newline; a style block
        # appears to be missing here -- confirm against the original file.
        st.markdown("""
""", unsafe_allow_html=True)
        # Tool buttons
        current_tool = st.session_state.get('selected_object', 'None')
        cols = st.columns(5)
        col_idx = 0
        for emoji, name in PRIMITIVE_MAP.items():
            button_key = f"primitive_{name}"
            button_type = "primary" if current_tool == name else "secondary"
            if cols[col_idx % 5].button(emoji, key=button_key, help=name, type=button_type, use_container_width=True):
                if st.session_state.selected_object != name:
                    st.session_state.selected_object = name
                    update_player_state(st.session_state.username)
                    update_action_history(st.session_state.username, "tool_change", {"tool": name}, read_history_file())
                    st.rerun()
            col_idx += 1
        st.markdown("---")
        if st.button("๐ซ Clear Tool", key="clear_tool", use_container_width=True):
            if st.session_state.selected_object != 'None':
                st.session_state.selected_object = 'None'
                update_player_state(st.session_state.username)
                update_action_history(st.session_state.username, "tool_change", {"tool": "None"}, read_history_file())
                st.rerun()
        st.markdown("---")
        st.header("๐ฃ๏ธ Voice & User")
        current_username = st.session_state.get('username', list(FUN_USERNAMES.keys())[0])
        username_options = list(FUN_USERNAMES.keys())
        current_index = 0
        try:
            current_index = username_options.index(current_username)
        except ValueError:
            current_index = 0
        new_username = st.selectbox("Change Name/Voice", options=username_options, index=current_index, key="username_select", format_func=lambda x: x.split(" ")[0])
        if new_username != st.session_state.username:
            old_username = st.session_state.username
            state = read_history_file()
            if old_username in state["players"]:
                # Carry the player's entry over to the new name.
                state["players"][new_username] = state["players"].pop(old_username)
                write_history_file(state)
            st.session_state.username = new_username
            st.session_state.tts_voice = FUN_USERNAMES[new_username]
            save_username(st.session_state.username)
            # Use the state returned here for the history write below. The
            # state read before this call is stale, and writing it back would
            # discard the player entry and timestamp just saved.
            state = update_player_state(st.session_state.username)
            update_action_history(st.session_state.username, "rename", {"old_username": old_username, "new_username": new_username}, state)
            st.rerun()
        st.session_state['enable_audio'] = st.toggle("Enable TTS Audio", value=st.session_state.get('enable_audio', True))
def render_main_content():
    """Render the main area: 3D world, chat, and files/settings tabs.

    The world tab embeds ``index.html`` and polls for browser messages. The
    chat tab shows the latest 50 entries (newest first) and sends new
    messages. The files tab can clear the world state and create or list zip
    archives.
    """
    st.title(f"{Site_Name} - User: {st.session_state.username}")
    # NOTE(review): tab_pdf is created but nothing is rendered into it, and
    # process_pdf_tab() is not called from this function -- confirm intent.
    tab_world, tab_chat, tab_pdf, tab_files = st.tabs(["๐๏ธ World Builder", "๐ฃ๏ธ Chat", "๐ PDF Tools", "๐ Files & Settings"])
    with tab_world:
        st.header("Shared 3D World")
        st.caption("Click to place objects with the selected tool, or click to move player. Right-click to delete. State is saved in history.json.")
        # NOTE(review): `state` is not used below. The injection script is
        # empty and the replace() target is the empty string, so only a
        # newline is prepended to the template. A script block that embedded
        # the state appears to be missing -- confirm against the original.
        state = st.session_state.world_state
        html_file_path = 'index.html'
        try:
            with open(html_file_path, 'r', encoding='utf-8') as f:
                html_template = f.read()
            js_injection_script = f""""""
            html_content_with_state = html_template.replace('', js_injection_script + '\n', 1)
            components.html(html_content_with_state, height=700, scrolling=False)
            handle_js_messages()
        except FileNotFoundError:
            st.error(f"CRITICAL ERROR: Could not find '{html_file_path}'.")
        except Exception as e:
            st.error(f"Error loading 3D component: {e}")
            st.exception(e)
    with tab_chat:
        st.header(f"{START_ROOM} Chat")
        chat_history_task = asyncio.run(load_chat_history())
        chat_container = st.container(height=500)
        with chat_container:
            if chat_history_task:
                st.markdown("----\n".join(reversed(chat_history_task[-50:])))
            else:
                st.caption("No chat messages yet.")
        # The key includes the message counter, which is incremented after
        # each send, so the next run gets a fresh, empty input.
        message_value = st.text_input("Your Message:", key=f"message_input_{st.session_state.get('message_counter', 0)}", label_visibility="collapsed")
        send_button_clicked = st.button("Send Chat ๐ฌ", key="send_chat_button")
        should_autosend = st.session_state.get('autosend', False) and message_value
        if send_button_clicked or should_autosend:
            message_to_send = message_value
            if message_to_send.strip() and message_to_send != st.session_state.get('last_message', ''):
                st.session_state.last_message = message_to_send
                voice = FUN_USERNAMES.get(st.session_state.username, "en-US-AriaNeural")
                asyncio.run(save_and_log_chat(st.session_state.username, message_to_send, voice))
                update_player_state(st.session_state.username)
                st.session_state.message_counter = st.session_state.get('message_counter', 0) + 1
                st.rerun()
            elif send_button_clicked:
                st.toast("Message empty or same as last.")
        st.checkbox("Autosend Chat", key="autosend")
    with tab_files:
        st.header("๐ Files & Settings")
        st.subheader("๐พ World State Management")
        if st.button("Clear World State", key="clear_world_state"):
            state = {"objects": {}, "players": {}, "action_history": []}
            write_history_file(state)
            st.session_state.world_state = state
            st.session_state.action_history = []
            st.success("World state cleared!")
            st.rerun()
        st.subheader("๐ฆ Download Archives")
        # The create buttons are always shown. They used to be rendered only
        # when an archive already existed, so a first archive could never be
        # created from the UI.
        col_zip1, col_zip2, col_zip3 = st.columns(3)
        with col_zip1:
            if st.button("Zip Worlds"):
                create_zip_of_files([WORLD_STATE_FILE] + glob.glob(os.path.join(SAVED_WORLDS_DIR, "world_*.json")), "Worlds")
        with col_zip2:
            if st.button("Zip Chats"):
                create_zip_of_files(glob.glob(os.path.join(CHAT_DIR, "*.md")), "Chats")
        with col_zip3:
            if st.button("Zip Audio"):
                create_zip_of_files(glob.glob(os.path.join(AUDIO_DIR, "*.mp3")) + glob.glob(os.path.join(AUDIO_CACHE_DIR, "*.mp3")), "Audio")
        # List archives after the buttons so one created in this run is included.
        zip_files = sorted(glob.glob(os.path.join(MEDIA_DIR, "*.zip")), key=os.path.getmtime, reverse=True)
        if zip_files:
            st.caption("Existing Zip Files:")
            for zip_file in zip_files:
                st.markdown(get_download_link(zip_file, "zip"), unsafe_allow_html=True)
        else:
            st.caption("No zip archives found.")
# ==============================================================================
# Main Execution Logic
# ==============================================================================
# Script entry point: set up session state first, since both render functions
# read st.session_state, then draw the sidebar and the main tabs.
if __name__ == "__main__":
    init_session_state()
    render_sidebar()
    render_main_content()