"""Shared World Builder: a Streamlit app for a file-backed, multi-user 3D world.

World objects, active players and an action history live in a single JSON
file (``saved_worlds/history.json``).  The app also offers a text chat with
optional Edge-TTS audio, named world snapshots, zip archives and a
PDF-to-audio tool.
"""
import asyncio
import base64
import glob
import hashlib
import html
import io
import json
import os
import random
import re
import tempfile
import threading
import time
import zipfile
from datetime import datetime

import edge_tts
import nest_asyncio
import pandas as pd
import pytz
import streamlit as st
import streamlit.components.v1 as components
from PIL import Image
from PyPDF2 import PdfReader
from streamlit_javascript import st_javascript

# ==============================================================================
# Configuration & Constants
# ==============================================================================
# Allow asyncio.run() to be called while Streamlit's own event loop is running.
nest_asyncio.apply()

# NOTE(review): the emoji literals in this file look mis-decoded (mojibake).
# They are runtime strings, so they are left exactly as found -- confirm the
# file encoding against the original source before "repairing" them.
st.set_page_config(
    page_title="๐Ÿค–๐Ÿ—๏ธ Shared World Builder ๐Ÿ†",
    page_icon="๐Ÿ—๏ธ",
    layout="wide",
    initial_sidebar_state="expanded"
)

icons = '๐Ÿค–๐Ÿ—๏ธ๐Ÿ—ฃ๏ธ๐Ÿ’พ'
Site_Name = '๐Ÿค–๐Ÿ—๏ธ Shared World Builder ๐Ÿ—ฃ๏ธ'
START_ROOM = "World Lobby ๐ŸŒ"
MEDIA_DIR = "."
STATE_FILE = "user_state.txt"

# Display name -> Edge TTS voice used when that user chats.
FUN_USERNAMES = {
    "BuilderBot ๐Ÿค–": "en-US-AriaNeural",
    "WorldWeaver ๐Ÿ•ธ๏ธ": "en-US-JennyNeural",
    "Terraformer ๐ŸŒฑ": "en-GB-SoniaNeural",
    "SkyArchitect โ˜๏ธ": "en-AU-NatashaNeural",
    "PixelPainter ๐ŸŽจ": "en-CA-ClaraNeural",
    "VoxelVortex ๐ŸŒช๏ธ": "en-US-GuyNeural",
    "CosmicCrafter โœจ": "en-GB-RyanNeural",
    "GeoGuru ๐Ÿ—บ๏ธ": "en-AU-WilliamNeural",
    "BlockBard ๐Ÿงฑ": "en-CA-LiamNeural",
    "SoundSculptor ๐Ÿ”Š": "en-US-AnaNeural",
}
# Sorted so the list order is stable between runs (set order is not).
EDGE_TTS_VOICES = sorted(set(FUN_USERNAMES.values()))

CHAT_DIR = "chat_logs"
AUDIO_CACHE_DIR = "audio_cache"
AUDIO_DIR = "audio_logs"
SAVED_WORLDS_DIR = "saved_worlds"
HISTORY_LOG_DIR = "history_logs"
PLOT_WIDTH = 50.0
PLOT_DEPTH = 50.0
WORLD_STATE_FILE = os.path.join(SAVED_WORLDS_DIR, "history.json")
PLAYER_TIMEOUT_SECONDS = 15 * 60  # 15 minutes

FILE_EMOJIS = {"md": "๐Ÿ“", "mp3": "๐ŸŽต", "png": "๐Ÿ–ผ๏ธ", "mp4": "๐ŸŽฅ", "zip": "๐Ÿ“ฆ", "json": "๐Ÿ“„"}

# Toolbar emoji -> primitive type name understood by the world state.
PRIMITIVE_MAP = {
    "๐ŸŒณ": "Tree", "๐Ÿ—ฟ": "Rock", "๐Ÿ›๏ธ": "Simple House", "๐ŸŒฒ": "Pine Tree",
    "๐Ÿงฑ": "Brick Wall", "๐Ÿ”ต": "Sphere", "๐Ÿ“ฆ": "Cube", "๐Ÿงด": "Cylinder",
    "๐Ÿฆ": "Cone", "๐Ÿฉ": "Torus", "๐Ÿ„": "Mushroom", "๐ŸŒต": "Cactus",
    "๐Ÿ”ฅ": "Campfire", "โญ": "Star", "๐Ÿ’Ž": "Gem", "๐Ÿ—ผ": "Tower",
    "๐Ÿšง": "Barrier", "โ›ฒ": "Fountain", "๐Ÿฎ": "Lantern", "๐Ÿชง": "Sign Post"
}

for d in [CHAT_DIR, AUDIO_DIR, AUDIO_CACHE_DIR, SAVED_WORLDS_DIR, HISTORY_LOG_DIR]:
    os.makedirs(d, exist_ok=True)

# Serializes writers of WORLD_STATE_FILE within this process.
state_lock = threading.Lock()

# Message types the 3D front end may send back to Python.
JS_ACTION_TYPES = ("place_object", "delete_object", "move_player")


# ==============================================================================
# Utility Functions
# ==============================================================================
def get_current_time_str(tz='UTC'):
    """Return the current time as ``YYYYMMDD_HHMMSS`` in timezone *tz*.

    Falls back to UTC when *tz* is not a known pytz timezone name.
    """
    try:
        timezone = pytz.timezone(tz)
        now_aware = datetime.now(timezone)
    except pytz.UnknownTimeZoneError:
        now_aware = datetime.now(pytz.utc)
    return now_aware.strftime('%Y%m%d_%H%M%S')


def clean_filename_part(text, max_len=30):
    """Make *text* safe for use inside a file name, truncated to *max_len*.

    Whitespace runs become ``_``; anything other than word characters, ``-``
    and ``.`` is dropped.  Non-string input is replaced by ``"invalid_name"``.
    """
    if not isinstance(text, str):
        text = "invalid_name"
    text = re.sub(r'\s+', '_', text)
    text = re.sub(r'[^\w\-.]', '', text)
    return text[:max_len]


def ensure_dir(dir_path):
    """Create *dir_path* (and parents) if it does not exist yet."""
    os.makedirs(dir_path, exist_ok=True)


def generate_filename(content, username, extension):
    """Build ``<user>_<timestamp>_<hash>.<extension>`` for a piece of content.

    The hash is the first 6 hex digits of the MD5 of the first 150 characters
    of *content*; it only disambiguates names and is not a security feature.
    """
    timestamp = get_current_time_str()
    content_hash = hashlib.md5(content[:150].encode()).hexdigest()[:6]
    clean_username = clean_filename_part(username)
    return f"{clean_username}_{timestamp}_{content_hash}.{extension}"


# ==============================================================================
# History File Management
# ==============================================================================
def _empty_world_state():
    """Return a fresh, empty world-state dict."""
    return {"objects": {}, "players": {}, "action_history": []}


def initialize_history_file():
    """Create the world-state file with an empty state if it is missing."""
    ensure_dir(SAVED_WORLDS_DIR)
    if not os.path.exists(WORLD_STATE_FILE):
        with open(WORLD_STATE_FILE, 'w', encoding='utf-8') as f:
            json.dump(_empty_world_state(), f, indent=2)


def read_history_file():
    """Load the world state, guaranteeing the three expected top-level keys.

    Returns an empty state (and prints the error) when the file cannot be
    read or parsed.
    """
    initialize_history_file()
    try:
        with open(WORLD_STATE_FILE, 'r', encoding='utf-8') as f:
            state = json.load(f)
        # Ensure all expected keys exist
        state.setdefault("objects", {})
        state.setdefault("players", {})
        state.setdefault("action_history", [])
        return state
    except Exception as e:
        print(f"Error reading history file: {e}")
        return _empty_world_state()


def write_history_file(state):
    """Persist *state* to the world-state file; errors are printed, not raised.

    The data is written to a temporary file in the same directory and then
    moved into place with ``os.replace`` so a concurrent reader never sees a
    half-written JSON document (which would be read back as an empty world).
    """
    with state_lock:
        tmp_path = None
        try:
            ensure_dir(SAVED_WORLDS_DIR)
            fd, tmp_path = tempfile.mkstemp(dir=SAVED_WORLDS_DIR, suffix=".tmp")
            with os.fdopen(fd, 'w', encoding='utf-8') as f:
                json.dump(state, f, indent=2)
            os.replace(tmp_path, WORLD_STATE_FILE)
            tmp_path = None  # moved into place; nothing left to clean up
        except Exception as e:
            print(f"Error writing history file: {e}")
        finally:
            if tmp_path and os.path.exists(tmp_path):
                try:
                    os.remove(tmp_path)
                except OSError:
                    pass


def prune_inactive_players(state):
    """Drop players whose last action is older than PLAYER_TIMEOUT_SECONDS.

    Mutates and returns *state*.
    """
    current_time = time.time()
    state["players"] = {
        username: data
        for username, data in state.get("players", {}).items()
        if current_time - data.get("last_action_timestamp", 0) <= PLAYER_TIMEOUT_SECONDS
    }
    return state


def update_player_state(username, position=None):
    """Refresh *username*'s heartbeat (and optionally position) in the world file.

    New players start at the centre of the plot unless *position* is given.
    When *position* is given a ``move`` action is also recorded.  Returns the
    updated state dict.
    """
    state = prune_inactive_players(read_history_file())
    players = state.get("players", {})
    if username not in players:
        players[username] = {
            "position": position or {"x": PLOT_WIDTH / 2, "y": 0.5, "z": PLOT_DEPTH / 2},
            "last_action_timestamp": time.time(),
        }
    else:
        if position:
            players[username]["position"] = position
        players[username]["last_action_timestamp"] = time.time()
    state["players"] = players
    if position:
        # update_action_history() persists the state itself.
        update_action_history(username, "move", {"position": position}, state)
    else:
        write_history_file(state)
    return state


def update_action_history(username, action_type, data, state):
    """Append an action to the session history and to *state*, then persist *state*."""
    action_entry = {
        "timestamp": get_current_time_str(),
        "username": username,
        "action": action_type,
        "data": data,
    }
    if 'action_history' not in st.session_state:
        st.session_state.action_history = []
    session_history = st.session_state.action_history
    state_history = state.setdefault("action_history", [])
    session_history.append(action_entry)
    # The two lists can be the same object (e.g. right after a world load);
    # appending twice would then duplicate the entry.
    if state_history is not session_history:
        state_history.append(action_entry)
    write_history_file(state)


def persist_world_objects(obj_data, username, action_type):
    """Apply a ``place`` or ``delete`` action for one object and persist it.

    *obj_data* must be a dict with an ``obj_id``; for ``place`` it must also
    carry a ``type`` that is one of the PRIMITIVE_MAP values.  Invalid input is
    reported via ``print`` and leaves the world unchanged.  Returns the
    resulting state dict.
    """
    if not obj_data or not isinstance(obj_data, dict) or 'obj_id' not in obj_data:
        print(f"Invalid object data for {action_type}: {obj_data}")
        return read_history_file()

    state = prune_inactive_players(read_history_file())
    obj_id = obj_data['obj_id']
    persisted = False  # True once update_action_history() has written the file

    if action_type == "place":
        if 'type' not in obj_data or obj_data['type'] not in PRIMITIVE_MAP.values():
            print(f"Invalid object type: {obj_data.get('type', 'None')}")
            return state
        state["objects"][obj_id] = obj_data
        update_action_history(username, "place", {
            "obj_id": obj_id,
            "type": obj_data['type'],
            "position": obj_data.get('position'),
        }, state)
        persisted = True
    elif action_type == "delete":
        if obj_id in state["objects"]:
            obj_info = state["objects"].pop(obj_id)
            update_action_history(username, "delete", {
                "obj_id": obj_id,
                "type": obj_info.get('type'),
                "position": obj_info.get('position'),
            }, state)
            persisted = True

    if not persisted:
        # Still save the pruned player list.
        write_history_file(state)
    log_action(username, action_type, obj_data)
    return state


def log_action(username, action_type, data):
    """Append the action as one JSON line to the shared log and the player's own log."""
    log_line = json.dumps({
        "timestamp": get_current_time_str(),
        "username": username,
        "action": action_type,
        "data": data,
    }) + '\n'

    shared_log_file = os.path.join(HISTORY_LOG_DIR, "shared_history.jsonl")
    try:
        with open(shared_log_file, 'a', encoding='utf-8') as f:
            f.write(log_line)
    except Exception as e:
        print(f"Error writing to shared history log: {e}")

    clean_username = clean_filename_part(username)
    player_log_file = os.path.join(HISTORY_LOG_DIR, f"{clean_username}_history.jsonl")
    try:
        with open(player_log_file, 'a', encoding='utf-8') as f:
            f.write(log_line)
    except Exception as e:
        print(f"Error writing to player history log {player_log_file}: {e}")


async def save_and_log_chat(username, message, voice):
    """Save a chat message to CHAT_DIR, optionally synthesize audio, and log it.

    Returns ``(markdown_path, audio_path)``; ``audio_path`` is ``None`` when
    audio is disabled or synthesis failed.  Returns ``(None, None)`` for an
    empty message or when the chat file cannot be written.
    """
    if not message.strip():
        print("Empty chat message, skipping save.")
        return None, None

    timestamp_str = get_current_time_str()
    entry = f"[{timestamp_str}] {username} ({voice}): {message}"
    md_filename_base = generate_filename(message, username, "md")
    md_file_path = os.path.join(CHAT_DIR, md_filename_base)
    try:
        ensure_dir(CHAT_DIR)
        with open(md_file_path, 'w', encoding='utf-8') as f:
            f.write(entry)
    except Exception as e:
        print(f"Error saving chat to {md_file_path}: {e}")
        return None, None

    if 'chat_history' not in st.session_state:
        st.session_state.chat_history = []
    st.session_state.chat_history.append(entry)

    audio_file = None
    if st.session_state.get('enable_audio', True):
        text_cleaned = clean_text_for_tts(message)
        if text_cleaned and text_cleaned != "No text":
            filename_base = generate_filename(text_cleaned, username, "mp3")
            save_path = os.path.join(AUDIO_DIR, filename_base)
            ensure_dir(AUDIO_DIR)
            try:
                communicate = edge_tts.Communicate(text_cleaned, voice)
                await communicate.save(save_path)
                if os.path.exists(save_path) and os.path.getsize(save_path) > 0:
                    audio_file = save_path
                else:
                    print(f"Audio file {save_path} failed generation.")
            except Exception as e:
                print(f"Edge TTS Error: {e}")

    state = read_history_file()
    update_action_history(username, "chat", {"message": message}, state)
    return md_file_path, audio_file


def save_world_state(world_name):
    """Snapshot the current world to ``world_<name>_<timestamp>.json``.

    Returns True on success, False (with a Streamlit error) otherwise.
    """
    if not world_name.strip():
        st.error("World name cannot be empty.")
        return False
    clean_name = clean_filename_part(world_name)
    timestamp = get_current_time_str()
    filename = f"world_{clean_name}_{timestamp}.json"
    save_path = os.path.join(SAVED_WORLDS_DIR, filename)
    state = read_history_file()
    try:
        with open(save_path, 'w', encoding='utf-8') as f:
            json.dump(state, f, indent=2)
        print(f"Saved world state to {save_path}")
        st.success(f"Saved world as {filename}")
        return True
    except Exception as e:
        print(f"Error saving world state to {save_path}: {e}")
        st.error(f"Failed to save world: {e}")
        return False


def load_world_state(filename):
    """Replace the live world with the snapshot *filename* and rerun the app.

    Returns False (with a Streamlit error) when the file is missing or
    invalid.  On success ``st.rerun()`` interrupts the script, so the final
    ``return True`` is only reached if rerun is a no-op.
    """
    load_path = os.path.join(SAVED_WORLDS_DIR, filename)
    if not os.path.exists(load_path):
        st.error(f"World file not found: {filename}")
        return False
    try:
        with open(load_path, 'r', encoding='utf-8') as f:
            state = json.load(f)
        if not isinstance(state, dict):
            raise ValueError("world file does not contain a JSON object")
        state.setdefault("objects", {})
        state.setdefault("players", {})
        state.setdefault("action_history", [])
        write_history_file(state)
        st.session_state.world_state = state
        st.session_state.action_history = state["action_history"]
        print(f"Loaded world state from {load_path}")
        st.success(f"Loaded world {filename}")
        st.rerun()
        return True
    except Exception as e:
        print(f"Error loading world state from {load_path}: {e}")
        st.error(f"Failed to load world: {e}")
        return False


# ==============================================================================
# JavaScript Message Handling
# ==============================================================================
# The snippet must be a single awaited expression with no leading newline so
# that it still works if the component wraps it as ``return <code>``.
# NOTE(review): index.html is not visible here.  The listener is attached to
# both this component's window and its parent because the 3D iframe most likely
# posts to ``window.parent`` -- confirm against index.html.
_JS_MESSAGE_LISTENER = """
await new Promise((resolve) => {
    const wanted = %s;
    const targets = [window];
    try {
        if (window.parent && window.parent !== window) { targets.push(window.parent); }
    } catch (e) {}
    const onMessage = (event) => {
        const data = event.data;
        if (!data || typeof data !== "object" || !wanted.includes(data.type)) { return; }
        targets.forEach((t) => { try { t.removeEventListener("message", onMessage); } catch (e) {} });
        resolve(JSON.stringify(data));
    };
    targets.forEach((t) => { try { t.addEventListener("message", onMessage); } catch (e) {} });
})
""".strip() % json.dumps(list(JS_ACTION_TYPES))


def handle_js_messages():
    """Receive one message from the 3D front end and apply it to the world.

    Supported message types are ``place_object``, ``delete_object`` and
    ``move_player``; each updates the world file, stores the new state in the
    session and triggers a rerun.
    """
    # The original listener returned its value from inside the event callback,
    # where it was discarded, so Python always received null.  The Promise
    # makes the component's value the actual message.
    message = st_javascript(_JS_MESSAGE_LISTENER)
    if not message or not isinstance(message, str):
        return
    # The component keeps returning its last value across reruns; without this
    # guard the same message would be applied (and rerun) forever.
    if message == st.session_state.get('last_js_message'):
        return
    st.session_state['last_js_message'] = message

    try:
        data = json.loads(message)
        action = data.get("type")
        payload = data.get("payload", {})
        username = payload.get("username", st.session_state.username)
        if action == "place_object":
            state = persist_world_objects(payload["object_data"], username, "place")
        elif action == "delete_object":
            state = persist_world_objects({"obj_id": payload["obj_id"]}, username, "delete")
        elif action == "move_player":
            state = update_player_state(username, payload["position"])
        else:
            return
        st.session_state.world_state = state
        st.rerun()
    except json.JSONDecodeError:
        print(f"Invalid JS message: {message}")
    except Exception as e:
        print(f"Error handling JS message: {e}")


# ==============================================================================
# User State & Session Init
# ==============================================================================
def save_username(username):
    """Remember *username* in STATE_FILE (best effort)."""
    try:
        # Usernames contain emoji; the platform default encoding may not.
        with open(STATE_FILE, 'w', encoding='utf-8') as f:
            f.write(username)
    except Exception as e:
        print(f"Failed save username: {e}")


def load_username():
    """Return the username stored in STATE_FILE, or None if unavailable."""
    if os.path.exists(STATE_FILE):
        try:
            with open(STATE_FILE, 'r', encoding='utf-8') as f:
                return f.read().strip()
        except Exception as e:
            print(f"Failed load username: {e}")
    return None


def init_session_state():
    """Populate session defaults, pick a username and load the world state."""
    defaults = {
        'message_counter': 0,
        'audio_cache': {},
        'tts_voice': "en-US-AriaNeural",
        'chat_history': [],
        'action_history': [],
        'enable_audio': True,
        'download_link_cache': {},
        'username': None,
        'autosend': False,
        'last_message': "",
        'selected_object': 'None',
        'paste_image_base64': "",
        'new_world_name': "MyWorld",
        'world_state': _empty_world_state(),
    }
    for k, v in defaults.items():
        if k not in st.session_state:
            st.session_state[k] = v

    # Repair values that somehow ended up with the wrong type.
    if not isinstance(st.session_state.audio_cache, dict):
        st.session_state.audio_cache = {}
    if not isinstance(st.session_state.download_link_cache, dict):
        st.session_state.download_link_cache = {}
    if not isinstance(st.session_state.action_history, list):
        st.session_state.action_history = []

    if 'username' not in st.session_state or not st.session_state.username:
        saved_username = load_username()
        st.session_state.username = saved_username if saved_username else random.choice(list(FUN_USERNAMES.keys()))
        save_username(st.session_state.username)

    # Load history on startup
    state = read_history_file()
    st.session_state.world_state = state
    st.session_state.action_history = state.get("action_history", [])
    update_player_state(st.session_state.username)


# ==============================================================================
# Audio / TTS / Chat / File Handling Helpers
# ==============================================================================
def clean_text_for_tts(text):
    """Strip markdown from *text* for speech; returns at most 250 characters.

    Links ``[label](url)`` collapse to their label, ``# * _ ` !`` are removed
    and whitespace is normalized.  Returns the sentinel ``"No text"`` for
    non-string or effectively empty input.
    """
    if not isinstance(text, str):
        return "No text"
    text = re.sub(r'\[([^\]]+)\]\([^\)]+\)', r'\1', text)
    text = re.sub(r'[#*_`!]', '', text)
    text = ' '.join(text.split())
    return text[:250] or "No text"


def create_file(content, username, file_type="md", save_path=None):
    """Write *content* to *save_path* (or a generated name in MEDIA_DIR).

    Returns the path written, or None on failure.
    """
    if not save_path:
        filename = generate_filename(content, username, file_type)
        save_path = os.path.join(MEDIA_DIR, filename)
    # dirname is "" for a bare file name; makedirs("") would raise.
    parent_dir = os.path.dirname(save_path)
    if parent_dir:
        ensure_dir(parent_dir)
    try:
        with open(save_path, 'w', encoding='utf-8') as f:
            f.write(content)
        return save_path
    except Exception as e:
        print(f"Error creating file {save_path}: {e}")
        return None


def get_download_link(file_path, file_type="md"):
    """Return an HTML download link (data URI) for *file_path*.

    Links are cached in the session, keyed by path and modification time.
    Returns ``"Not found: <name>"`` for a missing file and ``"Err"`` when the
    file cannot be read.
    """
    if not file_path or not os.path.exists(file_path):
        basename = os.path.basename(file_path) if file_path else "N/A"
        return f"Not found: {basename}"
    try:
        mtime = os.path.getmtime(file_path)
    except OSError:
        mtime = 0
    cache_key = f"dl_{file_path}_{mtime}"
    if 'download_link_cache' not in st.session_state:
        st.session_state.download_link_cache = {}
    if cache_key not in st.session_state.download_link_cache:
        try:
            with open(file_path, "rb") as f:
                b64 = base64.b64encode(f.read()).decode()
            mime_types = {"md": "text/markdown", "mp3": "audio/mpeg", "png": "image/png",
                          "mp4": "video/mp4", "zip": "application/zip", "json": "application/json"}
            mime = mime_types.get(file_type, "application/octet-stream")
            # File names include user-chosen text; escape before embedding in HTML.
            safe_name = html.escape(os.path.basename(file_path), quote=True)
            emoji = FILE_EMOJIS.get(file_type, "๐Ÿ“„")
            # The anchor was missing entirely, leaving a bare emoji that
            # downloaded nothing even though b64/mime/basename were computed.
            link_html = (
                f'<a href="data:{mime};base64,{b64}" download="{safe_name}" '
                f'title="Download {safe_name}">{emoji}</a>'
            )
            st.session_state.download_link_cache[cache_key] = link_html
        except Exception as e:
            print(f"Error generating DL link for {file_path}: {e}")
            return f"Err"
    return st.session_state.download_link_cache.get(cache_key, "CacheErr")


def play_and_download_audio(file_path):
    """Show an audio player and a download link for *file_path*, if it exists."""
    if file_path and os.path.exists(file_path):
        try:
            st.audio(file_path)
            file_type = file_path.split('.')[-1]
            st.markdown(get_download_link(file_path, file_type), unsafe_allow_html=True)
        except Exception as e:
            st.error(f"Audio display error for {os.path.basename(file_path)}: {e}")


async def load_chat_history():
    """Return the session chat history, loading it from CHAT_DIR when empty.

    Files are read oldest-first (by modification time).  Declared ``async``
    only because callers run it through ``asyncio.run``.
    """
    if 'chat_history' not in st.session_state:
        st.session_state.chat_history = []
    if not st.session_state.chat_history:
        ensure_dir(CHAT_DIR)
        print("Loading chat history from files...")
        chat_files = sorted(glob.glob(os.path.join(CHAT_DIR, "*.md")), key=os.path.getmtime)
        temp_history = []
        for f_path in chat_files:
            try:
                with open(f_path, 'r', encoding='utf-8') as file:
                    temp_history.append(file.read().strip())
            except Exception as e:
                print(f"Err read chat {f_path}: {e}")
        st.session_state.chat_history = temp_history
        print(f"Loaded {len(temp_history)} chat entries from files.")
    return st.session_state.chat_history


def create_zip_of_files(files_to_zip, prefix="Archive"):
    """Zip the existing files in *files_to_zip* into ``<prefix>_<timestamp>.zip``.

    The archive is created in the working directory with flat member names.
    Returns the archive name, or None when there is nothing to zip or the
    archive cannot be written.
    """
    if not files_to_zip:
        st.warning("No files provided to zip.")
        return None
    timestamp = get_current_time_str()
    zip_name = f"{prefix}_{timestamp}.zip"
    try:
        print(f"Creating zip: {zip_name}...")
        with zipfile.ZipFile(zip_name, 'w', zipfile.ZIP_DEFLATED) as z:
            for f in files_to_zip:
                if os.path.exists(f):
                    z.write(f, os.path.basename(f))
                else:
                    print(f"Skip zip missing: {f}")
        print("Zip success.")
        st.success(f"Created {zip_name}")
        return zip_name
    except Exception as e:
        print(f"Zip failed: {e}")
        st.error(f"Zip failed: {e}")
        return None


def delete_files(file_patterns, exclude_files=None):
    """Delete files matching the glob *file_patterns*, sparing protected files.

    Core app files, the world-state file and anything in *exclude_files* are
    never deleted.  Directories are skipped.  Reports the outcome through
    Streamlit and clears the download-link and audio caches.
    """
    protected = [STATE_FILE, "app.py", "index.html", "requirements.txt", "README.md", WORLD_STATE_FILE]
    if exclude_files:
        protected.extend(exclude_files)
    # Candidates are compared by basename, so protected entries given as paths
    # (WORLD_STATE_FILE is "saved_worlds/history.json") must be reduced too.
    protected_names = {os.path.basename(p) for p in protected}

    deleted_count = 0
    errors = 0
    for pattern in file_patterns:
        print(f"Attempting to delete files matching: {pattern}")
        try:
            files_to_delete = glob.glob(pattern)
            if not files_to_delete:
                print(f"No files found for pattern: {pattern}")
                continue
            for f_path in files_to_delete:
                if os.path.isfile(f_path) and os.path.basename(f_path) not in protected_names:
                    try:
                        os.remove(f_path)
                        print(f"Deleted: {f_path}")
                        deleted_count += 1
                    except Exception as e:
                        print(f"Failed delete {f_path}: {e}")
                        errors += 1
                elif os.path.isdir(f_path):
                    print(f"Skipping directory: {f_path}")
        except Exception as glob_e:
            print(f"Error matching pattern {pattern}: {glob_e}")
            errors += 1

    msg = f"Deleted {deleted_count} files."
    if errors > 0:
        msg += f" Encountered {errors} errors."
        st.warning(msg)
    elif deleted_count > 0:
        st.success(msg)
    else:
        st.info("No matching files found to delete.")
    st.session_state['download_link_cache'] = {}
    st.session_state['audio_cache'] = {}


async def save_pasted_image(image, username):
    """Save a PIL *image* as ``<timestamp>_pasted_<hash>.png`` in MEDIA_DIR.

    *username* is accepted for interface compatibility but not used.  Returns
    the saved path, or None when *image* is falsy or saving fails.
    """
    if not image:
        return None
    try:
        img_hash = hashlib.md5(image.tobytes()).hexdigest()[:8]
        timestamp = get_current_time_str()
        filename = f"{timestamp}_pasted_{img_hash}.png"
        filepath = os.path.join(MEDIA_DIR, filename)
        image.save(filepath, "PNG")
        print(f"Pasted image saved: {filepath}")
        return filepath
    except Exception as e:
        print(f"Failed image save: {e}")
        return None


def paste_image_component():
    """Render a text area for a pasted ``data:image`` URI and decode it.

    Returns the decoded PIL image, or None when nothing valid was processed.
    The raw base64 payload is kept in ``st.session_state.paste_image_base64``.
    """
    pasted_img = None
    paste_input = st.text_area("Paste Image Data Here", key="paste_input_area", height=50, value="")
    if st.button("Process Pasted Image ๐Ÿ“‹", key="paste_form_button"):
        if paste_input and paste_input.startswith('data:image'):
            try:
                mime_type = paste_input.split(';')[0].split(':')[1]
                base64_str = paste_input.split(',')[1]
                img_bytes = base64.b64decode(base64_str)
                pasted_img = Image.open(io.BytesIO(img_bytes))
                img_type = mime_type.split('/')[1]
                st.image(pasted_img, caption=f"Pasted ({img_type.upper()})", width=150)
                st.session_state.paste_image_base64 = base64_str
            except ImportError:
                st.error("Pillow library needed for image pasting.")
            except Exception as e:
                st.error(f"Img decode err: {e}")
                st.session_state.paste_image_base64 = ""
        else:
            st.warning("No valid image data pasted.")
            st.session_state.paste_image_base64 = ""
    return pasted_img


class AudioProcessor:
    """Edge-TTS synthesizer with an on-disk MP3 cache and JSON metadata index."""

    def __init__(self):
        self.cache_dir = AUDIO_CACHE_DIR
        ensure_dir(self.cache_dir)
        # create_audio() is called from several page threads at once.
        self._metadata_lock = threading.Lock()
        self.metadata = {}
        metadata_path = f"{self.cache_dir}/metadata.json"
        if os.path.exists(metadata_path):
            try:
                with open(metadata_path, 'r') as f:
                    loaded = json.load(f)
                if isinstance(loaded, dict):
                    self.metadata = loaded
            except Exception as e:
                # A corrupt index only costs cache hits; start empty.
                print(f"Failed metadata load: {e}")

    def _save_metadata(self):
        """Write the metadata index to disk (best effort)."""
        try:
            with self._metadata_lock:
                with open(f"{self.cache_dir}/metadata.json", 'w') as f:
                    json.dump(self.metadata, f, indent=2)
        except Exception as e:
            print(f"Failed metadata save: {e}")

    async def create_audio(self, text, voice='en-US-AriaNeural'):
        """Return the path of an MP3 speaking *text* in *voice*, or None.

        The cache key covers the full cleaned text that is actually spoken, so
        two inputs that merely share a prefix no longer collide.
        """
        text_cleaned = clean_text_for_tts(text)
        # "No text" is clean_text_for_tts()'s sentinel for empty input.
        if not text_cleaned or text_cleaned == "No text":
            return None
        cache_key = hashlib.md5(f"{text_cleaned}:{voice}".encode()).hexdigest()
        cache_path = os.path.join(self.cache_dir, f"{cache_key}.mp3")
        if cache_key in self.metadata and os.path.exists(cache_path):
            return cache_path
        ensure_dir(os.path.dirname(cache_path))
        try:
            communicate = edge_tts.Communicate(text_cleaned, voice)
            await communicate.save(cache_path)
            if os.path.exists(cache_path) and os.path.getsize(cache_path) > 0:
                with self._metadata_lock:
                    self.metadata[cache_key] = {
                        'timestamp': datetime.now().isoformat(),
                        'text_length': len(text_cleaned),
                        'voice': voice,
                    }
                self._save_metadata()
                return cache_path
            return None
        except Exception as e:
            print(f"TTS Create Audio Error: {e}")
            return None


def process_pdf_tab(pdf_file, max_pages, voice):
    """Extract text from the first *max_pages* pages of *pdf_file* and voice them.

    Each page with text is synthesized on its own thread; a progress bar is
    updated until all threads finish or 600 seconds have passed.  Results are
    shown as one expander per page.
    """
    st.subheader("PDF Processing Results")
    if pdf_file is None:
        st.info("Upload a PDF file and click 'Process PDF' to begin.")
        return
    audio_processor = AudioProcessor()
    try:
        reader = PdfReader(pdf_file)
        if reader.is_encrypted:
            st.warning("PDF is encrypted.")
            return
        total_pages = min(len(reader.pages), max_pages)
        st.write(f"Processing first {total_pages} pages of '{pdf_file.name}'...")
        texts, audios = {}, {}
        page_threads = []
        results_lock = threading.Lock()

        def process_page_sync(page_num, page_text):
            # Each worker thread gets its own event loop via asyncio.run().
            try:
                audio_path = asyncio.run(audio_processor.create_audio(page_text, voice))
                if audio_path:
                    with results_lock:
                        audios[page_num] = audio_path
            except Exception as page_e:
                print(f"Err process page {page_num+1}: {page_e}")

        for i in range(total_pages):
            try:
                text = reader.pages[i].extract_text()
                if text and text.strip():
                    texts[i] = text
                    thread = threading.Thread(target=process_page_sync, args=(i, text))
                    page_threads.append(thread)
                    thread.start()
                else:
                    texts[i] = "[No text extracted]"
            except Exception as extract_e:
                texts[i] = f"[Error extract: {extract_e}]"
                print(f"Error page {i+1} extract: {extract_e}")

        progress_bar = st.progress(0.0, text="Processing pages...")
        total_threads = len(page_threads)
        start_join_time = time.time()
        while any(t.is_alive() for t in page_threads):
            completed_threads = total_threads - sum(t.is_alive() for t in page_threads)
            progress = completed_threads / total_threads if total_threads > 0 else 1.0
            progress_bar.progress(min(progress, 1.0), text=f"Processed {completed_threads}/{total_threads} pages...")
            if time.time() - start_join_time > 600:
                print("PDF processing timed out.")
                break
            time.sleep(0.5)
        progress_bar.progress(1.0, text="Processing complete.")

        for i in range(total_pages):
            with st.expander(f"Page {i+1}"):
                st.markdown(texts.get(i, "[Error getting text]"))
                audio_file = audios.get(i)
                if audio_file:
                    play_and_download_audio(audio_file)
                else:
                    st.caption("Audio generation failed or was skipped.")
    except Exception as pdf_e:
        st.error(f"Err read PDF: {pdf_e}")
        st.exception(pdf_e)


# ==============================================================================
# Streamlit UI Layout Functions
# ==============================================================================
def _format_position(pos):
    """Render a position dict as ``(x, y, z)`` with one decimal place."""
    if not isinstance(pos, dict):
        pos = {}
    return f"({pos.get('x', 0):.1f}, {pos.get('y', 0):.1f}, {pos.get('z', 0):.1f})"


def _history_row(entry):
    """Turn one action-history entry into a table row, or None if not shown."""
    if not isinstance(entry, dict):
        return None
    action = entry.get("action")
    data = entry.get("data") or {}
    row = {"Time": entry.get("timestamp", "N/A"), "Player": entry.get("username", "N/A")}
    if action in ("place", "delete"):
        row.update({"Action": action.capitalize(),
                    "Object Type": data.get("type", "N/A"),
                    "Position": _format_position(data.get("position", {}))})
    elif action == "move":
        row.update({"Action": "Move",
                    "Object Type": "Player",
                    "Position": _format_position(data.get("position", {}))})
    elif action == "chat":
        row.update({"Action": "Chat",
                    "Object Type": "Message",
                    "Position": data.get("message", "N/A")[:50]})
    else:
        # tool_change / rename entries are recorded but not listed.
        return None
    return row


def render_sidebar():
    """Render the sidebar: players, history, save/load, build tools, user settings."""
    with st.sidebar:
        st.header("๐Ÿ’พ World State")
        st.caption("Manage the shared world state.")

        # Active Players
        state = read_history_file()
        players = state.get("players", {})
        st.subheader("Active Players")
        current_time = time.time()
        for username, data in players.items():
            last_action = data.get("last_action_timestamp", 0)
            minutes_ago = (current_time - last_action) / 60
            st.write(f"{username}: Last active {minutes_ago:.1f} minutes ago")

        # Action History Dataset
        st.subheader("Action History")
        history_data = [
            row for row in map(_history_row, st.session_state.action_history)
            if row is not None
        ]
        if history_data:
            st.dataframe(pd.DataFrame(history_data), height=200, use_container_width=True)
        else:
            st.caption("No actions recorded yet.")

        # Save World
        st.subheader("Save World")
        world_name = st.text_input("World Name", value="MyWorld", key="save_world_name")
        if st.button("๐Ÿ’พ Save World", key="save_world"):
            save_world_state(world_name)

        # Load World
        st.subheader("Load World")
        saved_worlds = [os.path.basename(f) for f in glob.glob(os.path.join(SAVED_WORLDS_DIR, "world_*.json"))]
        selected_world = st.selectbox("Select Saved World", ["None"] + saved_worlds, key="load_world_select")
        if selected_world != "None" and st.button("๐Ÿ“‚ Load World", key="load_world"):
            load_world_state(selected_world)

        st.markdown("---")
        st.header("๐Ÿ—๏ธ Build Tools")
        st.caption("Select an object to place.")

        # CSS for tool buttons
        # NOTE(review): the style block is blank in this copy of the file.
        st.markdown(""" """, unsafe_allow_html=True)

        # Tool buttons
        current_tool = st.session_state.get('selected_object', 'None')
        cols = st.columns(5)
        for col_idx, (emoji, name) in enumerate(PRIMITIVE_MAP.items()):
            button_type = "primary" if current_tool == name else "secondary"
            if cols[col_idx % 5].button(emoji, key=f"primitive_{name}", help=name,
                                        type=button_type, use_container_width=True):
                if st.session_state.selected_object != name:
                    st.session_state.selected_object = name
                    # Log against the state just written by the heartbeat.
                    fresh_state = update_player_state(st.session_state.username)
                    update_action_history(st.session_state.username, "tool_change", {"tool": name}, fresh_state)
                    st.rerun()

        st.markdown("---")
        if st.button("๐Ÿšซ Clear Tool", key="clear_tool", use_container_width=True):
            if st.session_state.selected_object != 'None':
                st.session_state.selected_object = 'None'
                fresh_state = update_player_state(st.session_state.username)
                update_action_history(st.session_state.username, "tool_change", {"tool": "None"}, fresh_state)
                st.rerun()

        st.markdown("---")
        st.header("๐Ÿ—ฃ๏ธ Voice & User")
        username_options = list(FUN_USERNAMES.keys())
        current_username = st.session_state.get('username', username_options[0])
        try:
            current_index = username_options.index(current_username)
        except ValueError:
            current_index = 0
        new_username = st.selectbox("Change Name/Voice", options=username_options, index=current_index,
                                    key="username_select", format_func=lambda x: x.split(" ")[0])
        if new_username != st.session_state.username:
            old_username = st.session_state.username
            state = read_history_file()
            if old_username in state["players"]:
                state["players"][new_username] = state["players"].pop(old_username)
                write_history_file(state)
            st.session_state.username = new_username
            st.session_state.tts_voice = FUN_USERNAMES[new_username]
            save_username(st.session_state.username)
            # Use the state returned here; the earlier one predates the
            # heartbeat and would overwrite it when the rename is logged.
            state = update_player_state(st.session_state.username)
            update_action_history(st.session_state.username, "rename",
                                  {"old_username": old_username, "new_username": new_username}, state)
            st.rerun()

        st.session_state['enable_audio'] = st.toggle("Enable TTS Audio",
                                                     value=st.session_state.get('enable_audio', True))


def _json_for_script(value):
    """Serialize *value* as JSON that is safe inside an inline ``<script>``.

    The state contains user-written chat text; escaping ``</`` stops a
    message from closing the script element early.
    """
    return json.dumps(value).replace("</", "<\\/")


def render_main_content():
    """Render the main area: 3D world, chat, PDF tools and file management tabs."""
    st.title(f"{Site_Name} - User: {st.session_state.username}")
    tab_world, tab_chat, tab_pdf, tab_files = st.tabs(
        ["๐Ÿ—๏ธ World Builder", "๐Ÿ—ฃ๏ธ Chat", "๐Ÿ“š PDF Tools", "๐Ÿ“‚ Files & Settings"])

    with tab_world:
        st.header("Shared 3D World")
        st.caption("Click to place objects with the selected tool, or click to move player. Right-click to delete. State is saved in history.json.")
        state = st.session_state.world_state
        html_file_path = 'index.html'
        try:
            with open(html_file_path, 'r', encoding='utf-8') as f:
                html_template = f.read()
            # The injection used to be an empty string, so the front end never
            # received the world state.
            # NOTE(review): these global names are assumed; confirm they match
            # what index.html reads.
            injected_globals = {
                "USERNAME": st.session_state.username,
                "SELECTED_OBJECT_TYPE": st.session_state.selected_object,
                "PLOT_WIDTH": PLOT_WIDTH,
                "PLOT_DEPTH": PLOT_DEPTH,
                "WORLD_STATE": state,
            }
            assignments = "\n".join(
                f"window.{name} = {_json_for_script(value)};"
                for name, value in injected_globals.items()
            )
            js_injection_script = f"<script>\n{assignments}\n</script>"
            if '</head>' in html_template:
                html_content_with_state = html_template.replace(
                    '</head>', js_injection_script + '\n</head>', 1)
            else:
                html_content_with_state = js_injection_script + '\n' + html_template
            components.html(html_content_with_state, height=700, scrolling=False)
            handle_js_messages()
        except FileNotFoundError:
            st.error(f"CRITICAL ERROR: Could not find '{html_file_path}'.")
        except Exception as e:
            st.error(f"Error loading 3D component: {e}")
            st.exception(e)

    with tab_chat:
        st.header(f"{START_ROOM} Chat")
        chat_history_task = asyncio.run(load_chat_history())
        chat_container = st.container(height=500)
        with chat_container:
            if chat_history_task:
                st.markdown("----\n".join(reversed(chat_history_task[-50:])))
            else:
                st.caption("No chat messages yet.")

        # A new widget key per sent message clears the input box.
        message_value = st.text_input(
            "Your Message:",
            key=f"message_input_{st.session_state.get('message_counter', 0)}",
            label_visibility="collapsed")
        send_button_clicked = st.button("Send Chat ๐Ÿ’ฌ", key="send_chat_button")
        should_autosend = st.session_state.get('autosend', False) and message_value
        if send_button_clicked or should_autosend:
            message_to_send = message_value
            if message_to_send.strip() and message_to_send != st.session_state.get('last_message', ''):
                st.session_state.last_message = message_to_send
                voice = FUN_USERNAMES.get(st.session_state.username, "en-US-AriaNeural")
                asyncio.run(save_and_log_chat(st.session_state.username, message_to_send, voice))
                update_player_state(st.session_state.username)
                st.session_state.message_counter = st.session_state.get('message_counter', 0) + 1
                st.rerun()
            elif send_button_clicked:
                st.toast("Message empty or same as last.")
        st.checkbox("Autosend Chat", key="autosend")

    with tab_pdf:
        # This tab was created but left empty, making process_pdf_tab()
        # unreachable.
        st.header("PDF Tools")
        pdf_file = st.file_uploader("Upload PDF for Audio Conversion", type="pdf", key="pdf_upload")
        max_pages = st.slider("Max Pages to Process", 1, 50, 10, key="pdf_pages")
        if st.button("Process PDF", key="process_pdf_button"):
            voice = st.session_state.get('tts_voice', "en-US-AriaNeural")
            process_pdf_tab(pdf_file, max_pages, voice)

    with tab_files:
        st.header("๐Ÿ“‚ Files & Settings")
        st.subheader("๐Ÿ’พ World State Management")
        if st.button("Clear World State", key="clear_world_state"):
            state = _empty_world_state()
            write_history_file(state)
            st.session_state.world_state = state
            st.session_state.action_history = []
            st.success("World state cleared!")
            st.rerun()

        st.subheader("๐Ÿ“ฆ Download Archives")
        # The create buttons must not depend on archives already existing,
        # otherwise the first archive can never be made.
        col_zip1, col_zip2, col_zip3 = st.columns(3)
        with col_zip1:
            if st.button("Zip Worlds"):
                create_zip_of_files(
                    [WORLD_STATE_FILE] + glob.glob(os.path.join(SAVED_WORLDS_DIR, "world_*.json")),
                    "Worlds")
        with col_zip2:
            if st.button("Zip Chats"):
                create_zip_of_files(glob.glob(os.path.join(CHAT_DIR, "*.md")), "Chats")
        with col_zip3:
            if st.button("Zip Audio"):
                create_zip_of_files(
                    glob.glob(os.path.join(AUDIO_DIR, "*.mp3"))
                    + glob.glob(os.path.join(AUDIO_CACHE_DIR, "*.mp3")),
                    "Audio")

        # Listed after the buttons so a freshly created archive shows up.
        zip_files = sorted(glob.glob(os.path.join(MEDIA_DIR, "*.zip")), key=os.path.getmtime, reverse=True)
        if zip_files:
            st.caption("Existing Zip Files:")
            for zip_file in zip_files:
                st.markdown(get_download_link(zip_file, "zip"), unsafe_allow_html=True)
        else:
            st.caption("No zip archives found.")


# ==============================================================================
# Main Execution Logic
# ==============================================================================
if __name__ == "__main__":
    init_session_state()
    render_sidebar()
    render_main_content()