|
""" |
|
Human Maze Solving Experiment |
|
----------------------------- |
|
This application creates a web interface for human participants to solve maze puzzles, |
|
mirroring the experiments conducted with AI systems. The program: |
|
|
|
1. Displays mazes from the same dataset used in AI experiments |
|
2. Allows participants to navigate through mazes of various sizes (5x5, 7x7) and shapes |
|
(square, cross, spiral, triangle, C, Z) |
|
3. Records performance data including: |
|
- Time taken to solve each maze |
|
- Complete movement history |
|
- Success/failure rate |
|
4. Tests shape recognition by asking participants to identify maze shapes after solving |
|
5. Saves results in a structured format matching the AI experiment results for direct comparison |
|
|
|
Usage: Run with 'python human_test.py' to start the interface, which can be shared |
|
with participants through the generated public URL. |
|
""" |
|
|
|
import os |
|
import time |
|
import numpy as np |
|
import gradio as gr |
|
from pathlib import Path |
|
import random |
|
import json |
|
from PIL import Image, ImageDraw, ImageFont |
|
import re |
|
from maze_generator import * |
|
from solution_verifier import get_valid_moves |
|
from collections import defaultdict |
|
import datetime |
|
|
|
|
|
# Integer codes stored in the maze arrays: a wall cell, an open cell, the
# participant's current position, and the goal cell (values 0, 1, 2, 3).
WALL, PATH, POS, END = range(4)

# Presumably the code for a drawn cell on the generation grid; it is not
# referenced by the code visible in this file.
GEN_PATH = 1

# Every (height, width) / shape pairing below is presented
# MAZES_PER_COMBINATION times during one experiment run.
SIZES = [(side, side) for side in (5, 7)]
SHAPES = "square cross spiral triangle C Z".split()
MAZES_PER_COMBINATION = 10
|
|
|
def _load_label_font(font_size):
    """Return a TrueType font at ``font_size``, or PIL's built-in default font.

    Well-known font names are tried first (PIL resolves them through its own
    search path). If none loads, the usual system font directories are walked
    recursively and the first non-bold ``.ttf``/``.otf`` file that opens is used.
    """
    preferred = ("arial.ttf", "Arial.ttf", "DejaVuSans.ttf", "FreeSans.ttf",
                 "LiberationSans-Regular.ttf", "Verdana.ttf")
    for font_name in preferred:
        try:
            return ImageFont.truetype(font_name, font_size)
        except OSError:
            continue

    for font_dir in ("/usr/share/fonts", "/Library/Fonts", "C:/Windows/Fonts"):
        if not os.path.isdir(font_dir):
            continue
        # Fonts commonly live in sub-directories, so walk instead of listing
        # only the top level; sort for a deterministic choice.
        for root, dirs, files in os.walk(font_dir):
            dirs.sort()
            for file_name in sorted(files):
                lowered = file_name.lower()
                if not lowered.endswith((".ttf", ".otf")) or "bold" in lowered:
                    continue
                try:
                    return ImageFont.truetype(os.path.join(root, file_name), font_size)
                except OSError:
                    # Unreadable or unsupported font file: try the next one.
                    continue

    return ImageFont.load_default()


def _text_size(text, font):
    """Return ``(width, height)`` of ``text``, across old and new PIL font APIs."""
    try:
        # Right/bottom edge of the bounding box (newer PIL versions).
        return font.getbbox(text)[2:4]
    except AttributeError:
        try:
            return font.getsize(text)
        except AttributeError:
            return font.getmask(text).size


def save_numpy_array_as_image(array: np.ndarray, cell_size: int = 0, target_size: int = 500,
                              font_scale: float = 0.4, is_generation: bool = False) -> Image.Image:
    """Render a 2D maze array as a PIL image with coloured, labelled cells.

    Each cell is filled according to its value (0 dark grey, 1 white or light
    grey, 2 blue, 3 red; anything else white), outlined with a black border and
    labelled with its ``(row,col)`` coordinates in a contrasting text colour.

    Args:
        array: 2D NumPy array representing the maze.
        cell_size: Size of each cell in pixels. ``0`` or less means "derive it
            from ``target_size``".
        target_size: Approximate size in pixels of the longer image side when
            ``cell_size`` is derived.
        font_scale: Font size as a proportion of the cell size (minimum 10 px).
        is_generation: If True, value 1 is drawn light grey instead of white.

    Returns:
        The rendered PIL image.

    Raises:
        ValueError: If ``array`` is not two-dimensional or has no cells.
    """
    array = np.asarray(array)
    if array.ndim != 2 or array.size == 0:
        raise ValueError(f"Expected a non-empty 2D array, got shape {array.shape}")

    color_map = {
        0: (80, 80, 80),
        1: (200, 200, 200) if is_generation else (255, 255, 255),
        2: (0, 102, 204),
        3: (204, 0, 0),
    }
    border_color = (0, 0, 0)
    border_width = 2

    height, width = array.shape
    if cell_size <= 0:
        # Never let a small target size collapse the cells to zero pixels.
        cell_size = max(1, min(target_size // width, target_size // height))

    image = Image.new("RGB", (width * cell_size, height * cell_size), (255, 255, 255))
    draw = ImageDraw.Draw(image)
    font = _load_label_font(max(10, int(cell_size * font_scale)))

    for row in range(height):
        for col in range(width):
            x1 = col * cell_size
            y1 = row * cell_size
            fill_color = color_map.get(array[row, col], (255, 255, 255))

            draw.rectangle(
                [x1, y1, x1 + cell_size, y1 + cell_size],
                fill=fill_color,
                outline=border_color,
                width=border_width,
            )

            label = f"({row},{col})"
            text_width, text_height = _text_size(label, font)
            text_x = x1 + (cell_size - text_width) // 2
            text_y = y1 + (cell_size - text_height) // 2

            # Perceived brightness decides between black and white text.
            luminance = 0.299 * fill_color[0] + 0.587 * fill_color[1] + 0.114 * fill_color[2]
            text_color = (0, 0, 0) if luminance > 128 else (255, 255, 255)
            draw.text((text_x, text_y), label, fill=text_color, font=font)

    return image
|
|
|
def load_npy_files(folder_path):
    """Load every ``.npy`` maze file found under ``folder_path``.

    The tree is walked recursively in sorted order, so the ``id`` assigned to
    each file is reproducible across runs and file systems. Files that cannot
    be read as NumPy arrays are reported on stdout and skipped.

    Args:
        folder_path: Root directory to search.

    Returns:
        A list of dicts, one per loaded file, with keys ``'name'`` (file name),
        ``'id'`` (0-based index among successfully loaded files), ``'path'``
        (path relative to ``folder_path``) and ``'data'`` (the loaded array).
        The list is empty when nothing is found.
    """
    all_file_data = []

    for root, dirs, files in os.walk(folder_path):
        # Sorting in place makes os.walk descend in a deterministic order.
        dirs.sort()
        for filename in sorted(files):
            if not filename.endswith(".npy"):
                continue

            file_path = os.path.join(root, filename)
            try:
                array_data = np.load(file_path)
            except (OSError, ValueError, EOFError) as e:
                # Corrupt, truncated or pickled files: skip, keep loading the rest.
                print(f"Could not load {filename}: {e}")
                continue

            all_file_data.append({
                'name': filename,
                'id': len(all_file_data),
                'path': os.path.relpath(file_path, folder_path),
                'data': array_data,
            })

    return all_file_data
|
|
|
class MazeExperiment:
    """Manage one run of the maze experiment.

    Every maze passes through three phases, tracked in ``current_phase``:
    "solve" (navigate to the goal), "recognize" (name the maze's shape) and
    "generate" (draw that shape on a blank grid). Progress is counted in
    ``combination_counts``, keyed by ``(size_label, shape)`` tuples such as
    ``("5x5", "square")``. Results are written as JSON files to ``results_dir``.
    """

    # Row/column offsets of the moves allowed while solving a maze.
    _SOLVE_STEPS = {
        "up": (-1, 0),
        "down": (1, 0),
        "left": (0, -1),
        "right": (0, 1),
    }

    # Drawing on the generation grid additionally allows diagonal strokes.
    _DRAW_STEPS = {
        **_SOLVE_STEPS,
        "up-left": (-1, -1),
        "up-right": (-1, 1),
        "down-left": (1, -1),
        "down-right": (1, 1),
    }

    def __init__(self, results_dir="results", maze_dir="experiment_mazes"):
        """Initialize the maze experiment and load the first maze.

        Args:
            results_dir: Directory to save experiment results (created if missing).
            maze_dir: Directory containing maze files, laid out as
                ``<maze_dir>/<HxW>/<shape>/*.npy``.
        """
        self.results_dir = results_dir
        self.maze_dir = maze_dir
        self.current_maze = None
        self.current_file_info = None
        self.current_size = None
        self.current_shape = None
        self.start_time = None
        self.maze_complete = False
        self.experiment_complete = False
        self.moves = []
        self.current_phase = "solve"
        self.generation_grid = None
        self.generation_trail = []
        # Path of the results file written for the maze currently in play.
        self._last_result_file = None

        os.makedirs(results_dir, exist_ok=True)

        # Participant id is derived from the start time (one-second resolution).
        self.participant_id = f"p{datetime.datetime.now().strftime('%Y%m%d%H%M%S')}"

        # Mazes finished (solved or failed) per (size_label, shape) combination.
        self.combination_counts = defaultdict(int)
        # Combinations whose quota has been reached, in completion order.
        self.completed_combinations = []

        self.mazes_per_combination = MAZES_PER_COMBINATION
        self.total_mazes = len(SIZES) * len(SHAPES) * self.mazes_per_combination

        self.load_next_combination()

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _size_label(size):
        """Format a (height, width) tuple as the 'HxW' label used in keys and paths."""
        return f"{size[0]}x{size[1]}"

    def _current_combo_key(self):
        """Return the combination_counts key of the current maze, or None if unset."""
        if not self.current_size or not self.current_shape:
            return None
        return (self._size_label(self.current_size), self.current_shape)

    def _record_maze_done(self):
        """Count the current maze as finished and mark its combination when full."""
        key = self._current_combo_key()
        if key is None:
            return
        self.combination_counts[key] = self.combination_counts.get(key, 0) + 1
        if (self.combination_counts[key] >= self.mazes_per_combination
                and key not in self.completed_combinations):
            self.completed_combinations.append(key)

    def _unique_result_path(self, stem):
        """Return a not-yet-existing '<results_dir>/<stem>.json' path.

        Timestamps in file names have one-second resolution, so a numeric
        suffix is appended when a file of that name already exists.
        """
        path = os.path.join(self.results_dir, f"{stem}.json")
        suffix = 1
        while os.path.exists(path):
            path = os.path.join(self.results_dir, f"{stem}_{suffix}.json")
            suffix += 1
        return path

    def _latest_result_file(self):
        """Return the results file of the maze in play, or None if there is none."""
        path = getattr(self, "_last_result_file", None)
        if path and os.path.isfile(path):
            return path
        return None

    def _update_latest_result(self, updates):
        """Merge ``updates`` into the current results file.

        Returns:
            True if the file was rewritten, False if there is no file or it
            could not be read, serialized or written.
        """
        path = self._latest_result_file()
        if path is None:
            return False
        try:
            with open(path, 'r') as f:
                data = json.load(f)
            data.update(updates)
            # Serialize before opening for writing so that a failure cannot
            # leave a truncated file behind.
            payload = json.dumps(data, indent=2)
            with open(path, 'w') as f:
                f.write(payload)
        except (OSError, ValueError, TypeError, AttributeError) as e:
            print(f"Error updating results file {path}: {e}")
            return False
        return True

    # ------------------------------------------------------------------
    # Loading mazes
    # ------------------------------------------------------------------

    def load_random_maze(self, size, shape):
        """Load a random maze of the specified size and shape.

        Resets all per-maze state (moves, timer, phase, generation grid).

        Args:
            size: Tuple of (height, width).
            shape: String representing the maze shape.

        Returns:
            Tuple of (maze_image, message, phase, progress). On failure the
            image is None and the phase is "error".
        """
        self.current_size = size
        self.current_shape = shape

        self.maze_complete = False
        self.moves = []
        self.start_time = time.time()
        # A new maze always starts in the solve phase; without this reset the
        # phase of the previous maze ("generate") would leak into the new one.
        self.current_phase = "solve"
        self.generation_grid = None
        self.generation_trail = []
        # Do not keep a stale maze or results file if loading fails below.
        self.current_maze = None
        self.current_file_info = None
        self._last_result_file = None

        try:
            size_str = self._size_label(size)
            shape_dir = os.path.join(self.maze_dir, size_str, shape)
            if not os.path.exists(shape_dir):
                return None, f"Directory not found: {shape_dir}", "error", self.get_progress()

            maze_files = [name for name in os.listdir(shape_dir) if name.endswith(".npy")]
            if not maze_files:
                return None, f"No maze files found in {shape_dir}", "error", self.get_progress()

            maze_file = random.choice(maze_files)
            self.current_maze = np.load(os.path.join(shape_dir, maze_file))
            self.current_file_info = {
                "file": maze_file,
                "size": size_str,
                "shape": shape
            }

            completed_mazes = self.combination_counts.get((size_str, shape), 0)
            total_combinations = len(SIZES) * len(SHAPES)
            completed_combinations = len(self.completed_combinations)

            progress_msg = f"Maze {completed_mazes + 1}/{self.mazes_per_combination} for {size_str} {shape} " \
                           f"(Combination {completed_combinations + 1}/{total_combinations})"

            return self.render_maze(), progress_msg, "solve", self.get_progress()
        except Exception as e:
            # UI boundary: report the problem to the participant instead of
            # letting the event handler crash.
            return None, f"Error loading maze: {str(e)}", "error", self.get_progress()

    def load_next_combination(self):
        """Load a maze for the first combination that still needs mazes.

        If every combination has reached its quota, mark the experiment as
        complete.

        Returns:
            Tuple of (maze_image, message, phase, progress).
        """
        for size in SIZES:
            size_str = self._size_label(size)
            for shape in SHAPES:
                if self.combination_counts.get((size_str, shape), 0) < self.mazes_per_combination:
                    return self.load_random_maze(size, shape)

        self.experiment_complete = True
        self.current_phase = "complete"
        return None, "Experiment complete! Thank you for participating.", "complete", self.get_progress()

    def is_experiment_complete(self):
        """Check if all mazes in the experiment have been completed."""
        return all(
            self.combination_counts.get((self._size_label(size), shape), 0) >= self.mazes_per_combination
            for size in SIZES
            for shape in SHAPES
        )

    def load_next_maze(self):
        """Load the next maze based on current progress.

        Clears ``current_maze`` when no combination needs further mazes.
        """
        for size in SIZES:
            size_str = self._size_label(size)
            for shape in SHAPES:
                if self.combination_counts.get((size_str, shape), 0) < self.mazes_per_combination:
                    self.load_random_maze(size, shape)
                    return
        self.current_maze = None

    def initial_load(self, size, shape):
        """Initial load of a maze with the specified size and shape.

        Args:
            size: Tuple of (height, width).
            shape: String representing the maze shape.

        Returns:
            Tuple of (maze_image, message, phase, progress).
        """
        self.maze_complete = False
        self.moves = []
        return self.load_random_maze(size, shape)

    def select_next_combination(self):
        """Select the next size/shape combination that needs mazes.

        Keeps the current combination while it is below its quota; otherwise
        switches ``current_size``/``current_shape`` to the first combination
        that still needs mazes.

        Returns:
            True if a combination is selected, False if all are finished.
        """
        key = self._current_combo_key()
        if key is not None and self.combination_counts.get(key, 0) < self.mazes_per_combination:
            return True

        for size in SIZES:
            for shape in SHAPES:
                check_key = (self._size_label(size), shape)
                if self.combination_counts.get(check_key, 0) < self.mazes_per_combination:
                    self.current_size = size
                    self.current_shape = shape
                    return True
        return False

    def complete_current_maze(self):
        """Mark the current maze as completed.

        Uses the same ``(size_label, shape)`` key as the rest of the class.
        ``process_move`` already counts every finished maze, so call this only
        for mazes that did not end through ``process_move``.
        """
        self._record_maze_done()

    # ------------------------------------------------------------------
    # Solve phase
    # ------------------------------------------------------------------

    def process_move(self, direction):
        """Process a move in the maze.

        Moving into a wall fails the maze and loads the next one; reaching the
        goal completes it and switches to the "recognize" phase.

        Args:
            direction (str): The direction to move in ('up', 'down', 'left', 'right').

        Returns:
            Tuple of (maze_image, message, phase, progress).
        """
        if self.current_maze is None:
            return self.load_random_maze(self.current_size, self.current_shape)

        step = self._SOLVE_STEPS.get(direction)
        if step is None:
            # Unknown or diagonal directions must not be logged as moves.
            return (self.render_maze(),
                    f"'{direction}' is not a valid move here. Use up, down, left or right.",
                    self.current_phase, self.get_progress())

        rows, cols = np.where(self.current_maze == POS)
        if len(rows) == 0:
            return self.render_maze(), "Invalid maze state. No current position found.", self.current_phase, self.get_progress()
        i, j = int(rows[0]), int(cols[0])
        new_i, new_j = i + step[0], j + step[1]

        height, width = self.current_maze.shape
        if not (0 <= new_i < height and 0 <= new_j < width):
            return self.render_maze(), "Can't move outside the maze!", self.current_phase, self.get_progress()

        target = self.current_maze[new_i, new_j]

        if target == WALL:
            # A wall collision ends the maze as a failure.
            self.save_results(time.time() - self.start_time, failed=True)
            self._record_maze_done()
            return self.process_complete_maze()

        # The move is legal: advance the position marker and log the move.
        self.current_maze[i, j] = PATH
        self.current_maze[new_i, new_j] = POS
        self.moves.append(direction)

        if target == END:
            # Set the flag and log the final move *before* saving so the
            # stored record reflects the completed maze.
            self.maze_complete = True
            self.save_results(time.time() - self.start_time)
            self._record_maze_done()
            self.current_phase = "recognize"
            return self.render_maze(), "Maze complete! What shape do you think this maze represents?", "recognize", self.get_progress()

        return self.render_maze(), f"Moved {direction}. Keep going!", self.current_phase, self.get_progress()

    def process_complete_maze(self):
        """Process a finished maze and load the next one.

        Returns:
            Tuple of (maze_image, message, phase, progress). The message is
            prefixed with a failure notice if the maze just saved was failed.
        """
        last_maze_failed = False
        path = self._latest_result_file()
        if path is not None:
            try:
                with open(path, 'r') as f:
                    last_maze_failed = bool(json.load(f).get("failed", False))
            except (OSError, ValueError, AttributeError) as e:
                print(f"Error checking failure status: {e}")

        img, msg, phase, progress = self.load_next_combination()

        if last_maze_failed:
            msg = f"Maze failed (wall collision). {msg}"

        return img, msg, phase, progress

    def save_results(self, elapsed_time, failed=False):
        """Save the results of the current maze to a new JSON file.

        Does nothing when no maze file is loaded.

        Args:
            elapsed_time: Time elapsed during maze solving, in seconds.
            failed: Boolean indicating if the maze was failed.
        """
        if not self.current_file_info:
            return

        timestamp = int(time.time())
        stem = (f"{self.participant_id}_{self.current_file_info['size']}_"
                f"{self.current_file_info['shape']}_{timestamp}")
        filename = self._unique_result_path(stem)

        data = {
            "participant_id": self.participant_id,
            "maze_file": self.current_file_info['file'],
            "maze_type": {
                "size": self.current_file_info['size'],
                "shape": self.current_file_info['shape']
            },
            "moves": list(self.moves),
            "total_moves": len(self.moves),
            "completion_time": elapsed_time,
            "timestamp": timestamp,
            "maze_complete": self.maze_complete,
            "failed": failed
        }

        with open(filename, 'w') as f:
            json.dump(data, f, indent=2)

        # Later phases append their answers to this file.
        self._last_result_file = filename
        print(f"Results saved to {filename}")

    def skip_maze(self):
        """Skip the current maze and save as not completed."""
        if self.current_maze is not None:
            self.save_results(0)
            self.maze_complete = True
            return self.render_maze(), "Maze skipped. Load a new maze.", "solve", self.get_progress()
        return None, "No maze loaded to skip.", "solve", self.get_progress()

    # ------------------------------------------------------------------
    # Recognition phase
    # ------------------------------------------------------------------

    def ask_shape_recognition(self):
        """Return the shape-recognition question, or '' if no maze was just solved."""
        if self.maze_complete and self.current_maze is not None:
            return "What shape do you think this maze was designed to look like? (square, cross, spiral, triangle, C, Z)"
        return ""

    def submit_shape_recognition(self, recognized_shape):
        """Record the participant's shape answer and start the generation phase.

        Args:
            recognized_shape: The shape name chosen by the participant.

        Returns:
            Tuple of (image, message, phase, progress). The phase stays
            "recognize" if the answer is missing or invalid, otherwise it
            becomes "generate".
        """
        if not recognized_shape or not self.maze_complete:
            return self.render_maze(), "Please solve the maze first and enter a shape before submitting.", "recognize", self.get_progress()

        recognized_shape = recognized_shape.strip().lower()
        if recognized_shape not in [s.lower() for s in SHAPES]:
            return self.render_maze(), f"Invalid shape. Please enter one of: {', '.join(SHAPES)}", "recognize", self.get_progress()

        recognition_correct = (recognized_shape == self.current_shape.lower())
        updates = {
            "recognized_shape": recognized_shape,
            "recognition_correct": recognition_correct,
        }

        if not self._update_latest_result(updates):
            # No usable results file for this maze: write a fresh record and
            # attach the answer to it. This is attempted once, never recursively.
            try:
                self.save_results(0)
                self._update_latest_result(updates)
            except OSError as e:
                print(f"Error in shape recognition: {e}")

        self.current_phase = "generate"
        generation_img = self.create_generation_grid()

        if recognition_correct:
            feedback = f"Correct! This is a {self.current_shape} maze."
        else:
            feedback = f"Not quite. This is a {self.current_shape} maze."

        message = f"{feedback} Now draw a maze of shape '{self.current_shape}' using the movement buttons."
        return generation_img, message, "generate", self.get_progress()

    # ------------------------------------------------------------------
    # Generation phase
    # ------------------------------------------------------------------

    def create_generation_grid(self):
        """Create a blank grid for maze generation.

        The grid has the shape of the current maze (or ``current_size`` if no
        maze is loaded) and starts with the position marker in the centre.

        Returns:
            PIL image of the new grid, or None if no size is known.
        """
        if self.current_maze is not None:
            height, width = self.current_maze.shape
        elif self.current_size is not None:
            height, width = self.current_size
        else:
            return None

        grid = np.zeros((height, width), dtype=np.int8)
        center = (height // 2, width // 2)
        grid[center] = POS

        self.generation_grid = grid
        self.generation_trail = [center]

        return save_numpy_array_as_image(self.generation_grid, is_generation=True)

    def move_in_generation(self, direction):
        """Move in the specified direction on the generation grid.

        Args:
            direction: One of the four cardinal directions or a diagonal
                ('up-left', 'up-right', 'down-left', 'down-right').

        Returns:
            Tuple of (grid_image, message).
        """
        if getattr(self, 'generation_grid', None) is None:
            img = self.create_generation_grid()
            return img, "Generation grid created. Draw a path in the shape of a " + str(self.current_shape)

        grid = self.generation_grid
        row, col = (int(v) for v in np.argwhere(grid == POS)[0])
        height, width = grid.shape

        step = self._DRAW_STEPS.get(direction)
        if step is None:
            return save_numpy_array_as_image(grid, is_generation=True), "Cannot move in that direction."

        new_row, new_col = row + step[0], col + step[1]
        if not (0 <= new_row < height and 0 <= new_col < width):
            return save_numpy_array_as_image(grid, is_generation=True), "Cannot move in that direction."

        if grid[new_row, new_col] == PATH:
            return save_numpy_array_as_image(grid, is_generation=True), "You've already drawn here. Try a different direction."

        # Leave a trail cell behind and advance the position marker.
        grid[row, col] = PATH
        grid[new_row, new_col] = POS

        new_pos = (new_row, new_col)
        if new_pos not in self.generation_trail:
            self.generation_trail.append(new_pos)

        return save_numpy_array_as_image(grid, is_generation=True), f"Drawing {self.current_shape} shape. Use movement buttons to draw the path."

    def reset_generation(self):
        """Reset the generation grid.

        Returns:
            Tuple of (grid_image, message).
        """
        grid = self.create_generation_grid()
        return grid, "Generation grid reset. Start drawing the shape again."

    def validate_generation(self):
        """Validate the participant's drawing.

        A drawing is accepted when it covers at least five cells; its
        resemblance to the target shape is not checked.

        Returns:
            Dict with keys 'valid' and 'feedback'. When a grid exists it also
            holds 'shape', 'generated_shape' (list of (row, col) cells) and
            'path_ratio'; otherwise it holds 'error'.
        """
        grid = getattr(self, 'generation_grid', None)
        if grid is None:
            return {
                'valid': False,
                'error': 'No maze has been generated',
                'feedback': 'No maze has been generated'
            }

        # Plain ints keep the result JSON-serializable.
        path_cells = [(int(r), int(c)) for r, c in np.argwhere((grid == PATH) | (grid == POS))]
        path_ratio = len(path_cells) / grid.size

        valid = True
        feedback = "Shape accepted. Great work!"
        if len(path_cells) < 5:
            valid = False
            feedback = "Your drawing is too small. Please create a more complex shape."

        return {
            'valid': valid,
            'shape': self.current_shape,
            'generated_shape': path_cells,
            'feedback': feedback,
            'path_ratio': path_ratio,
        }

    def submit_generation_drawing(self):
        """Save the drawing and, if it is accepted, move on to the next maze.

        Returns:
            Tuple of (image, message, phase, progress).
        """
        result = self.validate_generation()
        self.save_generation_results(result)
        feedback = result['feedback']

        if not result['valid']:
            return self.get_generation_grid_image(), feedback, "generate", self.get_progress()

        # The maze was already counted when it was solved in process_move, so
        # only the next maze has to be chosen here.
        img, msg, phase, progress = self.load_next_combination()

        if phase == "complete":
            return save_numpy_array_as_image(np.zeros((5, 5))), "Congratulations! You've completed all mazes in the experiment. Thank you for participating!", "complete", progress

        if phase == "error":
            return img, msg, phase, progress

        progress_msg = f"Shape accepted! {feedback} Loading next maze..."
        return img, progress_msg, "solve", progress

    def save_generation_results(self, result):
        """Save the generation results to a file.

        The result is appended to the results file of the current maze; if
        that is not possible a separate '..._generation_<timestamp>.json' file
        is written.

        Args:
            result: Dict with generation validation results.
        """
        if not self.current_file_info:
            return

        if self._update_latest_result({"generation_result": result}):
            print(f"Generation results saved to {self._latest_result_file()}")
            return

        timestamp = int(time.time())
        stem = (f"{self.participant_id}_{self.current_file_info['size']}_"
                f"{self.current_shape}_generation_{timestamp}")
        filename = self._unique_result_path(stem)

        data = {
            "participant_id": self.participant_id,
            "maze_file": self.current_file_info['file'],
            "maze_type": {
                "size": self.current_file_info['size'],
                "shape": self.current_shape
            },
            "generation_result": result,
            "timestamp": timestamp
        }

        with open(filename, 'w') as f:
            json.dump(data, f, indent=2)

        print(f"Generation results saved to {filename}")

    def get_generation_grid_image(self):
        """Get an image of the current generation grid, creating the grid if needed.

        Returns:
            Image: PIL Image of the generation grid.
        """
        if getattr(self, 'generation_grid', None) is None:
            return self.create_generation_grid()
        return save_numpy_array_as_image(self.generation_grid, is_generation=True)

    # ------------------------------------------------------------------
    # Display helpers
    # ------------------------------------------------------------------

    def get_progress(self):
        """Get the current progress information for the experiment.

        Returns:
            A string with the number of finished mazes and combinations.
        """
        total_completed = sum(self.combination_counts.values())
        completed_combinations = len(self.completed_combinations)
        total_combinations = len(SIZES) * len(SHAPES)
        return f"Progress: {total_completed}/{self.total_mazes} mazes completed ({completed_combinations}/{total_combinations} combinations)"

    def render_maze(self):
        """Render the current maze as an image.

        Returns:
            PIL Image of the current maze, or of a blank 5x5 grid if no maze
            is loaded.
        """
        if self.current_maze is None:
            return save_numpy_array_as_image(np.ones((5, 5), dtype=np.int8))
        return save_numpy_array_as_image(self.current_maze)
|
|
|
def create_interface(experiment):
    """Create the Gradio interface for the maze experiment.

    The hidden ``phase_info`` textbox carries the current phase between events;
    whenever it changes, the control rows are shown or hidden to match.

    NOTE(review): all browser sessions share the single ``experiment`` object,
    so concurrent participants would interfere with each other. Per-session
    state (e.g. ``gr.State``) would be needed to support that.

    Args:
        experiment: MazeExperiment instance.

    Returns:
        gr.Blocks: The Gradio Blocks app for the experiment.
    """
    cardinal_directions = ("up", "down", "left", "right")

    with gr.Blocks() as interface:
        with gr.Row():
            maze_display = gr.Image(label="Maze")

        with gr.Row():
            message = gr.Textbox(label="Message", value="Loading maze...")

        with gr.Row():
            phase_info = gr.Textbox(label="Current Phase", value="solve", visible=False)
            progress_info = gr.Textbox(label="Progress", value=experiment.get_progress())

        # Cardinal moves: used for solving and for drawing.
        with gr.Row(visible=True) as movement_controls:
            up_button = gr.Button("Up")
            down_button = gr.Button("Down")
            left_button = gr.Button("Left")
            right_button = gr.Button("Right")

        # Diagonal moves only exist on the generation grid, so the row stays
        # hidden until the "generate" phase.
        with gr.Row(visible=False) as diagonal_controls:
            up_left_button = gr.Button("↖ Up-Left")
            up_right_button = gr.Button("↗ Up-Right")
            down_left_button = gr.Button("↙ Down-Left")
            down_right_button = gr.Button("↘ Down-Right")

        with gr.Row(visible=False) as recognition_controls:
            shape_input = gr.Dropdown(
                choices=SHAPES,
                label="Recognize Shape",
                info="What shape does this maze represent?"
            )
            submit_recognition_button = gr.Button("Submit Recognition")

        with gr.Row(visible=False) as generation_controls:
            reset_gen_button = gr.Button("Reset")
            submit_gen_button = gr.Button("Submit Generated Shape")

        state_outputs = [maze_display, message, phase_info, progress_info]

        def handle_move(direction, phase):
            """Route a move to the solver or the drawing grid, depending on phase."""
            if phase == "solve":
                if direction not in cardinal_directions:
                    return (gr.update(),
                            "Diagonal moves are only available while drawing a shape.",
                            phase, experiment.get_progress())
                return experiment.process_move(direction)
            if phase == "generate":
                result = experiment.move_in_generation(direction)
                # Only the image and the message are needed here.
                return result[0], result[1], phase, experiment.get_progress()
            # Any other phase: leave the display untouched. Component `.value`
            # is the construction-time value, not the live one, so it must not
            # be echoed back.
            return gr.update(), gr.update(), phase, experiment.get_progress()

        def make_move_handler(direction):
            """Return a click handler that moves in the fixed ``direction``."""
            def on_click(phase):
                return handle_move(direction, phase)
            return on_click

        move_buttons = (
            (up_button, "up"),
            (down_button, "down"),
            (left_button, "left"),
            (right_button, "right"),
            (up_left_button, "up-left"),
            (up_right_button, "up-right"),
            (down_left_button, "down-left"),
            (down_right_button, "down-right"),
        )
        for button, direction in move_buttons:
            button.click(
                fn=make_move_handler(direction),
                inputs=[phase_info],
                outputs=state_outputs
            )

        submit_recognition_button.click(
            fn=experiment.submit_shape_recognition,
            inputs=shape_input,
            outputs=state_outputs
        )

        def handle_reset_generation():
            """Clear the drawing grid and stay in the generation phase."""
            img, msg = experiment.reset_generation()
            return img, msg, "generate", experiment.get_progress()

        reset_gen_button.click(
            fn=handle_reset_generation,
            inputs=[],
            outputs=state_outputs
        )

        submit_gen_button.click(
            fn=experiment.submit_generation_drawing,
            inputs=[],
            outputs=state_outputs
        )

        # Visibility of (movement, diagonal, recognition, generation) rows per
        # phase; any other phase ("complete", "error") hides all of them.
        row_visibility = {
            "solve": (True, False, False, False),
            "recognize": (False, False, True, False),
            "generate": (True, True, False, True),
        }

        def handle_phase_change(phase):
            """Show only the control rows that belong to ``phase``."""
            flags = row_visibility.get(phase, (False, False, False, False))
            return tuple(gr.update(visible=flag) for flag in flags)

        phase_info.change(
            fn=handle_phase_change,
            inputs=phase_info,
            outputs=[movement_controls, diagonal_controls, recognition_controls, generation_controls]
        )

        def auto_start():
            """Load a maze for the first combination that still needs mazes."""
            return experiment.load_next_combination()

        interface.load(
            fn=auto_start,
            inputs=None,
            outputs=state_outputs
        )

    return interface
|
|
|
if __name__ == "__main__":
    # Build the experiment, wrap it in the web UI and serve it; share=True
    # asks Gradio for a public URL that can be handed to participants.
    create_interface(MazeExperiment()).launch(share=True)