import xml.etree.ElementTree as ET from typing import List, Dict import py_trees as pt # from simulator_env import SwarmAgent ######################################################################## # 1. XML Parsing Classes and Functions ######################################################################## class Node: """ A generic node representing a behavior tree element. It holds: - tag: the element's tag (e.g., "Sequence", "say", "SubTree", etc.) - attributes: a dict of the element's attributes (e.g., name, num_cycles, port values) - children: a list of child Node instances (which may be other behaviors or sub-elements) - ports: a dict grouping any port definitions found as child elements (input_port, output_port, inout_port) """ def __init__(self, tag: str, attributes: Dict[str, str]): self.tag = tag self.attributes = attributes.copy() self.children: List['Node'] = [] self.ports: Dict[str, List[Dict[str, str]]] = {} def __repr__(self): return (f"Node(tag={self.tag!r}, attributes={self.attributes!r}, " f"children={self.children!r}, ports={self.ports!r})") def parse_node(element: ET.Element) -> Node: """ Recursively parse an XML element into a Node. This function: - Reads the element's tag and attributes. - Checks for child elements that define ports (input_port, output_port, inout_port) and stores them. - Recursively parses any other child elements as behavior nodes. """ node = Node(element.tag, element.attrib) for child in element: # Check if the child defines a port (this covers the new "inout_port" as well) if child.tag in ['input_port', 'output_port', 'inout_port']: if child.tag not in node.ports: node.ports[child.tag] = [] node.ports[child.tag].append(child.attrib) else: # Otherwise, treat the child as a regular behavior node. child_node = parse_node(child) node.children.append(child_node) return node def parse_behavior_trees(xml_file: str) -> List[Node]: """ Parses the given XML file and returns a list of BehaviorTree nodes. Each element is considered a complete behavior tree (or subtree). """ tree = ET.parse(xml_file) root = tree.getroot() behavior_trees = [] for bt_elem in root.findall('BehaviorTree'): bt_node = parse_node(bt_elem) behavior_trees.append(bt_node) return behavior_trees ######################################################################## # 2. Functions that will be executed by the BT (your actions, conditions, etc.) ######################################################################## def get_function_mapping(): from simulator_env import SwarmAgent mapping = { name: func for name, func in SwarmAgent.__dict__.items() if callable(func) and not name.startswith("_") and name not in ['update','_inject_agent','obstacle','_speak'] } # print("mapping: \n", mapping) return mapping ######################################################################## # 3. Helpers and Custom py_trees Behavior Wrappers ######################################################################## def convert_param(val: str): """ Attempt to convert a string parameter to int or float if possible. Otherwise, return the string. """ try: return int(val) except ValueError: try: return float(val) except ValueError: return val # A simple leaf node that wraps a function call. class FunctionAction(pt.behaviour.Behaviour): def __init__(self, name, function, params): super(FunctionAction, self).__init__(name=name) self.function = function self.params = params self.agent = None # Will be set later def update(self): # Pass the agent (context) into the function status = self.function(self.agent, **self.params) return status # A decorator node that wraps a child behavior and calls a function. class FunctionDecorator(pt.decorators.Decorator): def __init__(self, name, function, params, child): super(FunctionDecorator, self).__init__(name=name, child=child) self.function = function self.params = params def update(self): # Ensure the child is updated. self.decorated.tick_once() child_status = self.decorated.status # Call the decorator function (for side effects) self.function(**self.params) # For this example, we simply pass through the child's status. return child_status # A control node that has one child and then calls a function. class FunctionControl(pt.behaviour.Behaviour): def __init__(self, name, function, params, child): super(FunctionControl, self).__init__(name=name) self.function = function self.params = params self.child = child def update(self): self.child.tick_once() return self.function(**self.params) # Define an AlwaysSuccess behavior to use when an unknown node is encountered. class AlwaysSuccess(pt.behaviour.Behaviour): def __init__(self, name="AlwaysSuccess"): super(AlwaysSuccess, self).__init__(name=name) def update(self): return pt.common.Status.SUCCESS ######################################################################## # 4. Convert the Parsed Node Tree into a py_trees Behavior Tree ######################################################################## def build_behavior(node: Node, subtree_mapping: Dict[str, Node]) -> pt.behaviour.Behaviour: """ Recursively converts a parsed Node (from XML) into a py_trees behavior. """ # Special case: unwrap the BehaviorTree container. if node.tag == "BehaviorTree": if node.children: return build_behavior(node.children[0], subtree_mapping) else: return AlwaysSuccess(name="Empty BehaviorTree") # Define which tags represent which kinds of nodes. composite_tags = ["Sequence", "Fallback"] repeat_tags = ["Repeat"] decorator_tags = ["Inverter","AlwaysFailure", "AlwaysSuccess"] control_tags = [] mapping = get_function_mapping() if node.tag == "Sequence": composite = pt.composites.Sequence( name=node.attributes.get('name', 'Sequence'), memory=True # Added memory parameter ) for child in node.children: composite.add_child(build_behavior(child, subtree_mapping)) return composite elif node.tag == "Fallback": composite = pt.composites.Selector( name=node.attributes.get('name', 'Fallback'), memory=True # Added memory parameter ) for child in node.children: composite.add_child(build_behavior(child, subtree_mapping)) return composite elif node.tag in repeat_tags: if len(node.children) != 1: print("Repeat node must have exactly one child!") child_behavior = build_behavior(node.children[0], subtree_mapping) # Read the number of cycles from the XML; default to 1 if not provided. num_cycles = int(node.attributes.get('num_cycles', 1)) # Create the Repeat decorator, providing the required 'num_success' parameter. repeat_decorator = pt.decorators.Repeat( name=node.attributes.get('name', 'Repeat'), child=child_behavior, num_success=num_cycles # Provide the required parameter here. ) return repeat_decorator elif node.tag == "SubTree": subtree_id = node.attributes.get('ID') if subtree_id in subtree_mapping: return build_behavior(subtree_mapping[subtree_id], subtree_mapping) else: print(f"SubTree with ID {subtree_id} not found!") return AlwaysSuccess(name="Missing SubTree") elif node.tag in decorator_tags: if len(node.children) != 1: print("Decorator node must have exactly one child!") child_behavior = build_behavior(node.children[0], subtree_mapping) params = {k: convert_param(v) for k, v in node.attributes.items() if k != "name"} return FunctionDecorator( name=node.attributes.get('name', node.tag), function=mapping[node.tag], params=params, child=child_behavior ) elif node.tag in control_tags: if len(node.children) != 1: print("Control node must have exactly one child!") child_behavior = build_behavior(node.children[0], subtree_mapping) params = {k: convert_param(v) for k, v in node.attributes.items() if k != "name"} return FunctionControl( name=node.attributes.get('name', node.tag), function=mapping[node.tag], params=params, child=child_behavior ) else: if node.tag in mapping: params = {k: convert_param(v) for k, v in node.attributes.items() if k != "name"} return FunctionAction( name=node.attributes.get('name', node.tag), function=mapping[node.tag], params=params ) else: return AlwaysSuccess(name=node.attributes.get('name', node.tag)) ######################################################################## # 5. Main: Parse XML, Build the py_trees Tree, and Execute It ######################################################################## def print_node(node, indent=0): ind = " " * indent print(f"{ind}{node.tag}: {node.attributes}") # Optionally print any ports for port_type, port_list in node.ports.items(): for port in port_list: print(f"{ind} {port_type}: {port}") for child in node.children: print_node(child, indent + 1) # Usage in your main: if __name__ == "__main__": file_path = 'tree.xml' trees = parse_behavior_trees(file_path) for tree in trees: print_node(tree) # if __name__ == "__main__": # The XML file with your behavior tree. # file_path = 'tree.xml' # # 1. Parse the XML into a list of BehaviorTree nodes. # trees = parse_behavior_trees(file_path) # # Build a mapping of BehaviorTree ID to Node. # print(trees) # subtree_mapping = { tree.attributes.get("ID"): tree for tree in trees } # print() # print(subtree_mapping)