""" Drone tools manager for terminal interface. Adapts the existing drone control functionality for the terminal application. """ import time import logging from typing import Dict, List, Optional, Any from .drone_control import DroneController # Set up logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class DroneToolsManager: """Manages drone operations for the terminal interface.""" def __init__(self, connection_string: Optional[str] = None): self.controller = DroneController(connection_string) self.connected = False self.mission_in_progress = False # Status tracking self.status = "STANDBY" self.phase = "" self.log_entries = [] def connect_drone(self, connection_string: str, timeout: int = 30) -> bool: """Connect to a drone.""" try: self._update_status("CONNECTING", f"Connecting to {connection_string}") success = self.controller.connect_to_drone(connection_string, timeout) if success: self.connected = True self._update_status("CONNECTED", "Successfully connected to drone") # Get initial status location = self.get_location() battery = self.get_battery() logger.info(f"Connected to drone. Location: {location}, Battery: {battery}") return True else: self._update_status("ERROR", "Failed to connect to drone") return False except Exception as e: self._update_status("ERROR", f"Connection error: {str(e)}") logger.error(f"Connection error: {e}") return False def disconnect_drone(self): """Disconnect from the drone.""" try: self._update_status("DISCONNECTING", "Disconnecting from drone") self.controller.disconnect() self.connected = False self.mission_in_progress = False self._update_status("STANDBY", "Disconnected from drone") logger.info("Disconnected from drone") except Exception as e: logger.error(f"Disconnect error: {e}") def takeoff(self, altitude: float) -> bool: """Take off to specified altitude.""" if not self._ensure_connected(): return False try: self._update_status("TAKING OFF", f"Taking off to {altitude} meters") self.mission_in_progress = True success = self.controller.arm_and_takeoff(altitude) if success: self._update_status("AIRBORNE", f"Reached altitude of {altitude} meters") return True else: self._update_status("ERROR", "Takeoff failed") self.mission_in_progress = False return False except Exception as e: self._update_status("ERROR", f"Takeoff error: {str(e)}") self.mission_in_progress = False logger.error(f"Takeoff error: {e}") return False def land(self) -> bool: """Land the drone.""" if not self._ensure_connected(): return False try: self._update_status("LANDING", "Landing drone") success = self.controller.land() if success: self._update_status("LANDED", "Drone has landed") self.mission_in_progress = False return True else: self._update_status("ERROR", "Landing failed") return False except Exception as e: self._update_status("ERROR", f"Landing error: {str(e)}") logger.error(f"Landing error: {e}") return False def return_home(self) -> bool: """Return to launch/home location.""" if not self._ensure_connected(): return False try: self._update_status("RETURNING", "Returning to launch point") success = self.controller.return_to_launch() if success: self._update_status("RETURNING", "Drone is returning to launch point") return True else: self._update_status("ERROR", "Return to home failed") return False except Exception as e: self._update_status("ERROR", f"Return error: {str(e)}") logger.error(f"Return error: {e}") return False def fly_to(self, latitude: float, longitude: float, altitude: float) -> bool: """Fly to specific GPS coordinates.""" if not self._ensure_connected(): return False try: self._update_status( "NAVIGATING", f"Flying to lat:{latitude:.4f}, lon:{longitude:.4f}, alt:{altitude}m" ) success = self.controller.goto_location(latitude, longitude, altitude) if success: return True else: self._update_status("ERROR", "Navigation failed") return False except Exception as e: self._update_status("ERROR", f"Navigation error: {str(e)}") logger.error(f"Navigation error: {e}") return False def get_location(self) -> Dict[str, Any]: """Get current GPS location.""" if not self._ensure_connected(): return {"error": "Not connected to drone"} try: return self.controller.get_current_location() except Exception as e: logger.error(f"Location error: {e}") return {"error": str(e)} def get_battery(self) -> Dict[str, Any]: """Get battery status.""" if not self._ensure_connected(): return {"error": "Not connected to drone"} try: return self.controller.get_battery_status() except Exception as e: logger.error(f"Battery error: {e}") return {"error": str(e)} def execute_mission(self, waypoints: List[Dict[str, float]]) -> bool: """Execute a mission with multiple waypoints.""" if not self._ensure_connected(): return False if not waypoints: logger.error("No waypoints provided") return False try: self._update_status("MISSION", f"Starting mission with {len(waypoints)} waypoints") self.mission_in_progress = True # Upload mission upload_success = self.controller.upload_mission(waypoints) if not upload_success: self._update_status("ERROR", "Failed to upload mission") self.mission_in_progress = False return False # Execute mission execute_success = self.controller.execute_mission() if execute_success: self._update_status("EXECUTING", "Mission execution started") # Simulate mission progress updates for i, wp in enumerate(waypoints, 1): self._update_status( "EXECUTING", f"Waypoint {i}/{len(waypoints)}: {wp['lat']:.4f}, {wp['lon']:.4f}" ) time.sleep(1) # Brief pause between waypoints self._update_status("MISSION COMPLETE", "All waypoints reached") return True else: self._update_status("ERROR", "Failed to start mission execution") self.mission_in_progress = False return False except Exception as e: self._update_status("ERROR", f"Mission error: {str(e)}") self.mission_in_progress = False logger.error(f"Mission error: {e}") return False def get_status(self) -> Dict[str, Any]: """Get current drone and system status.""" status_info = { "connected": self.connected, "mission_in_progress": self.mission_in_progress, "status": self.status, "phase": self.phase, "log_entries": self.log_entries[-5:], # Last 5 entries } if self.connected: try: status_info["location"] = self.get_location() status_info["battery"] = self.get_battery() if hasattr(self.controller, 'vehicle') and self.controller.vehicle: status_info["mode"] = str(self.controller.vehicle.mode.name) status_info["armed"] = self.controller.vehicle.armed status_info["system_status"] = str(self.controller.vehicle.system_status.state) except Exception as e: status_info["telemetry_error"] = str(e) return status_info def is_connected(self) -> bool: """Check if connected to drone.""" return self.connected def emergency_stop(self): """Emergency stop - immediately land or RTL.""" if not self.connected: return try: self._update_status("EMERGENCY", "Emergency stop initiated") # Try RTL first, then land if not self.return_home(): self.land() self.mission_in_progress = False except Exception as e: logger.error(f"Emergency stop error: {e}") def set_airspeed(self, speed: float) -> bool: """Set target airspeed.""" if not self._ensure_connected(): return False try: return self.controller.set_airspeed(speed) except Exception as e: logger.error(f"Airspeed error: {e}") return False def get_telemetry(self) -> Dict[str, Any]: """Get comprehensive telemetry data.""" if not self._ensure_connected(): return {"error": "Not connected to drone"} try: telemetry = {} # Basic info telemetry["location"] = self.get_location() telemetry["battery"] = self.get_battery() # Vehicle-specific data if available if hasattr(self.controller, 'vehicle') and self.controller.vehicle: vehicle = self.controller.vehicle telemetry.update({ "mode": str(vehicle.mode.name), "armed": vehicle.armed, "system_status": str(vehicle.system_status.state), "airspeed": vehicle.airspeed, "groundspeed": vehicle.groundspeed, "heading": vehicle.heading, }) # GPS info if vehicle.gps_0: telemetry["gps"] = { "fix_type": vehicle.gps_0.fix_type, "satellites_visible": vehicle.gps_0.satellites_visible, "eph": vehicle.gps_0.eph, "epv": vehicle.gps_0.epv, } return telemetry except Exception as e: logger.error(f"Telemetry error: {e}") return {"error": str(e)} def _ensure_connected(self) -> bool: """Ensure drone is connected.""" if not self.connected: logger.error("Not connected to drone") return False return True def _update_status(self, status: str, phase: str = ""): """Update system status and log entry.""" self.status = status self.phase = phase # Create log entry timestamp = time.strftime("%H:%M:%S") log_entry = f"[{timestamp}] {status}: {phase}" if phase else f"[{timestamp}] {status}" self.log_entries.append(log_entry) # Keep only last 50 entries if len(self.log_entries) > 50: self.log_entries = self.log_entries[-50:] logger.info(log_entry)