import streamlit as st import os import random import time from PIL import Image import json from datetime import datetime from pathlib import Path import base64 from io import BytesIO import numpy as np import asyncio import threading from typing import Optional import queue import atexit # ๐ŸŽฎ Game Constants GRID_WIDTH = 16 GRID_HEIGHT = 9 REFRESH_RATE = 5 INTERACTION_RADIUS = 2 POINTS_PER_INTERACTION = 1 from streamlit.components.v1 import html # ๐Ÿ•’ Timer Management class GameTimer: def __init__(self): self.running = True self._loop: Optional[asyncio.AbstractEventLoop] = None self.update_queue = queue.Queue() self.thread = threading.Thread(target=self._run_timer, daemon=True) self.thread.start() def _run_timer(self): self._loop = asyncio.new_event_loop() asyncio.set_event_loop(self._loop) self._loop.run_until_complete(self._timer_loop()) async def _timer_loop(self): while self.running: try: current_time = time.time() self.update_queue.put(('update', current_time)) await asyncio.sleep(REFRESH_RATE) except Exception as e: print(f"Timer error: {e}") await asyncio.sleep(1) def stop(self): self.running = False if self._loop: self._loop.stop() # First define the load_game_state function def load_game_state(): if 'player_name' in st.query_params: st.session_state.player_name = st.query_params.player_name st.session_state.position = { 'x': int(st.query_params.get('x', random.randint(0, GRID_WIDTH - 1))), 'y': int(st.query_params.get('y', random.randint(0, GRID_HEIGHT - 1))) } st.session_state.character_stats = { 'STR': int(st.query_params.get('STR', 10)), 'DEX': int(st.query_params.get('DEX', 10)), 'CON': int(st.query_params.get('CON', 10)), 'INT': int(st.query_params.get('INT', 10)), 'WIS': int(st.query_params.get('WIS', 10)), 'CHA': int(st.query_params.get('CHA', 10)), 'HP': int(st.query_params.get('HP', 20)), 'MAX_HP': int(st.query_params.get('MAX_HP', 40)), 'score': int(st.query_params.get('score', 0)), 'created_at': float(st.query_params.get('created_at', time.time())) } # Then initialize session state if 'initialized' not in st.session_state: st.session_state.initialized = False st.session_state.game_state = { 'players': {}, 'chat_messages': [], 'last_sync': time.time() } st.session_state.player_name = None st.session_state.character_stats = { 'STR': 10, 'DEX': 10, 'CON': 10, 'INT': 10, 'WIS': 10, 'CHA': 10, 'HP': 20, 'MAX_HP': 40, 'score': 0, 'created_at': time.time() } st.session_state.position = { 'x': random.randint(0, GRID_WIDTH - 1), 'y': random.randint(0, GRID_HEIGHT - 1) } st.session_state.last_move = time.time() st.session_state.nearby_players = [] # Now call load_game_state after it's been defined load_game_state() # Make sure all other functions that use load_game_state are defined after this point # Initialize timer in session state if 'game_timer' not in st.session_state: st.session_state.game_timer = GameTimer() # Also modify the initialization section to ensure character_stats is properly initialized if 'initialized' not in st.session_state: st.session_state.initialized = False st.session_state.game_state = { 'players': {}, 'chat_messages': [], 'last_sync': time.time() } st.session_state.player_name = None st.session_state.character_stats = { 'STR': 10, 'DEX': 10, 'CON': 10, 'INT': 10, 'WIS': 10, 'CHA': 10, 'HP': 20, 'MAX_HP': 40, 'score': 0, 'created_at': time.time() } st.session_state.position = { 'x': random.randint(0, GRID_WIDTH - 1), 'y': random.randint(0, GRID_HEIGHT - 1) } st.session_state.last_move = time.time() st.session_state.nearby_players = [] load_game_state() # ๐Ÿ“ Name Generation def generate_fantasy_name(): prefixes = ['Aer', 'Bal', 'Cal', 'Dor', 'El', 'Fae', 'Gor', 'Hel', 'Il', 'Jor', 'Kal', 'Lyr', 'Mel', 'Nym', 'Oro', 'Pyr', 'Qar', 'Ryn', 'Syl', 'Tyr'] suffixes = ['ian', 'or', 'ion', 'us', 'ix', 'ar', 'en', 'yr', 'el', 'an', 'is', 'ax', 'on', 'ir', 'ex', 'az', 'er', 'eth', 'ys', 'ix'] return random.choice(prefixes) + random.choice(suffixes) # ๐ŸŽจ Resource Management @st.cache_resource(show_spinner="Loading game resources...") def get_game_images(): return { 'tile': Image.new('RGB', (50, 50), color='green'), 'player': Image.new('RGB', (50, 50), color='blue'), 'other_player': Image.new('RGB', (50, 50), color='red') } # ๐Ÿ”„ Player Synchronization @st.cache_data(ttl=2) def load_all_players(timestamp): players = {} if os.path.exists('players'): for filename in os.listdir('players'): if filename.endswith('.json'): with open(f"players/{filename}", 'r') as f: try: player_data = json.load(f) if time.time() - player_data['last_update'] < 60: players[player_data['name']] = player_data except json.JSONDecodeError: continue return {'players': players, 'last_update': timestamp} def clear_caches(): st.cache_resource.clear() st.cache_data.clear() # ๐Ÿ”„ Auto-Refresh Implementation def create_autorefresh(): refresh_html = """ """ st.components.v1.html(refresh_html, height=0) # ๐Ÿ’พ State Management def save_game_state(): if st.session_state.player_name and st.session_state.character_stats: params = { 'player_name': st.session_state.player_name, 'x': str(st.session_state.position['x']), 'y': str(st.session_state.position['y']), **{k: str(v) for k, v in st.session_state.character_stats.items()}, 'last_sync': str(time.time()) } st.query_params.from_dict(params) def load_game_state(): if 'player_name' in st.query_params: st.session_state.player_name = st.query_params.player_name st.session_state.position = { 'x': int(st.query_params.get('x', random.randint(0, GRID_WIDTH - 1))), 'y': int(st.query_params.get('y', random.randint(0, GRID_HEIGHT - 1))) } st.session_state.character_stats = { 'STR': int(st.query_params.get('STR', 10)), 'DEX': int(st.query_params.get('DEX', 10)), 'CON': int(st.query_params.get('CON', 10)), 'INT': int(st.query_params.get('INT', 10)), 'WIS': int(st.query_params.get('WIS', 10)), 'CHA': int(st.query_params.get('CHA', 10)), 'HP': int(st.query_params.get('HP', 20)), 'MAX_HP': int(st.query_params.get('MAX_HP', 40)), 'score': int(st.query_params.get('score', 0)), 'created_at': float(st.query_params.get('created_at', time.time())) } # ๐ŸŽฒ Game State Initialization if 'initialized' not in st.session_state: st.session_state.initialized = False st.session_state.game_state = { 'players': {}, 'chat_messages': [], 'last_sync': time.time() } st.session_state.player_name = None st.session_state.character_stats = None st.session_state.position = { 'x': random.randint(0, GRID_WIDTH - 1), 'y': random.randint(0, GRID_HEIGHT - 1) } st.session_state.last_move = time.time() st.session_state.nearby_players = [] load_game_state() # ๐ŸŽฎ Player State Management def save_player_state(): if st.session_state.player_name: player_data = { 'name': st.session_state.player_name, 'position': st.session_state.position, 'stats': st.session_state.character_stats, 'last_update': time.time() } os.makedirs('players', exist_ok=True) with open(f"players/{st.session_state.player_name}.json", 'w') as f: json.dump(player_data, f) save_game_state() # ๐Ÿ“ Distance Calculation def calculate_distance(pos1, pos2): dx = min(abs(pos1['x'] - pos2['x']), GRID_WIDTH - abs(pos1['x'] - pos2['x'])) dy = min(abs(pos1['y'] - pos2['y']), GRID_HEIGHT - abs(pos1['y'] - pos2['y'])) return dx + dy # ๐Ÿƒโ€โ™‚๏ธ Movement System def update_position(direction): if direction == "up": st.session_state.position['y'] = (st.session_state.position['y'] - 1) % GRID_HEIGHT elif direction == "down": st.session_state.position['y'] = (st.session_state.position['y'] + 1) % GRID_HEIGHT elif direction == "left": st.session_state.position['x'] = (st.session_state.position['x'] - 1) % GRID_WIDTH elif direction == "right": st.session_state.position['x'] = (st.session_state.position['x'] + 1) % GRID_WIDTH st.session_state.last_move = time.time() save_player_state() # ๐Ÿ‘ฅ Nearby Players Update def update_nearby_players(): current_time = time.time() all_players = load_all_players(current_time)['players'] nearby = [] score_gain = 0 for player_name, player_data in all_players.items(): if player_name != st.session_state.player_name: distance = calculate_distance(st.session_state.position, player_data['position']) if distance <= INTERACTION_RADIUS: nearby.append({ 'name': player_name, 'distance': distance, 'score': player_data['stats']['score'], 'last_seen': current_time - player_data['last_update'] }) score_gain += POINTS_PER_INTERACTION if score_gain > 0: st.session_state.character_stats['score'] += score_gain save_player_state() st.session_state.nearby_players = nearby # Modify the create_game_board function to add safety checks def create_game_board(): current_time = time.time() all_players = load_all_players(current_time)['players'] images = get_game_images() # Add safety check for character_stats if not st.session_state.character_stats: st.session_state.character_stats = { 'STR': 10, 'DEX': 10, 'CON': 10, 'INT': 10, 'WIS': 10, 'CHA': 10, 'HP': 20, 'MAX_HP': 40, 'score': 0, 'created_at': time.time() } save_player_state() for y in range(GRID_HEIGHT): cols = st.columns(GRID_WIDTH) for x in range(GRID_WIDTH): player_here = None for player_name, player_data in all_players.items(): if (player_data['position']['x'] == x and player_data['position']['y'] == y): player_here = player_name if x == st.session_state.position['x'] and y == st.session_state.position['y']: cols[x].image(images['player'], use_column_width=True) # Add safety check when accessing score score = st.session_state.character_stats.get('score', 0) cols[x].markdown(f"**You** ({score} pts)") elif player_here: cols[x].image(images['other_player'], use_column_width=True) player_score = all_players[player_here]['stats'].get('score', 0) cols[x].markdown(f"**{player_here}** ({player_score} pts)") else: cols[x].image(images['tile'], use_column_width=True) # ๐ŸŽฎ Main Game Loop def main(): create_autorefresh() # Process any queued updates try: while not st.session_state.game_timer.update_queue.empty(): update_type, timestamp = st.session_state.game_timer.update_queue.get_nowait() if update_type == 'update': update_nearby_players() save_player_state() except queue.Empty: pass st.sidebar.title("Player Info") if st.session_state.player_name is None: default_name = generate_fantasy_name() player_name = st.sidebar.text_input("Enter your name or use generated name:", value=default_name) if st.sidebar.button("Start Playing"): st.session_state.player_name = player_name if st.session_state.character_stats is None: st.session_state.character_stats = { 'STR': sum(sorted([random.randint(1, 6) for _ in range(4)])[1:]), 'DEX': sum(sorted([random.randint(1, 6) for _ in range(4)])[1:]), 'CON': sum(sorted([random.randint(1, 6) for _ in range(4)])[1:]), 'INT': sum(sorted([random.randint(1, 6) for _ in range(4)])[1:]), 'WIS': sum(sorted([random.randint(1, 6) for _ in range(4)])[1:]), 'CHA': sum(sorted([random.randint(1, 6) for _ in range(4)])[1:]), 'HP': random.randint(1, 20) * 2 + random.randint(1, 20), 'MAX_HP': 40, 'score': 0, 'created_at': time.time() } save_player_state() st.rerun() else: st.sidebar.markdown("### Nearby Players") for player in st.session_state.nearby_players: st.sidebar.markdown( f"**{player['name']}** - {player['distance']} tiles away - {player['score']} pts\n" f"Last seen: {player['last_seen']:.1f}s ago" ) st.sidebar.markdown("### Movement Controls") move_cols = st.sidebar.columns(3) if move_cols[1].button("โฌ†๏ธ", key="up"): update_position("up") st.rerun() cols = st.sidebar.columns(3) if cols[0].button("โฌ…๏ธ", key="left"): update_position("left") st.rerun() if cols[1].button("โฌ‡๏ธ", key="down"): update_position("down") st.rerun() if cols[2].button("โžก๏ธ", key="right"): update_position("right") st.rerun() if st.sidebar.button("Clear Game State"): st.session_state.game_timer.stop() st.query_params.clear() clear_caches() st.rerun() st.title("Multiplayer Tile Game") create_game_board() # Add this in your main() function, right after create_autorefresh() # Add the time display component st.markdown("""
""", unsafe_allow_html=True) # Mount the React component html("""
""", height=200) # ๐Ÿงน Cleanup on exit def cleanup(): if 'game_timer' in st.session_state: st.session_state.game_timer.stop() # Register cleanup atexit.register(cleanup) if __name__ == "__main__": main()