# Spaces: Running
import streamlit as st | |
import os | |
import random | |
import time | |
from PIL import Image | |
import json | |
from datetime import datetime | |
from pathlib import Path | |
import asyncio | |
import threading | |
from typing import Optional | |
import queue | |
import atexit | |
# 🎮 Game Constants
GRID_WIDTH: int = 16  # tile columns per board row (one Streamlit column each)
GRID_HEIGHT: int = 9  # tile rows on the board
REFRESH_RATE: int = 5  # seconds the GameTimer sleeps between ticks
INTERACTION_RADIUS: int = 2  # not referenced in the visible code; presumably in tiles
POINTS_PER_INTERACTION: int = 1  # not referenced in the visible code
# Timer Management
class GameTimer:
    """Background ticker that enqueues a timestamp at a fixed interval.

    A daemon thread owns a private asyncio event loop. While ``running`` is
    true the loop puts ``('update', <time.time()>)`` on ``update_queue`` and
    then sleeps for ``interval`` seconds.

    Args:
        interval: Seconds between ticks. Defaults to the module-level
            ``REFRESH_RATE`` when omitted.

    NOTE(review): ``update_queue`` is unbounded and nothing in the visible
    code drains it, so it grows by one entry per tick — confirm a consumer
    exists elsewhere.
    """

    def __init__(self, interval: Optional[float] = None):
        # Resolved here (not as a default argument) so the constant is only
        # looked up when the caller did not supply an interval.
        self.interval = REFRESH_RATE if interval is None else interval
        self.running = True
        self._loop: Optional[asyncio.AbstractEventLoop] = None
        self._task: Optional[asyncio.Task] = None
        self.update_queue = queue.Queue()
        # All attributes must exist before the worker thread starts using them.
        self.thread = threading.Thread(target=self._run_timer, daemon=True)
        self.thread.start()

    def _run_timer(self):
        """Thread target: create the event loop, run the ticker, close the loop."""
        loop = asyncio.new_event_loop()
        self._loop = loop
        asyncio.set_event_loop(loop)
        try:
            self._task = loop.create_task(self._timer_loop())
            loop.run_until_complete(self._task)
        except asyncio.CancelledError:
            # Normal shutdown path: stop() cancelled the ticker task.
            pass
        finally:
            loop.close()

    async def _timer_loop(self):
        """Enqueue an update and sleep, repeating until ``running`` is cleared."""
        while self.running:
            try:
                current_time = time.time()
                self.update_queue.put(('update', current_time))
                await asyncio.sleep(self.interval)
            except Exception as e:
                # Keep ticking after an unexpected error, with a short back-off.
                print(f"Timer error: {e}")
                await asyncio.sleep(1)

    def _cancel_task(self):
        """Cancel the ticker task; must run on the event-loop thread."""
        if self._task is not None and not self._task.done():
            self._task.cancel()

    def stop(self):
        """Stop the ticker promptly; safe to call from any thread and repeatedly."""
        self.running = False
        loop = self._loop
        if loop is None or loop.is_closed():
            # Either the worker has not created its loop yet (it will see
            # running == False and exit) or it has already finished.
            return
        try:
            # Event loops are not thread-safe: hand the cancellation to the
            # loop thread, which also wakes it from its current sleep.
            loop.call_soon_threadsafe(self._cancel_task)
        except RuntimeError:
            # The loop was closed between the check above and this call.
            pass
# 🎨 Resource Management: load the tile images from disk in a stable order
def get_game_images():
    """Load every ``*.png`` in the ``game_images`` directory.

    Returns:
        A list of PIL images ordered by sorted file path. The list is empty
        when the directory is missing or holds no ``.png`` files.
    """
    image_folder = Path('game_images')  # Directory containing the .png images
    images = []
    # Sort the paths: glob order is not guaranteed, and the saved map refers
    # to images by their index in this list.
    for file in sorted(image_folder.glob('*.png')):
        # copy() reads the pixel data into a detached image so the underlying
        # file handle can be closed immediately instead of lingering per call.
        with Image.open(file) as img:
            images.append(img.copy())
    return images
def initialize_map():
    """Load the tile layout from ``map.json``, creating it on first use.

    The layout maps ``"x,y"`` keys to an index into the list returned by
    ``get_game_images()``. When ``map.json`` does not exist, a random index
    is chosen for every grid position and the result is written to disk so
    later runs reuse the same layout.

    Returns:
        dict mapping ``"x,y"`` strings to image indices.

    Raises:
        ValueError: if a new map must be generated but no images are available.
    """
    # Path to the cached map configuration
    map_path = Path('map.json')
    if map_path.exists():
        with open(map_path, 'r') as f:
            return json.load(f)

    # Count the images once; loading them again for every tile is wasted I/O.
    image_count = len(get_game_images())
    if image_count == 0:
        raise ValueError(
            "No .png images found in 'game_images'; cannot build the tile map"
        )

    map_layout = {
        f"{x},{y}": random.randint(0, image_count - 1)
        for y in range(GRID_HEIGHT)
        for x in range(GRID_WIDTH)
    }
    with open(map_path, 'w') as f:
        json.dump(map_layout, f)
    return map_layout
# Tile layout ("x,y" -> image index), loaded or generated each time the script
# runs; the board renderer reads it to pick the image for every tile.
map_layout: dict = initialize_map()
# Restore a player's state from the URL query string
def load_game_state():
    """Populate session state from ``st.query_params`` when a player is named.

    Does nothing unless the ``player_name`` query parameter is present.
    Otherwise sets ``player_name``, ``position`` and ``character_stats`` in
    ``st.session_state``. Query parameters are untrusted input, so a missing
    or malformed number falls back to its default instead of raising, and the
    coordinates are wrapped onto the grid.
    """
    params = st.query_params
    if 'player_name' not in params:
        return

    def _number(key, default, cast=int):
        """Return ``cast(params[key])``, or ``default`` if absent or unparsable."""
        raw = params.get(key)
        if raw is None:
            return default
        try:
            return cast(raw)
        except (TypeError, ValueError):
            return default

    st.session_state.player_name = params['player_name']

    # Modulo keeps hand-edited coordinates on the board, matching the
    # wrap-around used when the player moves.
    st.session_state.position = {
        'x': _number('x', random.randint(0, GRID_WIDTH - 1)) % GRID_WIDTH,
        'y': _number('y', random.randint(0, GRID_HEIGHT - 1)) % GRID_HEIGHT,
    }

    stat_defaults = {
        'STR': 10,
        'DEX': 10,
        'CON': 10,
        'INT': 10,
        'WIS': 10,
        'CHA': 10,
        'HP': 20,
        'MAX_HP': 40,
        'score': 0,
    }
    stats = {key: _number(key, default) for key, default in stat_defaults.items()}
    stats['created_at'] = _number('created_at', time.time(), cast=float)
    st.session_state.character_stats = stats
# 🎲 Game State Initialization
# Streamlit re-executes this script on every interaction, so the defaults are
# written only when the session has not been set up yet.
if 'initialized' not in st.session_state:
    st.session_state.initialized = False
    st.session_state.game_state = {
        'players': {},
        'chat_messages': [],
        'last_sync': time.time(),
    }
    st.session_state.player_name = None
    # A full stats dict (rather than None) so the board can read the score
    # before the player has entered a name.
    st.session_state.character_stats = {
        'STR': 10,
        'DEX': 10,
        'CON': 10,
        'INT': 10,
        'WIS': 10,
        'CHA': 10,
        'HP': 20,
        'MAX_HP': 40,
        'score': 0,
        'created_at': time.time(),
    }
    st.session_state.position = {
        'x': random.randint(0, GRID_WIDTH - 1),
        'y': random.randint(0, GRID_HEIGHT - 1),
    }
    st.session_state.last_move = time.time()
    st.session_state.nearby_players = []

# Apply any state carried in the URL query string.
# NOTE(review): this runs on every script execution, so query parameters
# override position and stats on each rerun — confirm that is intended.
load_game_state()
# 🎮 Player State Management
def save_player_state():
    """Write the current player's name, position and stats to ``players/<name>.json``.

    Does nothing when no player name is set. The ``players`` directory is
    created on demand.
    """
    name = st.session_state.player_name
    if not name:
        return

    # The name comes from user input or the URL, so neutralise path separators
    # (and NUL) to keep the file inside the players directory.
    safe_name = ''.join('_' if ch in '/\\\0' else ch for ch in str(name))

    player_data = {
        'name': name,
        'position': st.session_state.position,
        'stats': st.session_state.character_stats,
        'last_update': time.time(),
    }
    os.makedirs('players', exist_ok=True)
    with open(f"players/{safe_name}.json", 'w') as f:
        json.dump(player_data, f)
# Distance Calculation
def calculate_distance(pos1, pos2, width=None, height=None):
    """Return the Manhattan distance between two tiles on a wrap-around grid.

    Args:
        pos1: Mapping with integer ``'x'`` and ``'y'`` entries.
        pos2: Mapping with integer ``'x'`` and ``'y'`` entries.
        width: Grid width; defaults to ``GRID_WIDTH``.
        height: Grid height; defaults to ``GRID_HEIGHT``.

    Returns:
        The sum of the shortest horizontal and vertical offsets, where each
        axis may wrap around the grid edge.
    """
    if width is None:
        width = GRID_WIDTH
    if height is None:
        height = GRID_HEIGHT
    # Reduce modulo the grid size so off-grid coordinates cannot yield a
    # negative distance; in-range inputs are unaffected.
    dx = abs(pos1['x'] - pos2['x']) % width
    dy = abs(pos1['y'] - pos2['y']) % height
    return min(dx, width - dx) + min(dy, height - dy)
# Movement System
def update_position(direction):
    """Move the player one tile in ``direction``, wrapping at the grid edges.

    Recognised directions are "up", "down", "left" and "right"; any other
    value leaves the position untouched. In every case the last-move
    timestamp is refreshed and the player state is saved.
    """
    # (direction name, axis, step, size of that axis)
    moves = (
        ("up", 'y', -1, GRID_HEIGHT),
        ("down", 'y', 1, GRID_HEIGHT),
        ("left", 'x', -1, GRID_WIDTH),
        ("right", 'x', 1, GRID_WIDTH),
    )
    for label, axis, step, size in moves:
        if direction == label:
            spot = st.session_state.position
            spot[axis] = (spot[axis] + step) % size
            break

    st.session_state.last_move = time.time()
    save_player_state()
# Render the board using the tile images selected by map_layout
def create_game_board():
    """Draw the grid, one row of Streamlit columns at a time.

    Every tile shows the image chosen for it in ``map_layout``. The tile the
    current player stands on is captioned "You" with the player's score; a
    tile holding another player is captioned with that player's name and
    score. If several players share a tile, the last one in iteration order
    of the players mapping is shown.

    ``load_all_players`` is not defined in the visible part of this file; it
    is expected to return a mapping with a ``'players'`` entry keyed by name.
    """
    current_time = time.time()
    all_players = load_all_players(current_time)['players']
    images = get_game_images()

    # Index players by tile once instead of rescanning every player for each
    # of the grid's tiles; later entries overwrite earlier ones (last wins).
    occupant_at = {}
    for player_name, player_data in all_players.items():
        position = player_data['position']
        occupant_at[(position['x'], position['y'])] = player_name

    for y in range(GRID_HEIGHT):
        cols = st.columns(GRID_WIDTH)
        for x in range(GRID_WIDTH):
            cell = cols[x]
            # The background image is the same whoever stands on the tile.
            cell.image(images[map_layout[f"{x},{y}"]], use_column_width=True)

            if x == st.session_state.position['x'] and y == st.session_state.position['y']:
                score = st.session_state.character_stats.get('score', 0)
                cell.markdown(f"**You** ({score} pts)")
                continue

            player_here = occupant_at.get((x, y))
            if player_here:
                player_score = all_players[player_here]['stats'].get('score', 0)
                cell.markdown(f"**{player_here}** ({player_score} pts)")
# 🎮 Main Game Loop
def _roll_ability_score():
    """Roll four six-sided dice and return the sum of the highest three."""
    return sum(sorted(random.randint(1, 6) for _ in range(4))[1:])


def main():
    """Render the sidebar (sign-in or controls) and the game board.

    Before a player name is set, the sidebar offers a name field pre-filled
    with a generated suggestion and a "Start Playing" button. Afterwards it
    lists nearby players and shows the four movement buttons; each move
    triggers a rerun. The board is drawn on every run.

    ``generate_fantasy_name`` is not defined in the visible part of this file.
    """
    # Initialize timer in session state
    if 'game_timer' not in st.session_state:
        st.session_state.game_timer = GameTimer()

    st.sidebar.title("Player Info")

    if st.session_state.player_name is None:
        # Generate the suggestion once per session: a new default on every
        # rerun would reset the text field and discard what the user typed.
        if 'default_player_name' not in st.session_state:
            st.session_state.default_player_name = generate_fantasy_name()
        player_name = st.sidebar.text_input(
            "Enter your name or use generated name:",
            value=st.session_state.default_player_name,
        )
        if st.sidebar.button("Start Playing"):
            chosen_name = player_name.strip()
            if not chosen_name:
                # A blank name would never be saved and would still leave
                # the sign-in branch, so ask again instead.
                st.sidebar.warning("Please enter a name to start playing.")
            else:
                st.session_state.player_name = chosen_name
                if st.session_state.character_stats is None:
                    # NOTE(review): rolled HP ranges 3..60 while MAX_HP is
                    # fixed at 40 — confirm whether HP should be capped.
                    st.session_state.character_stats = {
                        'STR': _roll_ability_score(),
                        'DEX': _roll_ability_score(),
                        'CON': _roll_ability_score(),
                        'INT': _roll_ability_score(),
                        'WIS': _roll_ability_score(),
                        'CHA': _roll_ability_score(),
                        'HP': random.randint(1, 20) * 2 + random.randint(1, 20),
                        'MAX_HP': 40,
                        'score': 0,
                        'created_at': time.time(),
                    }
                save_player_state()
                st.rerun()
    else:
        st.sidebar.markdown("### Nearby Players")
        for player in st.session_state.nearby_players:
            st.sidebar.markdown(
                f"**{player['name']}** - {player['distance']} tiles away - {player['score']} pts\n"
                f"Last seen: {player['last_seen']:.1f}s ago"
            )

        st.sidebar.markdown("### Movement Controls")
        # "Up" sits alone in the middle of the first row of three columns;
        # left / down / right fill the second row.
        move_cols = st.sidebar.columns(3)
        if move_cols[1].button("⬆️", key="up"):
            update_position("up")
            st.rerun()
        cols = st.sidebar.columns(3)
        if cols[0].button("⬅️", key="left"):
            update_position("left")
            st.rerun()
        if cols[1].button("⬇️", key="down"):
            update_position("down")
            st.rerun()
        if cols[2].button("➡️", key="right"):
            update_position("right")
            st.rerun()

    st.title("Multiplayer Tile Game")
    create_game_board()
# 🧹 Cleanup on exit
def cleanup():
    """Stop this session's GameTimer, if one was created."""
    # NOTE(review): at interpreter exit there may be no active Streamlit
    # session, in which case this lookup finds nothing — confirm.
    if 'game_timer' in st.session_state:
        st.session_state.game_timer.stop()


# Register cleanup. Streamlit re-executes this script on every interaction,
# so guard the registration to avoid stacking a new exit handler per rerun.
if not st.session_state.get('cleanup_registered', False):
    atexit.register(cleanup)
    st.session_state.cleanup_registered = True

if __name__ == "__main__":
    main()