File size: 16,064 Bytes
712d204
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
import math
import time
import pygame as pg
from vi import Agent, Config, Window, HeadlessSimulation
from typing import Optional
from queue import Queue
import numpy as np
from pygame.math import Vector2
import py_trees as pt
import parser
import xml.etree.ElementTree as ET
import threading
# import pyttsx3

class MyWindow(Window):
    """Custom window class for simulation."""
    def __init__(self, width=800, height=600):
        super().__init__(width, height)




class MyConfig(Config):
    """Custom configuration for simulation."""
    def __init__(self, radius=25, visualise_chunks=True, window=None, movement_speed=2.0):
        super().__init__(
            radius=radius,
            visualise_chunks=visualise_chunks,
            window=window or MyWindow(800, 600),
            movement_speed=movement_speed
        )




class SwarmAgent(Agent):
    def __init__(self, images, simulation, pos, nest_pos, target_pos):
        super().__init__(images=images, simulation=simulation)
        # Ensure the agent gets the configuration from the simulation.
        self.config = simulation.config  
        
        self.pos = pos
        self.nest_pos = nest_pos
        self.target_pos = target_pos  
        self.target_detected_flag = False
        self.target_reached_flag = False
        self.is_agent_in_nest_flag = False
        self.obstacle_radius = 3
        self.state = "seeking"
        self.bt_active = True  # Add a flag
        # self.tts_engine = pyttsx3.init()  # Initialize text-to-speech engine

        file_path = "tree.xml"
        trees = parser.parse_behavior_trees(file_path)
        subtree_mapping = { tree.attributes.get("ID"): tree for tree in trees }

        
        xml_tree = ET.parse(file_path)
        xml_root = xml_tree.getroot()
        main_tree_id = xml_root.attrib.get("main_tree_to_execute")
        
        if not main_tree_id or main_tree_id not in subtree_mapping:
            raise ValueError("Main tree not found in the XML!")
        main_tree_node = subtree_mapping[main_tree_id]
        
        # Build the py_trees tree:
        self.bt = parser.build_behavior(main_tree_node, subtree_mapping)
        
        # Inject the agent instance into all leaf behaviors.
        self._inject_agent(self.bt)
        
    def _inject_agent(self, node):
        """Recursively set the agent for any custom BT nodes."""
        if hasattr(node, "agent"):
            node.agent = self
        if hasattr(node, "children"):
            for child in node.children:
                self._inject_agent(child)

    def update(self):
        if self.bt_active:
            self.bt.tick_once()

    def obstacle(self):
        """
        Check for obstacle intersections within a predefined radius.        
        Returns: True if an obstacle is detected within the radius, False otherwise.
        """
        for intersection in self.obstacle_intersections(scale=self.obstacle_radius):
            return True
        return False


    # def update(self):
    #     self.bt.tick_once()
    #     # self.root_node.run(self)

    # def say(self, message: str):
    #     """
    #     Action Node: Speak the provided message using text-to-speech if it hasn't been spoken before.
    #     Args: message (str): The message to be spoken.
    #     Returns: Always returns SUCCESS, indicating the action was executed.
    #     """
    #     if not hasattr(self, 'old_message'):
    #         self.old_message = []
                
    #     # Only speak the message if it has not been spoken before (i.e. not in old_message)
    #     if message not in self.old_message:
    #         self.tts_engine.say(message)
    #         self.tts_engine.runAndWait()
    #         self.old_message.append(message)
            
    #     return pt.common.Status.SUCCESS

    def flocking(self):
        """
        Action Node: Adjust the agent's move vector by blending alignment and separation forces from nearby agents.
        Returns: Always returns SUCCESS, indicating the action was executed.
        """
        nearby_agents = list(self.in_proximity_accuracy().without_distance())
        if not nearby_agents:
            return pt.common.Status.SUCCESS

        alignment = Vector2(0, 0)
        separation = Vector2(0, 0)
        separation_count = 0

        # Desired minimum separation distance (adjust as needed)
        separation_threshold = 3

        # Calculate alignment and separation contributions.
        for other in nearby_agents:
            alignment += other.move
            
            diff = self.pos - other.pos
            distance = diff.length()
            if 0 < distance < separation_threshold:
                # The closer the neighbor, the stronger the repulsive force.
                separation += diff.normalize() * (separation_threshold - distance)
                separation_count += 1

        # Average the alignment vector over all neighbors.
        alignment /= len(nearby_agents)
        
        # If any agents are too close, average the separation vector.
        if separation_count > 0:
            separation /= separation_count

        # Blend the two influences. Here, alignment has a stronger influence than separation.
        # Adjust the blend factor (e.g., 0.3) to control separation influence.
        blended_force = alignment.lerp(separation, 0.3)
        
        # Smoothly blend the current move with the blended force.
        self.move = self.move.lerp(blended_force, 0.5)
        
        # Normalize and scale to the configured movement speed.
        if self.move.length() > 0:
            self.move = self.move.normalize() * self.config.movement_speed

        # Update position and apply wrap-around if necessary.
        self.pos += self.move
        self.there_is_no_escape()
        
        return pt.common.Status.SUCCESS


    
    def align_with_swarm(self):
        """
        Action Node: Align the agent's move vector with the average movement of nearby agents.
        Returns: Always returns SUCCESS, indicating the action was executed.
        """
        nearby_agents = list(self.in_proximity_accuracy().without_distance())
        if not nearby_agents:
            return pt.common.Status.SUCCESS

        avg_direction = Vector2(0, 0)
        for other in nearby_agents:
            avg_direction += other.move
        avg_direction /= len(nearby_agents)
        
        # Blend current movement with average direction.
        self.move = self.move.lerp(avg_direction, 0.5)
        if self.move.length() > 0:
            self.move = self.move.normalize() * self.config.movement_speed

        # Update position and wrap-around if necessary.
        self.pos += self.move
        self.there_is_no_escape()
        
        return pt.common.Status.SUCCESS
 

    def is_obstacle_detected(self):
        """
        Condition node: Determine if any obstacles are detected in the vicinity of the agent. 
        Returns: SUCCESS if an obstacle is detected, FAILURE otherwise.
        """
        if self.obstacle():
            return pt.common.Status.SUCCESS
        else:
            return pt.common.Status.FAILURE


    def avoid_obstacle(self):
        """
        Action node: Execute an action to avoid detected obstacles. 
        Returns: Always returns SUCCESS, indicating the action was executed.
        """
        self.move.rotate_ip(180)
        return pt.common.Status.SUCCESS
    
    def is_target_detected(self):
        """
        Condition node: Check if the target is within a detectable distance from the agent's position. 
        Returns: SUCCESS if the target is within 20 units of distance, FAILURE otherwise.
        """
        distance = math.dist(self.target_pos, self.pos)
        if distance <= 20:
            self.target_detected_flag = True        
        if self.target_detected_flag:
            return pt.common.Status.SUCCESS
        return pt.common.Status.FAILURE
        
    
    def is_target_reached(self):
        """
        Condition node: Check if the agent has reached the target. 
        Returns: SUCCESS if the target is within 15 units of distance, FAILURE otherwise.
        """
        distance = math.dist(self.target_pos, self.pos)
        if distance <= 15:
            self.target_reached_flag = True        
        if self.target_reached_flag:            
            return pt.common.Status.SUCCESS
        return pt.common.Status.FAILURE
    
    def change_color(self, color):
        """
        Action Node: Change the agent's color to 'white', 'green', or 'red'.
        Args: color (str): Color name.
        Returns: Always returns SUCCESS, indicating the action was executed.
        """
        color = color.lower()
        if color == "white":
            self.change_image(0)
        elif color == "green":
            self.change_image(1)
        elif color == "red":
            self.change_image(2)

        return pt.common.Status.SUCCESS
        

    def is_agent_in_nest(self):
        """
        Condition node: Determine if the agent is in the nest.
        Returns: SUCCESS if the agent is in the nest, FAILURE otherwise.
        """
        distance = math.dist(self.nest_pos, self.pos)
        if distance <= 17 and (self.target_reached_flag==True or self.target_detected_flag == True or self.state == "completed" ) :
            self.state = "seeking"
            # self.target_detected_flag = False
            # self.target_reached_flag = False   
            self.is_agent_in_nest_flag = True

        if self.is_agent_in_nest_flag:
            return pt.common.Status.SUCCESS
        return pt.common.Status.FAILURE


    def agent_movement_freeze(self):
        """
        Action node: Freeze the agent's movement, typically to indicate a stop in activity.
        Returns: Always returns SUCCESS, indicating the action was executed.
        """
        self.freeze_movement()
        return pt.common.Status.SUCCESS
    
    def continue_movement_agent(self):
        """
        Action node: Continue the agent's movement after it has been previously frozen.
        Returns: Always returns SUCCESS, indicating the action was executed.
        """
        self.continue_movement()
        return pt.common.Status.SUCCESS

    def move_randomly(self):
        """
        Action node: Perform a wandering action where the agent moves randomly within the environment.
        Returns: Always returns SUCCESS, indicating the action was executed.
        """
        Agent.change_position(self)
        return pt.common.Status.SUCCESS

    def is_path_clear(self):
        """
        Condition node: Check if the path ahead of the agent is clear of obstacles.
        Returns: SUCCESS if no obstacles are detected ahead, FAILURE if obstacles are present.
        """
        # return not self.obstacle()

        if not self.obstacle():
            return pt.common.Status.SUCCESS
        else:
            return pt.common.Status.FAILURE
    
    def is_line_formed(self):
        """
        Condition node: Determine if the agent has formed a line with a reference point at the center of the window.
        Returns: SUCCESS if the line is formed with the center, FAILURE otherwise.
        """
        center_x = self.config.window.width / 2
        direction = Vector2(center_x, self.pos.y) - self.pos
        if direction.length() > 0.5:
            return pt.common.Status.FAILURE        
        return pt.common.Status.SUCCESS

    def form_line(self):
        """
        Action node: Direct the agent to form a line towards the center of the window. This function adjuststhe agent's position to align it with the center.
        Returns: Always returns SUCCESS, indicating the action was executed.
        """
        # print("form_line")
        center_x = self.config.window.width / 2
        direction = Vector2(center_x, self.pos.y) - self.pos
        if direction.length() > 0.5:
            direction.scale_to_length(self.config.movement_speed)
            self.pos += direction     
        return pt.common.Status.SUCCESS
    
    # def task_completed(self):
    #     """
    #     Action node: Signal that the agent has completed its designated task. Returns: Always returns True, indicating that the task completion action was executed.
    #     """
    #     self.state = "completed"
    #     return pt.common.Status.SUCCESS
    








class StreamableSimulation(HeadlessSimulation):
    """Modified Simulation class that captures frames for streaming."""
    def __init__(self, config: Optional[Config] = None):
        super().__init__(config)
        pg.init()

        size = self.config.window.as_tuple()
        self._screen = pg.Surface(size, pg.SRCALPHA)
        self._background = pg.Surface(size, pg.SRCALPHA)
        self._background.fill((0, 0, 0))

        self.frame_queue = Queue(maxsize=30)
        self.running = True
        self._frame_lock = threading.Lock()

    
    def get_frame(self):
        with self._frame_lock:
            surf_copy = self._screen.copy()
            frame = np.array(pg.surfarray.pixels3d(surf_copy))
            return np.transpose(frame, (1, 0, 2))

    def tick(self):
        """Run a simulation step and capture frames."""
        super().tick()

        with self._frame_lock:
            self._screen.blit(self._background, (0, 0))
            for sprite in self._all.sprites():
                self._screen.blit(sprite.image, sprite.rect)

        try:
            frame = self.get_frame()
            self.frame_queue.put(frame, block=False)
        except Queue.Full:
            print("Frame queue is full. Dropping frame.")

    # def _load_image(self, path: str) -> pg.surface.Surface:
    #     """Load an image from the given path."""
    #     return pg.image.load(path)
    
    def _load_image(self, paths):
        """Load one or more images from given paths."""
        if isinstance(paths, str):  # If it's a single string, load normally
            return pg.image.load(paths)
        elif isinstance(paths, list):  # If it's a list, load all images
            return [pg.image.load(path) for path in paths]
        raise TypeError("Expected a string (file path) or a list of file paths")

    def stop(self):
        """Stop the simulation."""
        # Do not try to call self.bt.stop() because simulation does not own a BT.
        # self.running = False
        super().stop()
        pg.quit()       # Quit the Pygame environment
        





# if __name__=="__main__":

#     # Define nest and target positions
#     nest_x, nest_y = 450, 400
#     target_x, target_y = 200, 100
#     nest_pos = Vector2(nest_x, nest_y)
#     target_pos = Vector2(target_x, target_y)

#     # Load images for agents
#     agent_images_paths = ["./images/white.png", "./images/green.png", "./images/red circle.png"]

#     config = MyConfig(radius=250, visualise_chunks=True, movement_speed=2)
#     sim = StreamableSimulation(config=config)

#     # Load images
#     loaded_agent_images = sim._load_image(agent_images_paths)



#     # Initialize agents with behavior tree parsing
#     for _ in range(50):
#         agent = SwarmAgent(
#             images=loaded_agent_images,
#             simulation=sim,
#             pos=Vector2(nest_x, nest_y),
#             nest_pos=nest_pos,
#             target_pos=target_pos,
#         )
#         sim._agents.add(agent)
#         sim._all.add(agent)
#     # Draw environment elements
#     sim.spawn_obstacle("./images/rect_obst.png", 350, 100)
#     sim.spawn_obstacle("./images/rect_obst (1).png", 100, 350)
#     sim.spawn_site("./images/rect.png", target_x, target_y)
#     sim.spawn_site("./images/nest.png", nest_x, nest_y)

#     for agent in sim._agents:
#         agent.bt.tick_once()
    
#     # Then run your simulation loop without ticking the BT further.
#     while sim.running:
#         sim.tick()
#         if not sim.frame_queue.empty():
#             frame = sim.frame_queue.get()
#             # update_frame(frame) or display the frame as needed.
#         time.sleep(1/30)