Spaces:
Sleeping
Sleeping
File size: 16,064 Bytes
712d204 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 |
import math
import time
import pygame as pg
from vi import Agent, Config, Window, HeadlessSimulation
from typing import Optional
from queue import Queue
import numpy as np
from pygame.math import Vector2
import py_trees as pt
import parser
import xml.etree.ElementTree as ET
import threading
# import pyttsx3
class MyWindow(Window):
"""Custom window class for simulation."""
def __init__(self, width=800, height=600):
super().__init__(width, height)
class MyConfig(Config):
"""Custom configuration for simulation."""
def __init__(self, radius=25, visualise_chunks=True, window=None, movement_speed=2.0):
super().__init__(
radius=radius,
visualise_chunks=visualise_chunks,
window=window or MyWindow(800, 600),
movement_speed=movement_speed
)
class SwarmAgent(Agent):
def __init__(self, images, simulation, pos, nest_pos, target_pos):
super().__init__(images=images, simulation=simulation)
# Ensure the agent gets the configuration from the simulation.
self.config = simulation.config
self.pos = pos
self.nest_pos = nest_pos
self.target_pos = target_pos
self.target_detected_flag = False
self.target_reached_flag = False
self.is_agent_in_nest_flag = False
self.obstacle_radius = 3
self.state = "seeking"
self.bt_active = True # Add a flag
# self.tts_engine = pyttsx3.init() # Initialize text-to-speech engine
file_path = "tree.xml"
trees = parser.parse_behavior_trees(file_path)
subtree_mapping = { tree.attributes.get("ID"): tree for tree in trees }
xml_tree = ET.parse(file_path)
xml_root = xml_tree.getroot()
main_tree_id = xml_root.attrib.get("main_tree_to_execute")
if not main_tree_id or main_tree_id not in subtree_mapping:
raise ValueError("Main tree not found in the XML!")
main_tree_node = subtree_mapping[main_tree_id]
# Build the py_trees tree:
self.bt = parser.build_behavior(main_tree_node, subtree_mapping)
# Inject the agent instance into all leaf behaviors.
self._inject_agent(self.bt)
def _inject_agent(self, node):
"""Recursively set the agent for any custom BT nodes."""
if hasattr(node, "agent"):
node.agent = self
if hasattr(node, "children"):
for child in node.children:
self._inject_agent(child)
def update(self):
if self.bt_active:
self.bt.tick_once()
def obstacle(self):
"""
Check for obstacle intersections within a predefined radius.
Returns: True if an obstacle is detected within the radius, False otherwise.
"""
for intersection in self.obstacle_intersections(scale=self.obstacle_radius):
return True
return False
# def update(self):
# self.bt.tick_once()
# # self.root_node.run(self)
# def say(self, message: str):
# """
# Action Node: Speak the provided message using text-to-speech if it hasn't been spoken before.
# Args: message (str): The message to be spoken.
# Returns: Always returns SUCCESS, indicating the action was executed.
# """
# if not hasattr(self, 'old_message'):
# self.old_message = []
# # Only speak the message if it has not been spoken before (i.e. not in old_message)
# if message not in self.old_message:
# self.tts_engine.say(message)
# self.tts_engine.runAndWait()
# self.old_message.append(message)
# return pt.common.Status.SUCCESS
def flocking(self):
"""
Action Node: Adjust the agent's move vector by blending alignment and separation forces from nearby agents.
Returns: Always returns SUCCESS, indicating the action was executed.
"""
nearby_agents = list(self.in_proximity_accuracy().without_distance())
if not nearby_agents:
return pt.common.Status.SUCCESS
alignment = Vector2(0, 0)
separation = Vector2(0, 0)
separation_count = 0
# Desired minimum separation distance (adjust as needed)
separation_threshold = 3
# Calculate alignment and separation contributions.
for other in nearby_agents:
alignment += other.move
diff = self.pos - other.pos
distance = diff.length()
if 0 < distance < separation_threshold:
# The closer the neighbor, the stronger the repulsive force.
separation += diff.normalize() * (separation_threshold - distance)
separation_count += 1
# Average the alignment vector over all neighbors.
alignment /= len(nearby_agents)
# If any agents are too close, average the separation vector.
if separation_count > 0:
separation /= separation_count
# Blend the two influences. Here, alignment has a stronger influence than separation.
# Adjust the blend factor (e.g., 0.3) to control separation influence.
blended_force = alignment.lerp(separation, 0.3)
# Smoothly blend the current move with the blended force.
self.move = self.move.lerp(blended_force, 0.5)
# Normalize and scale to the configured movement speed.
if self.move.length() > 0:
self.move = self.move.normalize() * self.config.movement_speed
# Update position and apply wrap-around if necessary.
self.pos += self.move
self.there_is_no_escape()
return pt.common.Status.SUCCESS
def align_with_swarm(self):
"""
Action Node: Align the agent's move vector with the average movement of nearby agents.
Returns: Always returns SUCCESS, indicating the action was executed.
"""
nearby_agents = list(self.in_proximity_accuracy().without_distance())
if not nearby_agents:
return pt.common.Status.SUCCESS
avg_direction = Vector2(0, 0)
for other in nearby_agents:
avg_direction += other.move
avg_direction /= len(nearby_agents)
# Blend current movement with average direction.
self.move = self.move.lerp(avg_direction, 0.5)
if self.move.length() > 0:
self.move = self.move.normalize() * self.config.movement_speed
# Update position and wrap-around if necessary.
self.pos += self.move
self.there_is_no_escape()
return pt.common.Status.SUCCESS
def is_obstacle_detected(self):
"""
Condition node: Determine if any obstacles are detected in the vicinity of the agent.
Returns: SUCCESS if an obstacle is detected, FAILURE otherwise.
"""
if self.obstacle():
return pt.common.Status.SUCCESS
else:
return pt.common.Status.FAILURE
def avoid_obstacle(self):
"""
Action node: Execute an action to avoid detected obstacles.
Returns: Always returns SUCCESS, indicating the action was executed.
"""
self.move.rotate_ip(180)
return pt.common.Status.SUCCESS
def is_target_detected(self):
"""
Condition node: Check if the target is within a detectable distance from the agent's position.
Returns: SUCCESS if the target is within 20 units of distance, FAILURE otherwise.
"""
distance = math.dist(self.target_pos, self.pos)
if distance <= 20:
self.target_detected_flag = True
if self.target_detected_flag:
return pt.common.Status.SUCCESS
return pt.common.Status.FAILURE
def is_target_reached(self):
"""
Condition node: Check if the agent has reached the target.
Returns: SUCCESS if the target is within 15 units of distance, FAILURE otherwise.
"""
distance = math.dist(self.target_pos, self.pos)
if distance <= 15:
self.target_reached_flag = True
if self.target_reached_flag:
return pt.common.Status.SUCCESS
return pt.common.Status.FAILURE
def change_color(self, color):
"""
Action Node: Change the agent's color to 'white', 'green', or 'red'.
Args: color (str): Color name.
Returns: Always returns SUCCESS, indicating the action was executed.
"""
color = color.lower()
if color == "white":
self.change_image(0)
elif color == "green":
self.change_image(1)
elif color == "red":
self.change_image(2)
return pt.common.Status.SUCCESS
def is_agent_in_nest(self):
"""
Condition node: Determine if the agent is in the nest.
Returns: SUCCESS if the agent is in the nest, FAILURE otherwise.
"""
distance = math.dist(self.nest_pos, self.pos)
if distance <= 17 and (self.target_reached_flag==True or self.target_detected_flag == True or self.state == "completed" ) :
self.state = "seeking"
# self.target_detected_flag = False
# self.target_reached_flag = False
self.is_agent_in_nest_flag = True
if self.is_agent_in_nest_flag:
return pt.common.Status.SUCCESS
return pt.common.Status.FAILURE
def agent_movement_freeze(self):
"""
Action node: Freeze the agent's movement, typically to indicate a stop in activity.
Returns: Always returns SUCCESS, indicating the action was executed.
"""
self.freeze_movement()
return pt.common.Status.SUCCESS
def continue_movement_agent(self):
"""
Action node: Continue the agent's movement after it has been previously frozen.
Returns: Always returns SUCCESS, indicating the action was executed.
"""
self.continue_movement()
return pt.common.Status.SUCCESS
def move_randomly(self):
"""
Action node: Perform a wandering action where the agent moves randomly within the environment.
Returns: Always returns SUCCESS, indicating the action was executed.
"""
Agent.change_position(self)
return pt.common.Status.SUCCESS
def is_path_clear(self):
"""
Condition node: Check if the path ahead of the agent is clear of obstacles.
Returns: SUCCESS if no obstacles are detected ahead, FAILURE if obstacles are present.
"""
# return not self.obstacle()
if not self.obstacle():
return pt.common.Status.SUCCESS
else:
return pt.common.Status.FAILURE
def is_line_formed(self):
"""
Condition node: Determine if the agent has formed a line with a reference point at the center of the window.
Returns: SUCCESS if the line is formed with the center, FAILURE otherwise.
"""
center_x = self.config.window.width / 2
direction = Vector2(center_x, self.pos.y) - self.pos
if direction.length() > 0.5:
return pt.common.Status.FAILURE
return pt.common.Status.SUCCESS
def form_line(self):
"""
Action node: Direct the agent to form a line towards the center of the window. This function adjuststhe agent's position to align it with the center.
Returns: Always returns SUCCESS, indicating the action was executed.
"""
# print("form_line")
center_x = self.config.window.width / 2
direction = Vector2(center_x, self.pos.y) - self.pos
if direction.length() > 0.5:
direction.scale_to_length(self.config.movement_speed)
self.pos += direction
return pt.common.Status.SUCCESS
# def task_completed(self):
# """
# Action node: Signal that the agent has completed its designated task. Returns: Always returns True, indicating that the task completion action was executed.
# """
# self.state = "completed"
# return pt.common.Status.SUCCESS
class StreamableSimulation(HeadlessSimulation):
"""Modified Simulation class that captures frames for streaming."""
def __init__(self, config: Optional[Config] = None):
super().__init__(config)
pg.init()
size = self.config.window.as_tuple()
self._screen = pg.Surface(size, pg.SRCALPHA)
self._background = pg.Surface(size, pg.SRCALPHA)
self._background.fill((0, 0, 0))
self.frame_queue = Queue(maxsize=30)
self.running = True
self._frame_lock = threading.Lock()
def get_frame(self):
with self._frame_lock:
surf_copy = self._screen.copy()
frame = np.array(pg.surfarray.pixels3d(surf_copy))
return np.transpose(frame, (1, 0, 2))
def tick(self):
"""Run a simulation step and capture frames."""
super().tick()
with self._frame_lock:
self._screen.blit(self._background, (0, 0))
for sprite in self._all.sprites():
self._screen.blit(sprite.image, sprite.rect)
try:
frame = self.get_frame()
self.frame_queue.put(frame, block=False)
except Queue.Full:
print("Frame queue is full. Dropping frame.")
# def _load_image(self, path: str) -> pg.surface.Surface:
# """Load an image from the given path."""
# return pg.image.load(path)
def _load_image(self, paths):
"""Load one or more images from given paths."""
if isinstance(paths, str): # If it's a single string, load normally
return pg.image.load(paths)
elif isinstance(paths, list): # If it's a list, load all images
return [pg.image.load(path) for path in paths]
raise TypeError("Expected a string (file path) or a list of file paths")
def stop(self):
"""Stop the simulation."""
# Do not try to call self.bt.stop() because simulation does not own a BT.
# self.running = False
super().stop()
pg.quit() # Quit the Pygame environment
# if __name__=="__main__":
# # Define nest and target positions
# nest_x, nest_y = 450, 400
# target_x, target_y = 200, 100
# nest_pos = Vector2(nest_x, nest_y)
# target_pos = Vector2(target_x, target_y)
# # Load images for agents
# agent_images_paths = ["./images/white.png", "./images/green.png", "./images/red circle.png"]
# config = MyConfig(radius=250, visualise_chunks=True, movement_speed=2)
# sim = StreamableSimulation(config=config)
# # Load images
# loaded_agent_images = sim._load_image(agent_images_paths)
# # Initialize agents with behavior tree parsing
# for _ in range(50):
# agent = SwarmAgent(
# images=loaded_agent_images,
# simulation=sim,
# pos=Vector2(nest_x, nest_y),
# nest_pos=nest_pos,
# target_pos=target_pos,
# )
# sim._agents.add(agent)
# sim._all.add(agent)
# # Draw environment elements
# sim.spawn_obstacle("./images/rect_obst.png", 350, 100)
# sim.spawn_obstacle("./images/rect_obst (1).png", 100, 350)
# sim.spawn_site("./images/rect.png", target_x, target_y)
# sim.spawn_site("./images/nest.png", nest_x, nest_y)
# for agent in sim._agents:
# agent.bt.tick_once()
# # Then run your simulation loop without ticking the BT further.
# while sim.running:
# sim.tick()
# if not sim.frame_queue.empty():
# frame = sim.frame_queue.get()
# # update_frame(frame) or display the frame as needed.
# time.sleep(1/30) |