Spaces:
Sleeping
Sleeping
| # gamestate.py | |
| import threading | |
| import os | |
| import json | |
| import time | |
| class GameState: | |
| def __init__(self, state_file="global_state.json"): | |
| self.state_file = state_file | |
| self.lock = threading.Lock() | |
| # The state dictionary holds all game objects. | |
| # You could later add other keys (for example, player positions per session). | |
| self.state = {"objects": []} | |
| self.load_state() | |
| def load_state(self): | |
| """Load the global state from a JSON file if it exists.""" | |
| if os.path.exists(self.state_file): | |
| try: | |
| with open(self.state_file, "r", encoding="utf-8") as f: | |
| self.state = json.load(f) | |
| except Exception as e: | |
| print(f"Error loading global state: {e}") | |
| else: | |
| self.state = {"objects": []} | |
| def save_state(self): | |
| """Persist the global state to a JSON file.""" | |
| with self.lock: | |
| try: | |
| with open(self.state_file, "w", encoding="utf-8") as f: | |
| json.dump(self.state, f, indent=2) | |
| except Exception as e: | |
| print(f"Error saving global state: {e}") | |
| def get_state(self): | |
| """Return a copy of the current state.""" | |
| with self.lock: | |
| return self.state.copy() | |
| def update_state(self, new_objects): | |
| """ | |
| Merge a list of new object records into the state. | |
| Uniqueness is determined by the 'obj_id' field. | |
| """ | |
| with self.lock: | |
| # Create a dict of existing objects by obj_id. | |
| existing = {obj["obj_id"]: obj for obj in self.state.get("objects", []) if "obj_id" in obj} | |
| for obj in new_objects: | |
| # If no obj_id, generate one. | |
| if "obj_id" not in obj or not obj["obj_id"]: | |
| obj["obj_id"] = str(time.time()) + "_" + str(len(existing)) | |
| existing[obj["obj_id"]] = obj | |
| self.state["objects"] = list(existing.values()) | |
| self.state["last_updated"] = time.strftime("%Y-%m-%d %H:%M:%S") | |
| self.save_state() | |