# Spaces: Sleeping
import streamlit as st | |
import os | |
import random | |
import time | |
from PIL import Image | |
import json | |
from datetime import datetime | |
from pathlib import Path | |
import base64 | |
from io import BytesIO | |
import numpy as np | |
import asyncio | |
import threading | |
from typing import Optional | |
import queue | |
import atexit | |
# 🎮 Game Constants
GRID_WIDTH: int = 16  # number of board columns; x coordinates wrap modulo this
GRID_HEIGHT: int = 9  # number of board rows; y coordinates wrap modulo this
REFRESH_RATE: int = 5  # seconds the background timer sleeps between ticks
INTERACTION_RADIUS: int = 2  # max wrapped distance (in tiles) to count as nearby
POINTS_PER_INTERACTION: int = 1  # score added per nearby player on each update
from streamlit.components.v1 import html | |
# 🕒 Timer Management | |
class GameTimer:
    """Background ticker that periodically queues ``('update', timestamp)`` events.

    A daemon thread runs a private asyncio event loop. On every tick the loop
    puts ``('update', time.time())`` on :attr:`update_queue`; consumers drain
    the queue whenever they get a chance to run.

    Args:
        interval: Seconds to sleep between ticks. ``None`` (the default) uses
            the module-level ``REFRESH_RATE``.
    """

    def __init__(self, interval: Optional[float] = None):
        self.interval = REFRESH_RATE if interval is None else interval
        self.running = True
        self._loop: Optional[asyncio.AbstractEventLoop] = None
        self._task: Optional[asyncio.Task] = None
        self.update_queue = queue.Queue()
        self.thread = threading.Thread(target=self._run_timer, daemon=True)
        self.thread.start()

    def _run_timer(self):
        """Thread target: own an event loop for the lifetime of the ticker."""
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        # Keep a handle on the task so stop() can cancel it from another thread.
        self._task = loop.create_task(self._timer_loop())
        self._loop = loop
        try:
            loop.run_until_complete(self._task)
        except asyncio.CancelledError:
            # Normal shutdown path triggered by stop().
            pass
        finally:
            self._loop = None
            loop.close()

    async def _timer_loop(self):
        """Queue one update per interval until ``running`` is cleared."""
        while self.running:
            try:
                current_time = time.time()
                self.update_queue.put(('update', current_time))
                await asyncio.sleep(self.interval)
            except Exception as e:
                # CancelledError is a BaseException, so cancellation still
                # propagates; only genuine errors are logged and retried.
                print(f"Timer error: {e}")
                await asyncio.sleep(1)

    def stop(self):
        """Ask the ticker to stop; safe to call from any thread, repeatedly."""
        self.running = False
        loop, task = self._loop, self._task
        if loop is None or task is None:
            return
        try:
            # Event loops are not thread-safe: schedule the cancellation on
            # the loop's own thread instead of touching the loop directly.
            loop.call_soon_threadsafe(task.cancel)
        except RuntimeError:
            # The loop was closed between the check above and this call.
            pass
# First define the load_game_state function | |
def load_game_state():
    """Restore player name, position and stats from the URL query parameters.

    Does nothing unless a ``player_name`` parameter is present. Missing or
    malformed numeric parameters fall back to their defaults instead of
    raising, and the position is wrapped onto the grid the same way movement
    wraps it.
    """
    params = st.query_params
    if 'player_name' not in params:
        return

    def _number(key, default, cast=int):
        # Query parameters are user-editable text, so a bad value must not
        # crash the app; fall back to the default instead.
        try:
            return cast(params.get(key, default))
        except (TypeError, ValueError):
            return default

    st.session_state.player_name = params.player_name
    st.session_state.position = {
        'x': _number('x', random.randint(0, GRID_WIDTH - 1)) % GRID_WIDTH,
        'y': _number('y', random.randint(0, GRID_HEIGHT - 1)) % GRID_HEIGHT,
    }

    int_defaults = (
        ('STR', 10), ('DEX', 10), ('CON', 10), ('INT', 10), ('WIS', 10),
        ('CHA', 10), ('HP', 20), ('MAX_HP', 40), ('score', 0),
    )
    stats = {key: _number(key, default) for key, default in int_defaults}
    stats['created_at'] = _number('created_at', time.time(), float)
    st.session_state.character_stats = stats
# Initialize per-session state exactly once. Streamlit re-executes this script
# on every rerun, so the guard keeps values from earlier runs intact.
# NOTE(review): the source's indentation was lost; load_game_state() is assumed
# to belong inside this guard (once per session) — confirm.
if 'initialized' not in st.session_state:
    st.session_state.initialized = False
    st.session_state.game_state = {
        'players': {},
        'chat_messages': [],
        'last_sync': time.time()
    }
    st.session_state.player_name = None
    st.session_state.character_stats = {
        'STR': 10,
        'DEX': 10,
        'CON': 10,
        'INT': 10,
        'WIS': 10,
        'CHA': 10,
        'HP': 20,
        'MAX_HP': 40,
        'score': 0,
        'created_at': time.time()
    }
    st.session_state.position = {
        'x': random.randint(0, GRID_WIDTH - 1),
        'y': random.randint(0, GRID_HEIGHT - 1)
    }
    st.session_state.last_move = time.time()
    st.session_state.nearby_players = []
    # load_game_state must already be defined at this point; it overrides the
    # defaults above with any state carried in the URL query parameters.
    load_game_state()

# Initialize timer in session state (one background GameTimer per session).
if 'game_timer' not in st.session_state:
    st.session_state.game_timer = GameTimer()
# 📝 Name Generation | |
def generate_fantasy_name():
    """Return a random name made of one prefix syllable plus one suffix."""
    openings = (
        'Aer', 'Bal', 'Cal', 'Dor', 'El', 'Fae', 'Gor', 'Hel', 'Il', 'Jor',
        'Kal', 'Lyr', 'Mel', 'Nym', 'Oro', 'Pyr', 'Qar', 'Ryn', 'Syl', 'Tyr',
    )
    endings = (
        'ian', 'or', 'ion', 'us', 'ix', 'ar', 'en', 'yr', 'el', 'an',
        'is', 'ax', 'on', 'ir', 'ex', 'az', 'er', 'eth', 'ys', 'ix',
    )
    # Draw the prefix first, then the suffix, so the random stream is consumed
    # in a fixed order.
    head = random.choice(openings)
    tail = random.choice(endings)
    return f"{head}{tail}"
# 🎨 Resource Management | |
def get_game_images():
    """Return freshly created 50x50 solid-colour images keyed by sprite kind."""
    palette = {
        'tile': 'green',
        'player': 'blue',
        'other_player': 'red',
    }
    return {
        kind: Image.new('RGB', (50, 50), color=shade)
        for kind, shade in palette.items()
    }
# 🔄 Player Synchronization | |
def load_all_players(timestamp):
    """Read every recently updated player record from the ``players`` directory.

    Each ``players/*.json`` file is parsed; a record is kept when its
    ``last_update`` is less than 60 seconds old, keyed by its ``name``.
    Unreadable, half-written or malformed files are skipped so that one bad
    record cannot break the scan for everyone else.

    Args:
        timestamp: Value echoed back under ``'last_update'`` in the result.

    Returns:
        ``{'players': {name: record, ...}, 'last_update': timestamp}``.
    """
    players = {}
    if not os.path.isdir('players'):
        return {'players': players, 'last_update': timestamp}

    now = time.time()
    for filename in os.listdir('players'):
        if not filename.endswith('.json'):
            continue
        try:
            with open(os.path.join('players', filename), 'r') as f:
                player_data = json.load(f)
            if now - player_data['last_update'] < 60:
                players[player_data['name']] = player_data
        except (OSError, ValueError, KeyError, TypeError):
            # OSError: file vanished or is unreadable. ValueError: invalid
            # JSON (json.JSONDecodeError is a subclass). KeyError/TypeError:
            # the record lacks the expected keys or is not a mapping.
            continue
    return {'players': players, 'last_update': timestamp}
def clear_caches():
    """Clear Streamlit's resource cache, then its data cache."""
    for cache in (st.cache_resource, st.cache_data):
        cache.clear()
# 🔄 Auto-Refresh Implementation | |
def create_autorefresh():
    """Embed a zero-height HTML component whose script reloads periodically.

    The script polls once per second and calls ``window.location.reload()``
    once ``REFRESH_RATE`` seconds have passed since the last reload. When the
    tab becomes visible again the next reload is brought forward to roughly
    100 ms away. The interval is derived from ``REFRESH_RATE`` so the browser
    side and the server-side timer cannot drift apart.
    """
    refresh_ms = int(REFRESH_RATE * 1000)
    # On becoming visible, pretend the last update happened almost a full
    # interval ago so the next poll triggers a reload.
    resume_offset_ms = max(refresh_ms - 100, 0)

    # NOTE(review): st.components.v1.html renders inside an iframe, so
    # window.location.reload() presumably reloads only that frame — confirm
    # this produces the intended app refresh.
    refresh_html = """
    <script>
    let lastUpdate = Date.now();
    let updateInterval = null;
    function checkForUpdates() {
        const now = Date.now();
        if (now - lastUpdate >= __REFRESH_MS__) {
            lastUpdate = now;
            window.parent.document.querySelector('.stApp').classList.add('refreshing');
            window.location.reload();
        }
    }
    function startAutoRefresh() {
        if (!updateInterval) {
            updateInterval = setInterval(checkForUpdates, 1000);
        }
        document.addEventListener('visibilitychange', function() {
            if (!document.hidden) {
                lastUpdate = Date.now() - __RESUME_OFFSET_MS__;
            }
        });
    }
    if (document.readyState === 'complete') {
        startAutoRefresh();
    } else {
        window.addEventListener('load', startAutoRefresh);
    }
    // Cleanup on unload
    window.addEventListener('unload', function() {
        if (updateInterval) {
            clearInterval(updateInterval);
        }
    });
    </script>
    """
    # Plain placeholder substitution avoids having to escape every JavaScript
    # brace, as an f-string would require.
    refresh_html = (
        refresh_html
        .replace("__REFRESH_MS__", str(refresh_ms))
        .replace("__RESUME_OFFSET_MS__", str(resume_offset_ms))
    )
    st.components.v1.html(refresh_html, height=0)
# 💾 State Management | |
def save_game_state():
    """Mirror the player's name, position and stats into the URL query params.

    Nothing is written until both a player name and character stats exist.
    All values are stored as strings, plus a ``last_sync`` timestamp.
    """
    state = st.session_state
    if not (state.player_name and state.character_stats):
        return

    params = {
        'player_name': state.player_name,
        'x': str(state.position['x']),
        'y': str(state.position['y']),
    }
    for stat_name, stat_value in state.character_stats.items():
        params[stat_name] = str(stat_value)
    params['last_sync'] = str(time.time())
    st.query_params.from_dict(params)
def load_game_state():
    """Restore player name, position and stats from the URL query parameters.

    Does nothing unless a ``player_name`` parameter is present. Missing or
    malformed numeric parameters fall back to their defaults instead of
    raising, and the position is wrapped onto the grid the same way movement
    wraps it.
    """
    params = st.query_params
    if 'player_name' not in params:
        return

    def _number(key, default, cast=int):
        # Query parameters are user-editable text, so a bad value must not
        # crash the app; fall back to the default instead.
        try:
            return cast(params.get(key, default))
        except (TypeError, ValueError):
            return default

    st.session_state.player_name = params.player_name
    st.session_state.position = {
        'x': _number('x', random.randint(0, GRID_WIDTH - 1)) % GRID_WIDTH,
        'y': _number('y', random.randint(0, GRID_HEIGHT - 1)) % GRID_HEIGHT,
    }

    int_defaults = (
        ('STR', 10), ('DEX', 10), ('CON', 10), ('INT', 10), ('WIS', 10),
        ('CHA', 10), ('HP', 20), ('MAX_HP', 40), ('score', 0),
    )
    stats = {key: _number(key, default) for key, default in int_defaults}
    stats['created_at'] = _number('created_at', time.time(), float)
    st.session_state.character_stats = stats
# 🎲 Game State Initialization
# NOTE(review): an earlier top-level block in this file already sets
# 'initialized', so this guard looks like it is never true by the time the
# script reaches this point — confirm before relying on these values.
# NOTE(review): source indentation was lost; load_game_state() is assumed to
# sit inside the guard — confirm.
if 'initialized' not in st.session_state:
    st.session_state['initialized'] = False
    st.session_state['game_state'] = {
        'players': {},
        'chat_messages': [],
        'last_sync': time.time(),
    }
    st.session_state['player_name'] = None
    st.session_state['character_stats'] = None
    st.session_state['position'] = {
        'x': random.randint(0, GRID_WIDTH - 1),
        'y': random.randint(0, GRID_HEIGHT - 1),
    }
    st.session_state['last_move'] = time.time()
    st.session_state['nearby_players'] = []
    load_game_state()
# 🎮 Player State Management | |
def save_player_state():
    """Persist the current player to ``players/<name>.json`` and to the URL.

    Does nothing when no player name is set. The record holds the name,
    position, stats and a ``last_update`` timestamp. Afterwards the same state
    is mirrored into the query parameters via ``save_game_state()``.
    """
    name = st.session_state.player_name
    if not name:
        return

    player_data = {
        'name': name,
        'position': st.session_state.position,
        'stats': st.session_state.character_stats,
        'last_update': time.time()
    }

    # The name comes from a text box or the URL, so it must never be used as a
    # path verbatim: keep only filename-safe characters (no separators, no
    # dots) so the file always stays inside players/.
    safe_name = ''.join(
        ch if ch.isalnum() or ch in '-_' else '_' for ch in str(name)
    )
    os.makedirs('players', exist_ok=True)
    target = os.path.join('players', f"{safe_name}.json")

    # Write to a scratch file and rename it into place so readers scanning
    # players/ never see a half-written record. The scratch name does not end
    # in '.json', and is unique per process and thread.
    scratch = f"{target}.{os.getpid()}.{threading.get_ident()}.tmp"
    try:
        with open(scratch, 'w') as f:
            json.dump(player_data, f)
        os.replace(scratch, target)
    finally:
        # Only present if the write or rename failed.
        if os.path.exists(scratch):
            os.remove(scratch)

    save_game_state()
# 📏 Distance Calculation | |
def calculate_distance(pos1, pos2):
    """Return the Manhattan distance between two positions on the wrapping grid.

    Along each axis the shorter of the direct gap and the wrap-around gap is
    used; the two axis distances are then added.
    """
    def wrapped_gap(a, b, size):
        gap = abs(a - b)
        return min(gap, size - gap)

    horizontal = wrapped_gap(pos1['x'], pos2['x'], GRID_WIDTH)
    vertical = wrapped_gap(pos1['y'], pos2['y'], GRID_HEIGHT)
    return horizontal + vertical
# 🏃♂️ Movement System | |
def update_position(direction):
    """Move the player one tile in ``direction``, wrapping at the grid edges.

    Recognised directions are ``"up"``, ``"down"``, ``"left"`` and
    ``"right"``; any other value leaves the position untouched. In every case
    the last-move time is refreshed and the player state is saved.
    """
    # direction -> (axis to change, step, size of that axis)
    steps = {
        "up": ('y', -1, GRID_HEIGHT),
        "down": ('y', 1, GRID_HEIGHT),
        "left": ('x', -1, GRID_WIDTH),
        "right": ('x', 1, GRID_WIDTH),
    }
    step = steps.get(direction)
    if step is not None:
        axis, delta, size = step
        spot = st.session_state.position
        spot[axis] = (spot[axis] + delta) % size
    st.session_state.last_move = time.time()
    save_player_state()
# 👥 Nearby Players Update | |
def update_nearby_players():
    """Refresh the nearby-player list and award points for each neighbour.

    Every other active player within ``INTERACTION_RADIUS`` tiles (wrapped
    distance) is recorded in ``st.session_state.nearby_players`` and earns the
    local player ``POINTS_PER_INTERACTION`` points. Records without a usable
    position are skipped and a missing score counts as 0, so one malformed
    player file cannot crash the update.
    """
    current_time = time.time()
    all_players = load_all_players(current_time)['players']
    own_name = st.session_state.player_name
    own_position = st.session_state.position

    nearby = []
    for player_name, player_data in all_players.items():
        if player_name == own_name:
            continue
        position = player_data.get('position')
        if not isinstance(position, dict) or 'x' not in position or 'y' not in position:
            continue
        distance = calculate_distance(own_position, position)
        if distance > INTERACTION_RADIUS:
            continue
        their_stats = player_data.get('stats')
        their_score = their_stats.get('score', 0) if isinstance(their_stats, dict) else 0
        nearby.append({
            'name': player_name,
            'distance': distance,
            'score': their_score,
            'last_seen': current_time - player_data['last_update']
        })

    score_gain = len(nearby) * POINTS_PER_INTERACTION
    own_stats = st.session_state.character_stats
    # Stats may not exist yet (e.g. before a character is created); only award
    # points when there is somewhere to record them.
    if score_gain > 0 and own_stats:
        own_stats['score'] = own_stats.get('score', 0) + score_gain
        save_player_state()
    st.session_state.nearby_players = nearby
# Modify the create_game_board function to add safety checks | |
def create_game_board():
    """Render the grid: the local player, other active players and empty tiles.

    Player records are indexed by position once, rather than rescanned for
    every cell. Records with a missing or malformed position are ignored, and
    a missing score is shown as 0.
    """
    current_time = time.time()
    all_players = load_all_players(current_time)['players']
    images = get_game_images()

    # Add safety check for character_stats
    # NOTE(review): source indentation was lost; the save below is assumed to
    # run only when defaults had to be created — confirm.
    if not st.session_state.character_stats:
        st.session_state.character_stats = {
            'STR': 10,
            'DEX': 10,
            'CON': 10,
            'INT': 10,
            'WIS': 10,
            'CHA': 10,
            'HP': 20,
            'MAX_HP': 40,
            'score': 0,
            'created_at': time.time()
        }
        save_player_state()

    # Map (x, y) -> player name. When several players share a tile the last
    # one in iteration order wins, matching a full scan without early exit.
    occupants = {}
    for player_name, player_data in all_players.items():
        try:
            position = player_data['position']
            occupants[(position['x'], position['y'])] = player_name
        except (KeyError, TypeError):
            continue

    own_cell = (st.session_state.position['x'], st.session_state.position['y'])
    # Add safety check when accessing score
    own_score = st.session_state.character_stats.get('score', 0)

    for y in range(GRID_HEIGHT):
        cols = st.columns(GRID_WIDTH)
        for x in range(GRID_WIDTH):
            cell = cols[x]
            player_here = occupants.get((x, y))
            if (x, y) == own_cell:
                cell.image(images['player'], use_column_width=True)
                cell.markdown(f"**You** ({own_score} pts)")
            elif player_here:
                cell.image(images['other_player'], use_column_width=True)
                their_stats = all_players[player_here].get('stats')
                player_score = their_stats.get('score', 0) if isinstance(their_stats, dict) else 0
                cell.markdown(f"**{player_here}** ({player_score} pts)")
            else:
                cell.image(images['tile'], use_column_width=True)
# 🎮 Main Game Loop | |
def _roll_ability():
    """Roll four six-sided dice and return the sum of the highest three."""
    return sum(sorted(random.randint(1, 6) for _ in range(4))[1:])


def main():
    """Run one pass of the app: apply timer ticks, then draw sidebar and board.

    Before a name is chosen the sidebar offers a name prompt; afterwards it
    shows nearby players, movement buttons and a reset button, and the board
    is rendered in the main area.

    NOTE(review): the source's indentation was lost. The title and board are
    assumed to belong to the "player has a name" branch and the time-display
    markup to the end of the function — confirm.
    """
    create_autorefresh()

    # Process any queued updates
    pending = st.session_state.game_timer.update_queue
    while True:
        try:
            update_type, _timestamp = pending.get_nowait()
        except queue.Empty:
            break
        if update_type == 'update':
            update_nearby_players()
            save_player_state()

    st.sidebar.title("Player Info")
    if st.session_state.player_name is None:
        # Generate the suggestion once per session. A new default on every
        # rerun changes the widget's arguments, so Streamlit would treat it
        # as a new widget and discard whatever the user typed.
        if 'default_name' not in st.session_state:
            st.session_state.default_name = generate_fantasy_name()
        default_name = st.session_state.default_name
        player_name = st.sidebar.text_input("Enter your name or use generated name:", value=default_name)
        if st.sidebar.button("Start Playing"):
            # A blank name would leave the player unsaveable; fall back to
            # the generated suggestion.
            st.session_state.player_name = player_name.strip() or default_name
            if st.session_state.character_stats is None:
                st.session_state.character_stats = {
                    'STR': _roll_ability(),
                    'DEX': _roll_ability(),
                    'CON': _roll_ability(),
                    'INT': _roll_ability(),
                    'WIS': _roll_ability(),
                    'CHA': _roll_ability(),
                    'HP': random.randint(1, 20) * 2 + random.randint(1, 20),
                    'MAX_HP': 40,
                    'score': 0,
                    'created_at': time.time()
                }
            save_player_state()
            st.rerun()
    else:
        st.sidebar.markdown("### Nearby Players")
        for player in st.session_state.nearby_players:
            st.sidebar.markdown(
                f"**{player['name']}** - {player['distance']} tiles away - {player['score']} pts\n"
                f"Last seen: {player['last_seen']:.1f}s ago"
            )

        st.sidebar.markdown("### Movement Controls")
        move_cols = st.sidebar.columns(3)
        if move_cols[1].button("⬆️", key="up"):
            update_position("up")
            st.rerun()
        cols = st.sidebar.columns(3)
        if cols[0].button("⬅️", key="left"):
            update_position("left")
            st.rerun()
        if cols[1].button("⬇️", key="down"):
            update_position("down")
            st.rerun()
        if cols[2].button("➡️", key="right"):
            update_position("right")
            st.rerun()

        if st.sidebar.button("Clear Game State"):
            st.session_state.game_timer.stop()
            st.query_params.clear()
            clear_caches()
            # Drop the session values as well. Otherwise the stopped timer
            # stays registered (so no further ticks ever arrive) and the old
            # player state survives the "clear".
            for key in list(st.session_state.keys()):
                del st.session_state[key]
            st.rerun()

        st.title("Multiplayer Tile Game")
        create_game_board()

    # Add the time display component
    st.markdown("""
    <div id="time-display-root"></div>
    """, unsafe_allow_html=True)
    # Mount the React component
    html("""
    <div id="time-display-mount"></div>
    <script>
    const root = document.getElementById('time-display-root');
    const mount = document.getElementById('time-display-mount');
    if (root && mount) {
    root.appendChild(mount);
    }
    </script>
    """, height=200)
# 🧹 Cleanup on exit
def cleanup():
    """Stop the game timer stored in session state, if there is one."""
    if 'game_timer' not in st.session_state:
        return
    timer = st.session_state.game_timer
    timer.stop()


# Register cleanup
atexit.register(cleanup)

if __name__ == "__main__":
    main()