Spaces:
Running
Running
File size: 6,472 Bytes
bd61f34 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 |
#!/usr/bin/env python3
"""
Test the ability of the DroneAssistant to plan and execute missions
based on natural language requests.
This script simulates a user asking the agent to create a mission plan and execute it.
"""
import os
import time
import json
import sys
from drone.drone_chat import DroneAssistant, generate_mission_plan
from drone.drone_control import connect_drone, disconnect_drone
from drone.hf_model import HfApiModel, Message
from drone import compatibility_fix # Import for Python 3.10+ compatibility
# Import the simplified model that works with smolagents
from tests.test_simple_agent import SimplePlaceholderModel, final_answer
# Natural-language mission requests exercised against the agent, one per
# mission type: survey, inspection, delivery, and a free-form custom flight.
TEST_CASES = [
    "Create a mission plan for a survey mission that takes 20 minutes and execute it on the simulator.",
    "I need to inspect a building. Can you make a plan for a 15-minute inspection mission and fly it?",
    "Plan a delivery mission to these coordinates and execute it: 37.7749, -122.4194. It should take about 10 minutes.",
    "Plan and execute a simple square pattern flight around my current position.",
]
def setup_drone_agent():
    """Create and initialize a DroneAssistant for testing.

    Uses the hosted Qwen model when an HF_TOKEN is present in the
    environment; otherwise falls back to the lightweight placeholder
    model that works offline with smolagents.
    """
    print("Setting up drone agent...")

    # Check if HF_TOKEN is set in environment variables
    api_token = os.environ.get("HF_TOKEN", "")
    if api_token:
        # A real token is available: use the hosted inference model.
        language_model = HfApiModel(
            max_tokens=2096,
            temperature=0.7,
            model_id='Qwen/Qwen2.5-Coder-32B-Instruct'
        )
    else:
        print("WARNING: No Hugging Face API token found. Using a placeholder model.")
        # Offline fallback that still satisfies the agent's model interface.
        language_model = SimplePlaceholderModel()

    # Wire the assistant with the mission-planning tool and return it.
    return DroneAssistant(
        tools=[generate_mission_plan],
        model=language_model
    )
def execute_mission_from_plan(mission_plan):
    """Execute a mission based on the provided plan using DroneKit.

    Parameters
    ----------
    mission_plan : dict | str
        Either a plan dict or its string representation (a Python
        literal such as ``"{'mission_type': 'survey', ...}"``).

    Returns
    -------
    bool
        True when the (simulated) mission ran to completion, False on
        any parsing or connection error.
    """
    import ast  # local import: only needed to parse string-form plans

    print("\nExecuting mission from plan...")
    # Parse the mission plan
    try:
        if isinstance(mission_plan, str):
            # Was eval(): literal_eval parses dict/list/str/number
            # literals safely, without executing arbitrary code.
            plan = ast.literal_eval(mission_plan)
        else:
            plan = mission_plan

        print(f"Mission type: {plan.get('mission_type')}")
        print(f"Duration: {plan.get('duration_minutes')} minutes")
        print(f"Flight pattern: {plan.get('flight_pattern')}")
        print(f"Recommended altitude: {plan.get('recommended_altitude')}")

        # Connect to the simulator
        print("\nConnecting to simulator...")
        connected = connect_drone('udp:127.0.0.1:14550')
        if not connected:
            print("Failed to connect to the simulator. Make sure it's running.")
            return False
        print("Connected to simulator!")

        # Here we would normally execute the actual mission.
        # For testing purposes, we'll just simulate the execution.
        print("Simulating mission execution...")
        for i in range(5):
            print(f"Mission progress: {i*20}%")
            time.sleep(1)

        print("Mission completed successfully!")
        disconnect_drone()
        return True
    except Exception as e:
        print(f"Error executing mission: {str(e)}")
        return False
def run_test_case(drone_agent, test_case):
    """Run a single test case with the drone agent."""
    print(f"\n----- TESTING CASE: '{test_case}' -----")

    # Simulate a user asking the agent
    print(f"\nUSER: {test_case}")
    response = drone_agent.chat(test_case)
    print(f"\nAGENT: {response}")

    # Check if the response contains or implies a mission plan
    lowered_response = response.lower()
    plan_terms = ["mission plan", "flight plan", "waypoints", "coordinates"]
    if not any(term in lowered_response for term in plan_terms):
        print("FAIL: Agent did not create a mission plan.")
        return False

    # Extract the mission plan and execute it.
    # For a real implementation, we would need to parse the response more
    # carefully; here we'll just call generate_mission_plan directly,
    # deriving the mission type from the request text (first match wins).
    request = test_case.lower()
    mission_type = "custom"
    for keyword, kind in (("survey", "survey"),
                          ("inspect", "inspection"),
                          ("delivery", "delivery")):
        if keyword in request:
            mission_type = kind
            break

    # Try to extract the requested duration; default to 15 minutes.
    duration = 15
    if "minute" in request:
        import re
        duration_match = re.search(r'(\d+)\s*minute', test_case)
        if duration_match:
            duration = int(duration_match.group(1))

    print(f"\nGenerating mission plan for {mission_type} mission with duration {duration} minutes...")
    mission_plan = generate_mission_plan(mission_type=mission_type, duration_minutes=duration)
    print(f"\nGenerated plan: {mission_plan}")

    # Execute the mission plan
    if execute_mission_from_plan(mission_plan):
        print("PASS: Successfully executed the mission plan.")
        return True
    print("FAIL: Could not execute the mission plan.")
    return False
def run_all_tests():
    """Run all test cases and report results.

    Returns
    -------
    bool
        True only when at least one test case ran and every one passed.
    """
    drone_agent = setup_drone_agent()

    results = [run_test_case(drone_agent, test_case) for test_case in TEST_CASES]

    # Print summary
    print("\n----- TEST SUMMARY -----")
    for i, (test, result) in enumerate(zip(TEST_CASES, results), start=1):
        status = "PASS" if result else "FAIL"
        print(f"Test {i}: {status} - '{test[:40]}...'")

    # Guard against an empty TEST_CASES list: avoids ZeroDivisionError
    # below, and prevents all([]) == True from reporting vacuous success.
    if not results:
        print("\nNo test cases to run.")
        return False

    success_rate = sum(results) / len(results) * 100
    print(f"\nOverall Success Rate: {success_rate:.1f}%")
    return all(results)
if __name__ == "__main__":
    print("Testing DroneAssistant's ability to plan and execute missions...")
    success = run_all_tests()
    if success:
        print("\nAll tests passed! The agent can successfully plan and execute missions.")
        # sys.exit instead of the site-module exit() helper, which is not
        # guaranteed to exist (e.g. under `python -S`); sys is already imported.
        sys.exit(0)
    else:
        print("\nSome tests failed. The agent needs improvement.")
        sys.exit(1)