Spaces:
Running
Running
""" | |
DroneKit-Python interface for DeepDrone - Real Drone Control Module | |
This module provides functions for controlling real drones using DroneKit-Python. | |
""" | |
import time | |
import math | |
# Import compatibility fix for collections.MutableMapping | |
from . import compatibility_fix | |
from dronekit import connect, VehicleMode, LocationGlobalRelative, Command | |
from pymavlink import mavutil | |
from typing import Dict, List, Optional, Tuple, Union | |
import logging | |
# Configure logging | |
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') | |
logger = logging.getLogger('drone_control') | |
class DroneController: | |
"""Class to handle real drone control operations using DroneKit.""" | |
def __init__(self, connection_string: str = None): | |
""" | |
Initialize the drone controller. | |
Args: | |
connection_string: Connection string for the drone (e.g., 'udp:127.0.0.1:14550' for SITL, | |
'/dev/ttyACM0' for serial, or 'tcp:192.168.1.1:5760' for remote connection) | |
""" | |
self.vehicle = None | |
self.connection_string = connection_string | |
self.connected = False | |
def connect_to_drone(self, connection_string: str = None, timeout: int = 30) -> bool: | |
""" | |
Connect to the drone using DroneKit. | |
Args: | |
connection_string: Connection string for the drone (overrides the one provided in __init__) | |
timeout: Connection timeout in seconds | |
Returns: | |
bool: True if connection successful, False otherwise | |
""" | |
if connection_string: | |
self.connection_string = connection_string | |
if not self.connection_string: | |
logger.error("No connection string provided") | |
return False | |
try: | |
logger.info(f"Connecting to drone on {self.connection_string}...") | |
self.vehicle = connect(self.connection_string, wait_ready=True, timeout=timeout) | |
self.connected = True | |
logger.info("Connected to drone successfully") | |
# Log basic vehicle info | |
logger.info(f"Vehicle Version: {self.vehicle.version}") | |
logger.info(f"Vehicle Status: {self.vehicle.system_status.state}") | |
logger.info(f"GPS: {self.vehicle.gps_0}") | |
logger.info(f"Battery: {self.vehicle.battery}") | |
return True | |
except Exception as e: | |
logger.error(f"Error connecting to drone: {str(e)}") | |
self.connected = False | |
return False | |
def disconnect(self) -> None: | |
"""Disconnect from the drone.""" | |
if self.vehicle and self.connected: | |
logger.info("Disconnecting from drone...") | |
self.vehicle.close() | |
self.connected = False | |
logger.info("Disconnected from drone") | |
def arm_and_takeoff(self, target_altitude: float) -> bool: | |
""" | |
Arms the drone and takes off to the specified altitude. | |
Args: | |
target_altitude: Target altitude in meters | |
Returns: | |
bool: True if takeoff successful, False otherwise | |
""" | |
if not self._ensure_connected(): | |
return False | |
logger.info("Arming motors...") | |
# Switch to GUIDED mode | |
self.vehicle.mode = VehicleMode("GUIDED") | |
# Wait until mode change is verified | |
timeout = 10 # seconds | |
start = time.time() | |
while self.vehicle.mode.name != "GUIDED": | |
if time.time() - start > timeout: | |
logger.error("Failed to enter GUIDED mode") | |
return False | |
time.sleep(0.5) | |
# Arm the drone | |
self.vehicle.armed = True | |
# Wait for arming | |
timeout = 10 # seconds | |
start = time.time() | |
while not self.vehicle.armed: | |
if time.time() - start > timeout: | |
logger.error("Failed to arm") | |
return False | |
time.sleep(0.5) | |
logger.info("Taking off!") | |
# Take off to target altitude | |
self.vehicle.simple_takeoff(target_altitude) | |
# Wait until target altitude reached | |
while True: | |
current_altitude = self.vehicle.location.global_relative_frame.alt | |
logger.info(f"Altitude: {current_altitude}") | |
# Break and return when we're close enough to target altitude | |
if current_altitude >= target_altitude * 0.95: | |
logger.info("Reached target altitude") | |
break | |
time.sleep(1) | |
return True | |
def land(self) -> bool: | |
""" | |
Land the drone. | |
Returns: | |
bool: True if land command sent successfully, False otherwise | |
""" | |
if not self._ensure_connected(): | |
return False | |
logger.info("Landing...") | |
self.vehicle.mode = VehicleMode("LAND") | |
return True | |
def return_to_launch(self) -> bool: | |
""" | |
Return to launch location. | |
Returns: | |
bool: True if RTL command sent successfully, False otherwise | |
""" | |
if not self._ensure_connected(): | |
return False | |
logger.info("Returning to launch location...") | |
self.vehicle.mode = VehicleMode("RTL") | |
return True | |
def goto_location(self, latitude: float, longitude: float, altitude: float) -> bool: | |
""" | |
Go to the specified GPS location. | |
Args: | |
latitude: Target latitude in degrees | |
longitude: Target longitude in degrees | |
altitude: Target altitude in meters (relative to home position) | |
Returns: | |
bool: True if goto command sent successfully, False otherwise | |
""" | |
if not self._ensure_connected(): | |
return False | |
logger.info(f"Going to location: Lat: {latitude}, Lon: {longitude}, Alt: {altitude}") | |
# Make sure vehicle is in GUIDED mode | |
if self.vehicle.mode.name != "GUIDED": | |
self.vehicle.mode = VehicleMode("GUIDED") | |
# Wait for mode change | |
timeout = 5 | |
start = time.time() | |
while self.vehicle.mode.name != "GUIDED": | |
if time.time() - start > timeout: | |
logger.error("Failed to enter GUIDED mode") | |
return False | |
time.sleep(0.5) | |
# Create LocationGlobalRelative object and send command | |
target_location = LocationGlobalRelative(latitude, longitude, altitude) | |
self.vehicle.simple_goto(target_location) | |
logger.info(f"Going to location: Lat: {latitude}, Lon: {longitude}, Alt: {altitude}") | |
return True | |
def get_current_location(self) -> Dict[str, float]: | |
""" | |
Get the current GPS location of the drone. | |
Returns: | |
Dict containing latitude, longitude, and altitude | |
""" | |
if not self._ensure_connected(): | |
return {"error": "Not connected to drone"} | |
location = self.vehicle.location.global_relative_frame | |
return { | |
"latitude": location.lat, | |
"longitude": location.lon, | |
"altitude": location.alt | |
} | |
def get_battery_status(self) -> Dict[str, float]: | |
""" | |
Get the current battery status. | |
Returns: | |
Dict containing battery voltage and remaining percentage | |
""" | |
if not self._ensure_connected(): | |
return {"error": "Not connected to drone"} | |
return { | |
"voltage": self.vehicle.battery.voltage, | |
"level": self.vehicle.battery.level, | |
"current": self.vehicle.battery.current | |
} | |
def get_airspeed(self) -> float: | |
""" | |
Get the current airspeed. | |
Returns: | |
Current airspeed in m/s | |
""" | |
if not self._ensure_connected(): | |
return -1.0 | |
return self.vehicle.airspeed | |
def get_groundspeed(self) -> float: | |
""" | |
Get the current ground speed. | |
Returns: | |
Current ground speed in m/s | |
""" | |
if not self._ensure_connected(): | |
return -1.0 | |
return self.vehicle.groundspeed | |
def upload_mission(self, waypoints: List[Dict[str, float]]) -> bool: | |
""" | |
Upload a mission with multiple waypoints to the drone. | |
Args: | |
waypoints: List of dictionaries with lat, lon, alt for each waypoint | |
Returns: | |
bool: True if mission upload successful, False otherwise | |
""" | |
if not self._ensure_connected(): | |
return False | |
logger.info(f"Uploading mission with {len(waypoints)} waypoints...") | |
# Create list of commands | |
cmds = self.vehicle.commands | |
cmds.clear() | |
# Add home location as first waypoint | |
cmds.add(Command(0, 0, 0, mavutil.mavlink.MAV_FRAME_GLOBAL_RELATIVE_ALT, | |
mavutil.mavlink.MAV_CMD_NAV_WAYPOINT, 0, 0, 0, 0, 0, 0, | |
self.vehicle.home_location.lat, | |
self.vehicle.home_location.lon, | |
0)) | |
# Add mission waypoints | |
for idx, wp in enumerate(waypoints): | |
# Add delay at waypoint (0 = no delay) | |
delay = wp.get("delay", 0) | |
# Add waypoint command | |
cmds.add(Command(0, 0, 0, mavutil.mavlink.MAV_FRAME_GLOBAL_RELATIVE_ALT, | |
mavutil.mavlink.MAV_CMD_NAV_WAYPOINT, 0, 0, delay, 0, 0, 0, | |
wp["lat"], wp["lon"], wp["alt"])) | |
# Upload the commands to the vehicle | |
cmds.upload() | |
logger.info("Mission uploaded successfully") | |
return True | |
def execute_mission(self) -> bool: | |
""" | |
Execute the uploaded mission. | |
Returns: | |
bool: True if mission started successfully, False otherwise | |
""" | |
if not self._ensure_connected(): | |
return False | |
logger.info("Executing mission...") | |
self.vehicle.mode = VehicleMode("AUTO") | |
# Wait for mode change | |
timeout = 5 | |
start = time.time() | |
while self.vehicle.mode.name != "AUTO": | |
if time.time() - start > timeout: | |
logger.error("Failed to enter AUTO mode") | |
return False | |
time.sleep(0.5) | |
logger.info("Mission execution started") | |
return True | |
def set_airspeed(self, speed: float) -> bool: | |
""" | |
Set the target airspeed. | |
Args: | |
speed: Target airspeed in m/s | |
Returns: | |
bool: True if command sent successfully, False otherwise | |
""" | |
if not self._ensure_connected(): | |
return False | |
logger.info(f"Setting airspeed to {speed} m/s") | |
self.vehicle.airspeed = speed | |
return True | |
def _ensure_connected(self) -> bool: | |
""" | |
Ensure drone is connected before executing a command. | |
Returns: | |
bool: True if connected, False otherwise | |
""" | |
if not self.vehicle or not self.connected: | |
logger.error("Not connected to a drone. Call connect_to_drone() first.") | |
return False | |
return True | |
# Convenience functions for using the controller without creating an instance | |
_controller = None | |
def connect_drone(connection_string: str, timeout: int = 30) -> bool: | |
""" | |
Connect to a drone using the specified connection string. | |
Args: | |
connection_string: Connection string for the drone | |
timeout: Connection timeout in seconds | |
Returns: | |
bool: True if connection successful, False otherwise | |
""" | |
global _controller | |
if _controller is None: | |
_controller = DroneController() | |
return _controller.connect_to_drone(connection_string, timeout) | |
def disconnect_drone() -> None: | |
"""Disconnect from the drone.""" | |
global _controller | |
if _controller: | |
_controller.disconnect() | |
def takeoff(altitude: float) -> bool: | |
""" | |
Arm and take off to the specified altitude. | |
Args: | |
altitude: Target altitude in meters | |
Returns: | |
bool: True if takeoff successful, False otherwise | |
""" | |
global _controller | |
if _controller: | |
return _controller.arm_and_takeoff(altitude) | |
return False | |
def land() -> bool: | |
""" | |
Land the drone. | |
Returns: | |
bool: True if land command sent successfully, False otherwise | |
""" | |
global _controller | |
if _controller: | |
return _controller.land() | |
return False | |
def return_home() -> bool: | |
""" | |
Return to launch/home location. | |
Returns: | |
bool: True if RTL command sent successfully, False otherwise | |
""" | |
global _controller | |
if _controller: | |
return _controller.return_to_launch() | |
return False | |
def fly_to(lat: float, lon: float, alt: float) -> bool: | |
""" | |
Go to the specified GPS location. | |
Args: | |
lat: Target latitude in degrees | |
lon: Target longitude in degrees | |
alt: Target altitude in meters (relative to home position) | |
Returns: | |
bool: True if goto command sent successfully, False otherwise | |
""" | |
global _controller | |
if _controller: | |
return _controller.goto_location(lat, lon, alt) | |
return False | |
def get_location() -> Dict[str, float]: | |
""" | |
Get the current GPS location of the drone. | |
Returns: | |
Dict containing latitude, longitude, and altitude | |
""" | |
global _controller | |
if _controller: | |
return _controller.get_current_location() | |
return {"error": "Not connected to drone"} | |
def get_battery() -> Dict[str, float]: | |
""" | |
Get the current battery status. | |
Returns: | |
Dict containing battery voltage and remaining percentage | |
""" | |
global _controller | |
if _controller: | |
return _controller.get_battery_status() | |
return {"error": "Not connected to drone"} | |
def execute_mission_plan(waypoints: List[Dict[str, float]]) -> bool: | |
""" | |
Upload and execute a mission with multiple waypoints. | |
Args: | |
waypoints: List of dictionaries with lat, lon, alt for each waypoint | |
Returns: | |
bool: True if mission started successfully, False otherwise | |
""" | |
global _controller | |
if _controller: | |
if _controller.upload_mission(waypoints): | |
return _controller.execute_mission() | |
return False |