""" DroneKit-Python interface for DeepDrone - Real Drone Control Module This module provides functions for controlling real drones using DroneKit-Python. """ import time import math # Import compatibility fix for collections.MutableMapping from . import compatibility_fix from dronekit import connect, VehicleMode, LocationGlobalRelative, Command from pymavlink import mavutil from typing import Dict, List, Optional, Tuple, Union import logging # Configure logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger('drone_control') class DroneController: """Class to handle real drone control operations using DroneKit.""" def __init__(self, connection_string: str = None): """ Initialize the drone controller. Args: connection_string: Connection string for the drone (e.g., 'udp:127.0.0.1:14550' for SITL, '/dev/ttyACM0' for serial, or 'tcp:192.168.1.1:5760' for remote connection) """ self.vehicle = None self.connection_string = connection_string self.connected = False def connect_to_drone(self, connection_string: str = None, timeout: int = 30) -> bool: """ Connect to the drone using DroneKit. Args: connection_string: Connection string for the drone (overrides the one provided in __init__) timeout: Connection timeout in seconds Returns: bool: True if connection successful, False otherwise """ if connection_string: self.connection_string = connection_string if not self.connection_string: logger.error("No connection string provided") return False try: logger.info(f"Connecting to drone on {self.connection_string}...") self.vehicle = connect(self.connection_string, wait_ready=True, timeout=timeout) self.connected = True logger.info("Connected to drone successfully") # Log basic vehicle info logger.info(f"Vehicle Version: {self.vehicle.version}") logger.info(f"Vehicle Status: {self.vehicle.system_status.state}") logger.info(f"GPS: {self.vehicle.gps_0}") logger.info(f"Battery: {self.vehicle.battery}") return True except Exception as e: logger.error(f"Error connecting to drone: {str(e)}") self.connected = False return False def disconnect(self) -> None: """Disconnect from the drone.""" if self.vehicle and self.connected: logger.info("Disconnecting from drone...") self.vehicle.close() self.connected = False logger.info("Disconnected from drone") def arm_and_takeoff(self, target_altitude: float) -> bool: """ Arms the drone and takes off to the specified altitude. Args: target_altitude: Target altitude in meters Returns: bool: True if takeoff successful, False otherwise """ if not self._ensure_connected(): return False logger.info("Arming motors...") # Switch to GUIDED mode self.vehicle.mode = VehicleMode("GUIDED") # Wait until mode change is verified timeout = 10 # seconds start = time.time() while self.vehicle.mode.name != "GUIDED": if time.time() - start > timeout: logger.error("Failed to enter GUIDED mode") return False time.sleep(0.5) # Arm the drone self.vehicle.armed = True # Wait for arming timeout = 10 # seconds start = time.time() while not self.vehicle.armed: if time.time() - start > timeout: logger.error("Failed to arm") return False time.sleep(0.5) logger.info("Taking off!") # Take off to target altitude self.vehicle.simple_takeoff(target_altitude) # Wait until target altitude reached while True: current_altitude = self.vehicle.location.global_relative_frame.alt logger.info(f"Altitude: {current_altitude}") # Break and return when we're close enough to target altitude if current_altitude >= target_altitude * 0.95: logger.info("Reached target altitude") break time.sleep(1) return True def land(self) -> bool: """ Land the drone. Returns: bool: True if land command sent successfully, False otherwise """ if not self._ensure_connected(): return False logger.info("Landing...") self.vehicle.mode = VehicleMode("LAND") return True def return_to_launch(self) -> bool: """ Return to launch location. Returns: bool: True if RTL command sent successfully, False otherwise """ if not self._ensure_connected(): return False logger.info("Returning to launch location...") self.vehicle.mode = VehicleMode("RTL") return True def goto_location(self, latitude: float, longitude: float, altitude: float) -> bool: """ Go to the specified GPS location. Args: latitude: Target latitude in degrees longitude: Target longitude in degrees altitude: Target altitude in meters (relative to home position) Returns: bool: True if goto command sent successfully, False otherwise """ if not self._ensure_connected(): return False logger.info(f"Going to location: Lat: {latitude}, Lon: {longitude}, Alt: {altitude}") # Make sure vehicle is in GUIDED mode if self.vehicle.mode.name != "GUIDED": self.vehicle.mode = VehicleMode("GUIDED") # Wait for mode change timeout = 5 start = time.time() while self.vehicle.mode.name != "GUIDED": if time.time() - start > timeout: logger.error("Failed to enter GUIDED mode") return False time.sleep(0.5) # Create LocationGlobalRelative object and send command target_location = LocationGlobalRelative(latitude, longitude, altitude) self.vehicle.simple_goto(target_location) logger.info(f"Going to location: Lat: {latitude}, Lon: {longitude}, Alt: {altitude}") return True def get_current_location(self) -> Dict[str, float]: """ Get the current GPS location of the drone. Returns: Dict containing latitude, longitude, and altitude """ if not self._ensure_connected(): return {"error": "Not connected to drone"} location = self.vehicle.location.global_relative_frame return { "latitude": location.lat, "longitude": location.lon, "altitude": location.alt } def get_battery_status(self) -> Dict[str, float]: """ Get the current battery status. Returns: Dict containing battery voltage and remaining percentage """ if not self._ensure_connected(): return {"error": "Not connected to drone"} return { "voltage": self.vehicle.battery.voltage, "level": self.vehicle.battery.level, "current": self.vehicle.battery.current } def get_airspeed(self) -> float: """ Get the current airspeed. Returns: Current airspeed in m/s """ if not self._ensure_connected(): return -1.0 return self.vehicle.airspeed def get_groundspeed(self) -> float: """ Get the current ground speed. Returns: Current ground speed in m/s """ if not self._ensure_connected(): return -1.0 return self.vehicle.groundspeed def upload_mission(self, waypoints: List[Dict[str, float]]) -> bool: """ Upload a mission with multiple waypoints to the drone. Args: waypoints: List of dictionaries with lat, lon, alt for each waypoint Returns: bool: True if mission upload successful, False otherwise """ if not self._ensure_connected(): return False logger.info(f"Uploading mission with {len(waypoints)} waypoints...") # Create list of commands cmds = self.vehicle.commands cmds.clear() # Add home location as first waypoint cmds.add(Command(0, 0, 0, mavutil.mavlink.MAV_FRAME_GLOBAL_RELATIVE_ALT, mavutil.mavlink.MAV_CMD_NAV_WAYPOINT, 0, 0, 0, 0, 0, 0, self.vehicle.home_location.lat, self.vehicle.home_location.lon, 0)) # Add mission waypoints for idx, wp in enumerate(waypoints): # Add delay at waypoint (0 = no delay) delay = wp.get("delay", 0) # Add waypoint command cmds.add(Command(0, 0, 0, mavutil.mavlink.MAV_FRAME_GLOBAL_RELATIVE_ALT, mavutil.mavlink.MAV_CMD_NAV_WAYPOINT, 0, 0, delay, 0, 0, 0, wp["lat"], wp["lon"], wp["alt"])) # Upload the commands to the vehicle cmds.upload() logger.info("Mission uploaded successfully") return True def execute_mission(self) -> bool: """ Execute the uploaded mission. Returns: bool: True if mission started successfully, False otherwise """ if not self._ensure_connected(): return False logger.info("Executing mission...") self.vehicle.mode = VehicleMode("AUTO") # Wait for mode change timeout = 5 start = time.time() while self.vehicle.mode.name != "AUTO": if time.time() - start > timeout: logger.error("Failed to enter AUTO mode") return False time.sleep(0.5) logger.info("Mission execution started") return True def set_airspeed(self, speed: float) -> bool: """ Set the target airspeed. Args: speed: Target airspeed in m/s Returns: bool: True if command sent successfully, False otherwise """ if not self._ensure_connected(): return False logger.info(f"Setting airspeed to {speed} m/s") self.vehicle.airspeed = speed return True def _ensure_connected(self) -> bool: """ Ensure drone is connected before executing a command. Returns: bool: True if connected, False otherwise """ if not self.vehicle or not self.connected: logger.error("Not connected to a drone. Call connect_to_drone() first.") return False return True # Convenience functions for using the controller without creating an instance _controller = None def connect_drone(connection_string: str, timeout: int = 30) -> bool: """ Connect to a drone using the specified connection string. Args: connection_string: Connection string for the drone timeout: Connection timeout in seconds Returns: bool: True if connection successful, False otherwise """ global _controller if _controller is None: _controller = DroneController() return _controller.connect_to_drone(connection_string, timeout) def disconnect_drone() -> None: """Disconnect from the drone.""" global _controller if _controller: _controller.disconnect() def takeoff(altitude: float) -> bool: """ Arm and take off to the specified altitude. Args: altitude: Target altitude in meters Returns: bool: True if takeoff successful, False otherwise """ global _controller if _controller: return _controller.arm_and_takeoff(altitude) return False def land() -> bool: """ Land the drone. Returns: bool: True if land command sent successfully, False otherwise """ global _controller if _controller: return _controller.land() return False def return_home() -> bool: """ Return to launch/home location. Returns: bool: True if RTL command sent successfully, False otherwise """ global _controller if _controller: return _controller.return_to_launch() return False def fly_to(lat: float, lon: float, alt: float) -> bool: """ Go to the specified GPS location. Args: lat: Target latitude in degrees lon: Target longitude in degrees alt: Target altitude in meters (relative to home position) Returns: bool: True if goto command sent successfully, False otherwise """ global _controller if _controller: return _controller.goto_location(lat, lon, alt) return False def get_location() -> Dict[str, float]: """ Get the current GPS location of the drone. Returns: Dict containing latitude, longitude, and altitude """ global _controller if _controller: return _controller.get_current_location() return {"error": "Not connected to drone"} def get_battery() -> Dict[str, float]: """ Get the current battery status. Returns: Dict containing battery voltage and remaining percentage """ global _controller if _controller: return _controller.get_battery_status() return {"error": "Not connected to drone"} def execute_mission_plan(waypoints: List[Dict[str, float]]) -> bool: """ Upload and execute a mission with multiple waypoints. Args: waypoints: List of dictionaries with lat, lon, alt for each waypoint Returns: bool: True if mission started successfully, False otherwise """ global _controller if _controller: if _controller.upload_mission(waypoints): return _controller.execute_mission() return False