#!/usr/bin/env python3
"""
Test the DroneAssistant's ability to plan and execute missions
based on natural language requests.

This script simulates a user asking the agent to create a mission plan
and execute it on the simulator.
"""
import ast
import json
import os
import re
import sys
import time

from drone.drone_chat import DroneAssistant, generate_mission_plan
from drone.drone_control import connect_drone, disconnect_drone
from drone.hf_model import HfApiModel, Message
from drone import compatibility_fix  # Import for Python 3.10+ compatibility

# Import the simplified model that works with smolagents
from tests.test_simple_agent import SimplePlaceholderModel, final_answer

# Test cases - different natural language requests for missions
TEST_CASES = [
    "Create a mission plan for a survey mission that takes 20 minutes and execute it on the simulator.",
    "I need to inspect a building. Can you make a plan for a 15-minute inspection mission and fly it?",
    "Plan a delivery mission to these coordinates and execute it: 37.7749, -122.4194. It should take about 10 minutes.",
    "Plan and execute a simple square pattern flight around my current position.",
]
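# Each request maps to a mission type ("survey", "inspection", "delivery",
# or "custom") via the keyword matching in run_test_case below.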


def setup_drone_agent():
    """Create and initialize a DroneAssistant for testing."""
    print("Setting up drone agent...")

    # Check if HF_TOKEN is set in environment variables
    hf_token = os.environ.get("HF_TOKEN", "")
    if not hf_token:
        print("WARNING: No Hugging Face API token found. Using a placeholder model.")
        # Use the simplified placeholder model instead of the previous complex one
        model = SimplePlaceholderModel()
    else:
        # Use the real model if an API token is available
        model = HfApiModel(
            max_tokens=2096,
            temperature=0.7,
            model_id='Qwen/Qwen2.5-Coder-32B-Instruct'
        )

    # Create the drone assistant with the required tools
    drone_agent = DroneAssistant(
        tools=[generate_mission_plan],
        model=model
    )
    return drone_agent
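
# Quick standalone check (a sketch, not part of the test run): with no
# HF_TOKEN set, the agent should come up on the placeholder model, e.g.
#
#   agent = setup_drone_agent()
#   print(agent.chat("Plan a 5-minute survey mission."))
#
# The exact reply depends on which model backs the agent.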


def execute_mission_from_plan(mission_plan):
    """Execute a mission based on the provided plan using DroneKit."""
    print("\nExecuting mission from plan...")

    # Parse the mission plan
    try:
        if isinstance(mission_plan, str):
            # Parse the string representation safely instead of using eval():
            # try JSON first, then fall back to a Python dict literal.
            try:
                plan = json.loads(mission_plan)
            except ValueError:
                plan = ast.literal_eval(mission_plan)
        else:
            plan = mission_plan

        print(f"Mission type: {plan.get('mission_type')}")
        print(f"Duration: {plan.get('duration_minutes')} minutes")
        print(f"Flight pattern: {plan.get('flight_pattern')}")
        print(f"Recommended altitude: {plan.get('recommended_altitude')}")

        # Connect to the simulator
        print("\nConnecting to simulator...")
        connected = connect_drone('udp:127.0.0.1:14550')
        if not connected:
            print("Failed to connect to the simulator. Make sure it's running.")
            return False
        print("Connected to simulator!")

        # Here we would normally execute the actual mission;
        # for testing purposes we just simulate the progress.
        print("Simulating mission execution...")
        for i in range(1, 6):
            print(f"Mission progress: {i * 20}%")
            time.sleep(1)

        print("Mission completed successfully!")
        disconnect_drone()
        return True
    except Exception as e:
        print(f"Error executing mission: {str(e)}")
        return False
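
# For reference, a hedged sketch of what real execution could look like with
# raw DroneKit, assuming the plan carried a 'waypoints' list of
# (lat, lon, alt) tuples -- a field the plans in this script are not
# guaranteed to have. The project's connect_drone/disconnect_drone helpers
# wrap the connection handling; this is illustration only.
#
#   from dronekit import connect, Command, VehicleMode
#   from pymavlink import mavutil
#
#   vehicle = connect('udp:127.0.0.1:14550', wait_ready=True)
#   cmds = vehicle.commands
#   cmds.clear()
#   for lat, lon, alt in plan['waypoints']:
#       cmds.add(Command(0, 0, 0,
#                        mavutil.mavlink.MAV_FRAME_GLOBAL_RELATIVE_ALT,
#                        mavutil.mavlink.MAV_CMD_NAV_WAYPOINT,
#                        0, 0, 0, 0, 0, 0, lat, lon, alt))
#   cmds.upload()
#   vehicle.mode = VehicleMode("AUTO")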


def run_test_case(drone_agent, test_case):
    """Run a single test case with the drone agent."""
    print(f"\n----- TESTING CASE: '{test_case}' -----")

    # Simulate a user asking the agent
    print(f"\nUSER: {test_case}")

    # Get the agent's response
    response = drone_agent.chat(test_case)
    print(f"\nAGENT: {response}")

    # Check if the response contains or implies a mission plan
    mission_keywords = ["mission plan", "flight plan", "waypoints", "coordinates"]
    has_mission_plan = any(keyword in response.lower() for keyword in mission_keywords)
    if not has_mission_plan:
        print("FAIL: Agent did not create a mission plan.")
        return False

    # Extract the mission plan and execute it.
    # A real implementation would parse the plan out of the agent's response;
    # here we infer the parameters from the request and call
    # generate_mission_plan directly.
    if "survey" in test_case.lower():
        mission_type = "survey"
    elif "inspect" in test_case.lower():
        mission_type = "inspection"
    elif "delivery" in test_case.lower():
        mission_type = "delivery"
    else:
        mission_type = "custom"

    if "minute" in test_case.lower():
        # Try to extract the duration; the hyphen in the character class
        # also matches phrasings like "15-minute".
        duration_match = re.search(r'(\d+)[-\s]*minute', test_case)
        duration = int(duration_match.group(1)) if duration_match else 15
    else:
        duration = 15

    print(f"\nGenerating mission plan for {mission_type} mission with duration {duration} minutes...")
    mission_plan = generate_mission_plan(mission_type=mission_type, duration_minutes=duration)
    print(f"\nGenerated plan: {mission_plan}")

    # Execute the mission plan
    result = execute_mission_from_plan(mission_plan)
    if result:
        print("PASS: Successfully executed the mission plan.")
        return True
    print("FAIL: Could not execute the mission plan.")
    return False
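
# A hedged sketch of the "more careful" parsing mentioned above: if the agent
# embeds its plan as a JSON object in the reply, it could be recovered
# directly instead of regenerated (assumes exactly one {...} block).
#
#   match = re.search(r'\{.*\}', response, re.DOTALL)
#   plan = json.loads(match.group(0)) if match else None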


def run_all_tests():
    """Run all test cases and report results."""
    drone_agent = setup_drone_agent()
    results = []
    for test_case in TEST_CASES:
        results.append(run_test_case(drone_agent, test_case))

    # Print summary
    print("\n----- TEST SUMMARY -----")
    for i, (test, result) in enumerate(zip(TEST_CASES, results)):
        status = "PASS" if result else "FAIL"
        print(f"Test {i + 1}: {status} - '{test[:40]}...'")

    success_rate = sum(results) / len(results) * 100
    print(f"\nOverall Success Rate: {success_rate:.1f}%")
    return all(results)


if __name__ == "__main__":
    print("Testing DroneAssistant's ability to plan and execute missions...")
    success = run_all_tests()
    if success:
        print("\nAll tests passed! The agent can successfully plan and execute missions.")
        sys.exit(0)
    else:
        print("\nSome tests failed. The agent needs improvement.")
        sys.exit(1)
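
# Running this script requires a simulator listening on udp:127.0.0.1:14550
# (the address connect_drone() uses above). An ArduPilot SITL instance
# started with sim_vehicle.py typically forwards MAVLink there by default;
# the exact invocation depends on the local setup.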