Spaces:
Paused
Paused
""" | |
This provides an implementation of the iterative linear quadratic regulator (iLQR) algorithm for trajectory tracking. | |
It is specialized to the case with a discrete-time kinematic bicycle model and a quadratic trajectory tracking cost. | |
Original (Nonlinear) Discrete Time System: | |
z_k = [x_k, y_k, theta_k, v_k, delta_k] | |
u_k = [a_k, phi_k] | |
x_{k+1} = x_k + v_k * cos(theta_k) * dt | |
y_{k+1} = y_k + v_k * sin(theta_k) * dt | |
theta_{k+1} = theta_k + v_k * tan(delta_k) / L * dt | |
v_{k+1} = v_k + a_k * dt | |
delta_{k+1} = delta_k + phi_k * dt | |
where (x_k, y_k, theta_k) is the pose at timestep k with time discretization dt, | |
v_k and a_k are velocity and acceleration, | |
delta_k and phi_k are steering angle and steering angle rate, | |
and L is the vehicle wheelbase. | |
Quadratic Tracking Cost: | |
J = sum_{k=0}^{N-1} ||u_k||_2^{R_k} + | |
sum_{k=0}^N ||z_k - z_{ref,k}||_2^{Q_k} | |
For simplicity, we opt to use constant input cost matrices R_k = R and constant state cost matrices Q_k = Q. | |
There are multiple improvements that can be done for this implementation, but omitted for simplicity of the code. | |
Some of these include: | |
* Handle constraints directly in the optimization (e.g. log-barrier / penalty method with quadratic cost estimate). | |
* Line search in the input policy update (feedforward term) to determine a good gradient step size. | |
References Used: https://people.eecs.berkeley.edu/~pabbeel/cs287-fa19/slides/Lec5-LQR.pdf and | |
https://www.cs.cmu.edu/~rsalakhu/10703/Lectures/Lecture_trajectoryoptimization.pdf | |
""" | |
import time | |
from dataclasses import dataclass, fields | |
from typing import List, Optional, Tuple | |
import numpy as np | |
import numpy.typing as npt | |
# from nuplan.common.actor_state.vehicle_parameters import get_pacifica_parameters | |
# from nuplan.common.geometry.compute import principal_value | |
# from nuplan.planning.simulation.controller.tracker.tracker_utils import ( | |
# complete_kinematic_state_and_inputs_from_poses, | |
# compute_steering_angle_feedback, | |
# ) | |
from sim.ilqr.utils import principal_value, complete_kinematic_state_and_inputs_from_poses, compute_steering_angle_feedback | |
DoubleMatrix = npt.NDArray[np.float64] | |
class ILQRSolverParameters: | |
"""Parameters related to the solver implementation.""" | |
discretization_time: float # [s] Time discretization used for integration. | |
# Cost weights for state [x, y, heading, velocity, steering angle] and input variables [acceleration, steering rate]. | |
state_cost_diagonal_entries: List[float] | |
input_cost_diagonal_entries: List[float] | |
# Trust region cost weights for state and input variables. Helps keep linearization error per update step bounded. | |
state_trust_region_entries: List[float] | |
input_trust_region_entries: List[float] | |
# Parameters related to solver runtime / solution sub-optimality. | |
max_ilqr_iterations: int # Maximum number of iterations to run iLQR before timeout. | |
convergence_threshold: float # Threshold for delta inputs below which we can terminate iLQR early. | |
max_solve_time: Optional[ | |
float | |
] # [s] If defined, sets a maximum time to run a solve call of iLQR before terminating. | |
# Constraints for underlying dynamics model. | |
max_acceleration: float # [m/s^2] Absolute value threshold on acceleration input. | |
max_steering_angle: float # [rad] Absolute value threshold on steering angle state. | |
max_steering_angle_rate: float # [rad/s] Absolute value threshold on steering rate input. | |
# Parameters for dynamics / linearization. | |
min_velocity_linearization: float # [m/s] Absolute value threshold below which linearization velocity is modified. | |
wheelbase: float # [m] Wheelbase length parameter for the vehicle. | |
def __post_init__(self) -> None: | |
"""Ensure entries lie in expected bounds and initialize wheelbase.""" | |
for entry in [ | |
"discretization_time", | |
"max_ilqr_iterations", | |
"convergence_threshold", | |
"max_acceleration", | |
"max_steering_angle", | |
"max_steering_angle_rate", | |
"min_velocity_linearization", | |
"wheelbase", | |
]: | |
assert getattr(self, entry) > 0.0, f"Field {entry} should be positive." | |
assert self.max_steering_angle < np.pi / 2.0, "Max steering angle should be less than 90 degrees." | |
if isinstance(self.max_solve_time, float): | |
assert self.max_solve_time > 0.0, "The specified max solve time should be positive." | |
assert np.all([x >= 0 for x in self.state_cost_diagonal_entries]), "Q matrix must be positive semidefinite." | |
assert np.all([x > 0 for x in self.input_cost_diagonal_entries]), "R matrix must be positive definite." | |
assert np.all( | |
[x > 0 for x in self.state_trust_region_entries] | |
), "State trust region cost matrix must be positive definite." | |
assert np.all( | |
[x > 0 for x in self.input_trust_region_entries] | |
), "Input trust region cost matrix must be positive definite." | |
class ILQRWarmStartParameters: | |
"""Parameters related to generating a warm start trajectory for iLQR.""" | |
k_velocity_error_feedback: float # Gain for initial velocity error for warm start acceleration. | |
k_steering_angle_error_feedback: float # Gain for initial steering angle error for warm start steering rate. | |
lookahead_distance_lateral_error: float # [m] Distance ahead for which we estimate lateral error. | |
k_lateral_error: float # Gain for lateral error to compute steering angle feedback. | |
jerk_penalty_warm_start_fit: float # Penalty for jerk in velocity profile estimation. | |
curvature_rate_penalty_warm_start_fit: float # Penalty for curvature rate in curvature profile estimation. | |
def __post_init__(self) -> None: | |
"""Ensure entries lie in expected bounds.""" | |
for entry in [ | |
"k_velocity_error_feedback", | |
"k_steering_angle_error_feedback", | |
"lookahead_distance_lateral_error", | |
"k_lateral_error", | |
"jerk_penalty_warm_start_fit", | |
"curvature_rate_penalty_warm_start_fit", | |
]: | |
assert getattr(self, entry) > 0.0, f"Field {entry} should be positive." | |
class ILQRIterate: | |
"""Contains state, input, and associated Jacobian trajectories needed to perform an update step of iLQR.""" | |
state_trajectory: DoubleMatrix | |
input_trajectory: DoubleMatrix | |
state_jacobian_trajectory: DoubleMatrix | |
input_jacobian_trajectory: DoubleMatrix | |
def __post_init__(self) -> None: | |
"""Check consistency of dimension across trajectory elements.""" | |
assert len(self.state_trajectory.shape) == 2, "Expect state trajectory to be a 2D matrix." | |
state_trajectory_length, state_dim = self.state_trajectory.shape | |
assert len(self.input_trajectory.shape) == 2, "Expect input trajectory to be a 2D matrix." | |
input_trajectory_length, input_dim = self.input_trajectory.shape | |
assert ( | |
input_trajectory_length == state_trajectory_length - 1 | |
), "State trajectory should be 1 longer than the input trajectory." | |
assert self.state_jacobian_trajectory.shape == (input_trajectory_length, state_dim, state_dim) | |
assert self.input_jacobian_trajectory.shape == (input_trajectory_length, state_dim, input_dim) | |
for field in fields(self): | |
# Make sure that we have no nan entries in our trajectory rollout prior to operating on this. | |
assert ~np.any(np.isnan(getattr(self, field.name))), f"{field.name} has unexpected nan values." | |
class ILQRInputPolicy: | |
"""Contains parameters for the perturbation input policy computed after performing LQR.""" | |
state_feedback_matrices: DoubleMatrix | |
feedforward_inputs: DoubleMatrix | |
def __post__init__(self) -> None: | |
"""Check shape of policy parameters.""" | |
assert ( | |
len(self.state_feedback_matrices.shape) == 3 | |
), "Expected state_feedback_matrices to have shape (n_horizon, n_inputs, n_states)" | |
assert ( | |
len(self.feedforward_inputs.shape) == 2 | |
), "Expected feedforward inputs to have shape (n_horizon, n_inputs)." | |
assert ( | |
self.feedforward_inputs.shape == self.state_feedback_matrices.shape[:2] | |
), "Inconsistent horizon or input dimension between feedforward inputs and state feedback matrices." | |
for field in fields(self): | |
# Make sure that we have no nan entries in our policy parameters prior to using them. | |
assert ~np.any(np.isnan(getattr(self, field.name))), f"{field.name} has unexpected nan values." | |
class ILQRSolution: | |
"""Contains the iLQR solution with associated cost for consumption by the solver's client.""" | |
state_trajectory: DoubleMatrix | |
input_trajectory: DoubleMatrix | |
tracking_cost: float | |
def __post_init__(self) -> None: | |
"""Check consistency of dimension across trajectory elements and nonnegative cost.""" | |
assert len(self.state_trajectory.shape) == 2, "Expect state trajectory to be a 2D matrix." | |
state_trajectory_length, _ = self.state_trajectory.shape | |
assert len(self.input_trajectory.shape) == 2, "Expect input trajectory to be a 2D matrix." | |
input_trajectory_length, _ = self.input_trajectory.shape | |
assert ( | |
input_trajectory_length == state_trajectory_length - 1 | |
), "State trajectory should be 1 longer than the input trajectory." | |
assert self.tracking_cost >= 0.0, "Expect the tracking cost to be nonnegative." | |
class ILQRSolver: | |
"""iLQR solver implementation, see module docstring for details.""" | |
def __init__( | |
self, | |
solver_params: ILQRSolverParameters, | |
warm_start_params: ILQRWarmStartParameters, | |
) -> None: | |
""" | |
Initialize solver parameters. | |
:param solver_params: Contains solver parameters for iLQR. | |
:param warm_start_params: Contains warm start parameters for iLQR. | |
""" | |
self._solver_params = solver_params | |
self._warm_start_params = warm_start_params | |
self._n_states = 5 # state dimension | |
self._n_inputs = 2 # input dimension | |
state_cost_diagonal_entries = self._solver_params.state_cost_diagonal_entries | |
assert ( | |
len(state_cost_diagonal_entries) == self._n_states | |
), f"State cost matrix should have diagonal length {self._n_states}." | |
self._state_cost_matrix: DoubleMatrix = np.diag(state_cost_diagonal_entries) | |
input_cost_diagonal_entries = self._solver_params.input_cost_diagonal_entries | |
assert ( | |
len(input_cost_diagonal_entries) == self._n_inputs | |
), f"Input cost matrix should have diagonal length {self._n_inputs}." | |
self._input_cost_matrix: DoubleMatrix = np.diag(input_cost_diagonal_entries) | |
state_trust_region_entries = self._solver_params.state_trust_region_entries | |
assert ( | |
len(state_trust_region_entries) == self._n_states | |
), f"State trust region cost matrix should have diagonal length {self._n_states}." | |
self._state_trust_region_cost_matrix: DoubleMatrix = np.diag(state_trust_region_entries) | |
input_trust_region_entries = self._solver_params.input_trust_region_entries | |
assert ( | |
len(input_trust_region_entries) == self._n_inputs | |
), f"Input trust region cost matrix should have diagonal length {self._n_inputs}." | |
self._input_trust_region_cost_matrix: DoubleMatrix = np.diag(input_trust_region_entries) | |
max_acceleration = self._solver_params.max_acceleration | |
max_steering_angle_rate = self._solver_params.max_steering_angle_rate | |
# Define input clip limits once to avoid recomputation in _clip_inputs. | |
self._input_clip_min = (-max_acceleration, -max_steering_angle_rate) | |
self._input_clip_max = (max_acceleration, max_steering_angle_rate) | |
def solve(self, current_state: DoubleMatrix, reference_trajectory: DoubleMatrix) -> List[ILQRSolution]: | |
""" | |
Run the main iLQR loop used to try to find (locally) optimal inputs to track the reference trajectory. | |
:param current_state: The initial state from which we apply inputs, z_0. | |
:param reference_trajectory: The state reference we'd like to track, inclusive of the initial timestep, | |
z_{r,k} for k in {0, ..., N}. | |
:return: A list of solution iterates after running the iLQR algorithm where the index is the iteration number. | |
""" | |
# Check that state parameter has the right shape. | |
assert current_state.shape == (self._n_states,), "Incorrect state shape." | |
# Check that reference trajectory parameter has the right shape. | |
assert len(reference_trajectory.shape) == 2, "Reference trajectory should be a 2D matrix." | |
reference_trajectory_length, reference_trajectory_state_dimension = reference_trajectory.shape | |
assert reference_trajectory_length > 1, "The reference trajectory should be at least two timesteps long." | |
assert ( | |
reference_trajectory_state_dimension == self._n_states | |
), "The reference trajectory should have a matching state dimension." | |
# List of ILQRSolution results where the index corresponds to the iteration of iLQR. | |
solution_list: List[ILQRSolution] = [] | |
# Get warm start input and state trajectory, as well as associated Jacobians. | |
current_iterate = self._input_warm_start(current_state, reference_trajectory) | |
# Main iLQR Loop. | |
solve_start_time = time.perf_counter() | |
for _ in range(self._solver_params.max_ilqr_iterations): | |
# Determine the cost and store the associated solution object. | |
tracking_cost = self._compute_tracking_cost( | |
iterate=current_iterate, | |
reference_trajectory=reference_trajectory, | |
) | |
solution_list.append( | |
ILQRSolution( | |
input_trajectory=current_iterate.input_trajectory, | |
state_trajectory=current_iterate.state_trajectory, | |
tracking_cost=tracking_cost, | |
) | |
) | |
# Determine the LQR optimal perturbations to apply. | |
lqr_input_policy = self._run_lqr_backward_recursion( | |
current_iterate=current_iterate, | |
reference_trajectory=reference_trajectory, | |
) | |
# Apply the optimal perturbations to generate the next input trajectory iterate. | |
input_trajectory_next = self._update_inputs_with_policy( | |
current_iterate=current_iterate, | |
lqr_input_policy=lqr_input_policy, | |
) | |
# Check for convergence/timeout and terminate early if so. | |
# Else update the input_trajectory iterate and continue. | |
input_trajectory_norm_difference = np.linalg.norm(input_trajectory_next - current_iterate.input_trajectory) | |
current_iterate = self._run_forward_dynamics(current_state, input_trajectory_next) | |
if input_trajectory_norm_difference < self._solver_params.convergence_threshold: | |
break | |
elapsed_time = time.perf_counter() - solve_start_time | |
if ( | |
isinstance(self._solver_params.max_solve_time, float) | |
and elapsed_time >= self._solver_params.max_solve_time | |
): | |
break | |
# Store the final iterate in the solution_dict. | |
tracking_cost = self._compute_tracking_cost( | |
iterate=current_iterate, | |
reference_trajectory=reference_trajectory, | |
) | |
solution_list.append( | |
ILQRSolution( | |
input_trajectory=current_iterate.input_trajectory, | |
state_trajectory=current_iterate.state_trajectory, | |
tracking_cost=tracking_cost, | |
) | |
) | |
return solution_list | |
#################################################################################################################### | |
# Helper methods. | |
#################################################################################################################### | |
def _compute_tracking_cost(self, iterate: ILQRIterate, reference_trajectory: DoubleMatrix) -> float: | |
""" | |
Compute the trajectory tracking cost given a candidate solution. | |
:param iterate: Contains the candidate state and input trajectory to evaluate. | |
:param reference_trajectory: The desired state reference trajectory with same length as state_trajectory. | |
:return: The tracking cost of the candidate state/input trajectory. | |
""" | |
input_trajectory = iterate.input_trajectory | |
state_trajectory = iterate.state_trajectory | |
assert len(state_trajectory) == len( | |
reference_trajectory | |
), "The state and reference trajectory should have the same length." | |
error_state_trajectory = state_trajectory - reference_trajectory | |
error_state_trajectory[:, 2] = principal_value(error_state_trajectory[:, 2]) | |
cost = np.sum([u.T @ self._input_cost_matrix @ u for u in input_trajectory]) + np.sum( | |
[e.T @ self._state_cost_matrix @ e for e in error_state_trajectory] | |
) | |
return float(cost) | |
def _clip_inputs(self, inputs: DoubleMatrix) -> DoubleMatrix: | |
""" | |
Used to clip control inputs within constraints. | |
:param: inputs: The control inputs with shape (self._n_inputs,) to clip. | |
:return: Clipped version of the control inputs, unmodified if already within constraints. | |
""" | |
assert inputs.shape == (self._n_inputs,), f"The inputs should be a 1D vector with {self._n_inputs} elements." | |
return np.clip(inputs, self._input_clip_min, self._input_clip_max) # type: ignore | |
def _clip_steering_angle(self, steering_angle: float) -> float: | |
""" | |
Used to clip the steering angle state within bounds. | |
:param steering_angle: [rad] A steering angle (scalar) to clip. | |
:return: [rad] The clipped steering angle. | |
""" | |
steering_angle_sign = 1.0 if steering_angle >= 0 else -1.0 | |
steering_angle = steering_angle_sign * min(abs(steering_angle), self._solver_params.max_steering_angle) | |
return steering_angle | |
def _input_warm_start(self, current_state: DoubleMatrix, reference_trajectory: DoubleMatrix) -> ILQRIterate: | |
""" | |
Given a reference trajectory, we generate the warm start (initial guess) by inferring the inputs applied based | |
on poses in the reference trajectory. | |
:param current_state: The initial state from which we apply inputs. | |
:param reference_trajectory: The reference trajectory we are trying to follow. | |
:return: The warm start iterate from which to start iLQR. | |
""" | |
reference_states_completed, reference_inputs_completed = complete_kinematic_state_and_inputs_from_poses( | |
discretization_time=self._solver_params.discretization_time, | |
wheel_base=self._solver_params.wheelbase, | |
poses=reference_trajectory[:, :3], | |
jerk_penalty=self._warm_start_params.jerk_penalty_warm_start_fit, | |
curvature_rate_penalty=self._warm_start_params.curvature_rate_penalty_warm_start_fit, | |
) | |
# We could just stop here and apply reference_inputs_completed (assuming it satisfies constraints). | |
# This could work if current_state = reference_states_completed[0,:] - i.e. no initial tracking error. | |
# We add feedback input terms for the first control input only to account for nonzero initial tracking error. | |
_, _, _, velocity_current, steering_angle_current = current_state | |
_, _, _, velocity_reference, steering_angle_reference = reference_states_completed[0, :] | |
acceleration_feedback = -self._warm_start_params.k_velocity_error_feedback * ( | |
velocity_current - velocity_reference | |
) | |
steering_angle_feedback = compute_steering_angle_feedback( | |
pose_reference=current_state[:3], | |
pose_current=reference_states_completed[0, :3], | |
lookahead_distance=self._warm_start_params.lookahead_distance_lateral_error, | |
k_lateral_error=self._warm_start_params.k_lateral_error, | |
) | |
steering_angle_desired = steering_angle_feedback + steering_angle_reference | |
steering_rate_feedback = -self._warm_start_params.k_steering_angle_error_feedback * ( | |
steering_angle_current - steering_angle_desired | |
) | |
reference_inputs_completed[0, 0] += acceleration_feedback | |
reference_inputs_completed[0, 1] += steering_rate_feedback | |
# We rerun dynamics with constraints applied to make sure we have a feasible warm start for iLQR. | |
return self._run_forward_dynamics(current_state, reference_inputs_completed) | |
#################################################################################################################### | |
# Dynamics and Jacobian. | |
#################################################################################################################### | |
def _run_forward_dynamics(self, current_state: DoubleMatrix, input_trajectory: DoubleMatrix) -> ILQRIterate: | |
""" | |
Compute states and corresponding state/input Jacobian matrices using forward dynamics. | |
We additionally return the input since the dynamics may modify the input to ensure constraint satisfaction. | |
:param current_state: The initial state from which we apply inputs. Must be feasible given constraints. | |
:param input_trajectory: The input trajectory applied to the model. May be modified to ensure feasibility. | |
:return: A feasible iterate after applying dynamics with state/input trajectories and Jacobian matrices. | |
""" | |
# Store rollout as a set of numpy arrays, initialized as np.nan to ensure we correctly fill them in. | |
# The state trajectory includes the current_state, z_0, and is 1 element longer than the other arrays. | |
# The final_input_trajectory captures the applied input for the dynamics model satisfying constraints. | |
N = len(input_trajectory) | |
state_trajectory = np.nan * np.ones((N + 1, self._n_states), dtype=np.float64) | |
final_input_trajectory = np.nan * np.ones_like(input_trajectory, dtype=np.float64) | |
state_jacobian_trajectory = np.nan * np.ones((N, self._n_states, self._n_states), dtype=np.float64) | |
final_input_jacobian_trajectory = np.nan * np.ones((N, self._n_states, self._n_inputs), dtype=np.float64) | |
state_trajectory[0] = current_state | |
for idx_u, u in enumerate(input_trajectory): | |
state_next, final_input, state_jacobian, final_input_jacobian = self._dynamics_and_jacobian( | |
state_trajectory[idx_u], u | |
) | |
state_trajectory[idx_u + 1] = state_next | |
final_input_trajectory[idx_u] = final_input | |
state_jacobian_trajectory[idx_u] = state_jacobian | |
final_input_jacobian_trajectory[idx_u] = final_input_jacobian | |
iterate = ILQRIterate( | |
state_trajectory=state_trajectory, # type: ignore | |
input_trajectory=final_input_trajectory, # type: ignore | |
state_jacobian_trajectory=state_jacobian_trajectory, # type: ignore | |
input_jacobian_trajectory=final_input_jacobian_trajectory, # type: ignore | |
) | |
return iterate | |
def _dynamics_and_jacobian( | |
self, current_state: DoubleMatrix, current_input: DoubleMatrix | |
) -> Tuple[DoubleMatrix, DoubleMatrix, DoubleMatrix, DoubleMatrix]: | |
""" | |
Propagates the state forward by one step and computes the corresponding state and input Jacobian matrices. | |
We also impose all constraints here to ensure the current input and next state are always feasible. | |
:param current_state: The current state z_k. | |
:param current_input: The applied input u_k. | |
:return: The next state z_{k+1}, (possibly modified) input u_k, and state (df/dz) and input (df/du) Jacobians. | |
""" | |
x, y, heading, velocity, steering_angle = current_state | |
# Check steering angle is in expected range for valid Jacobian matrices. | |
assert ( | |
np.abs(steering_angle) < np.pi / 2.0 | |
), f"The steering angle {steering_angle} is outside expected limits. There is a singularity at delta = np.pi/2." | |
# Input constraints: clip inputs within bounds and then use. | |
current_input = self._clip_inputs(current_input) | |
acceleration, steering_rate = current_input | |
# Euler integration of bicycle model. | |
discretization_time = self._solver_params.discretization_time | |
wheelbase = self._solver_params.wheelbase | |
next_state: DoubleMatrix = np.copy(current_state) | |
next_state[0] += velocity * np.cos(heading) * discretization_time | |
next_state[1] += velocity * np.sin(heading) * discretization_time | |
next_state[2] += velocity * np.tan(steering_angle) / wheelbase * discretization_time | |
next_state[3] += acceleration * discretization_time | |
next_state[4] += steering_rate * discretization_time | |
# Constrain heading angle to lie within +/- pi. | |
next_state[2] = principal_value(next_state[2]) | |
# State constraints: clip the steering_angle within bounds and update steering_rate accordingly. | |
next_steering_angle = self._clip_steering_angle(next_state[4]) | |
applied_steering_rate = (next_steering_angle - steering_angle) / discretization_time | |
next_state[4] = next_steering_angle | |
current_input[1] = applied_steering_rate | |
# Now we construct and populate the state and input Jacobians. | |
state_jacobian: DoubleMatrix = np.eye(self._n_states, dtype=np.float64) | |
input_jacobian: DoubleMatrix = np.zeros((self._n_states, self._n_inputs), dtype=np.float64) | |
# Set a nonzero velocity to handle issues when linearizing at (near) zero velocity. | |
# This helps e.g. when the vehicle is stopped with zero steering angle and needs to accelerate/turn. | |
# Without this, the A matrix will indicate steering has no impact on heading due to Euler discretization. | |
# There will be a rank drop in the controllability matrix, so the discrete-time algebraic Riccati equation | |
# may not have a solution (uncontrollable subspace) or it may not be unique. | |
min_velocity_linearization = self._solver_params.min_velocity_linearization | |
if -min_velocity_linearization <= velocity and velocity <= min_velocity_linearization: | |
sign_velocity = 1.0 if velocity >= 0.0 else -1.0 | |
velocity = sign_velocity * min_velocity_linearization | |
state_jacobian[0, 2] = -velocity * np.sin(heading) * discretization_time | |
state_jacobian[0, 3] = np.cos(heading) * discretization_time | |
state_jacobian[1, 2] = velocity * np.cos(heading) * discretization_time | |
state_jacobian[1, 3] = np.sin(heading) * discretization_time | |
state_jacobian[2, 3] = np.tan(steering_angle) / wheelbase * discretization_time | |
state_jacobian[2, 4] = velocity * discretization_time / (wheelbase * np.cos(steering_angle) ** 2) | |
input_jacobian[3, 0] = discretization_time | |
input_jacobian[4, 1] = discretization_time | |
return next_state, current_input, state_jacobian, input_jacobian | |
#################################################################################################################### | |
# Core LQR implementation. | |
#################################################################################################################### | |
def _run_lqr_backward_recursion( | |
self, | |
current_iterate: ILQRIterate, | |
reference_trajectory: DoubleMatrix, | |
) -> ILQRInputPolicy: | |
""" | |
Computes the locally optimal affine state feedback policy by applying dynamic programming to linear perturbation | |
dynamics about a specified linearization trajectory. We include a trust region penalty as part of the cost. | |
:param current_iterate: Contains all relevant linearization information needed to compute LQR policy. | |
:param reference_trajectory: The desired state trajectory we are tracking. | |
:return: An affine state feedback policy - state feedback matrices and feedforward inputs found using LQR. | |
""" | |
state_trajectory = current_iterate.state_trajectory | |
input_trajectory = current_iterate.input_trajectory | |
state_jacobian_trajectory = current_iterate.state_jacobian_trajectory | |
input_jacobian_trajectory = current_iterate.input_jacobian_trajectory | |
# Check reference matches the expected shape. | |
assert reference_trajectory.shape == state_trajectory.shape, "The reference trajectory has incorrect shape." | |
# Compute nominal error trajectory. | |
error_state_trajectory = state_trajectory - reference_trajectory | |
error_state_trajectory[:, 2] = principal_value(error_state_trajectory[:, 2]) | |
# The value function has the form V_k(\Delta z_k) = \Delta z_k^T P_k \Delta z_k + 2 \rho_k^T \Delta z_k. | |
# So p_current = P_k is related to the Hessian of the value function at the current timestep. | |
# And rho_current = rho_k is part of the linear cost term in the value function at the current timestep. | |
p_current = self._state_cost_matrix + self._state_trust_region_cost_matrix | |
rho_current = self._state_cost_matrix @ error_state_trajectory[-1] | |
# The optimal LQR policy has the form \Delta u_k^* = K_k \Delta z_k + \kappa_k | |
# We refer to K_k as state_feedback_matrix and \kappa_k as feedforward input in the code below. | |
N = len(input_trajectory) | |
state_feedback_matrices = np.nan * np.ones((N, self._n_inputs, self._n_states), dtype=np.float64) | |
feedforward_inputs = np.nan * np.ones((N, self._n_inputs), dtype=np.float64) | |
for i in reversed(range(N)): | |
A = state_jacobian_trajectory[i] | |
B = input_jacobian_trajectory[i] | |
u = input_trajectory[i] | |
error = error_state_trajectory[i] | |
# Compute the optimal input policy for this timestep. | |
inverse_matrix_term = np.linalg.inv( | |
self._input_cost_matrix + self._input_trust_region_cost_matrix + B.T @ p_current @ B | |
) # invertible since we checked input_cost / input_trust_region_cost are positive definite during creation. | |
state_feedback_matrix = -inverse_matrix_term @ B.T @ p_current @ A | |
feedforward_input = -inverse_matrix_term @ (self._input_cost_matrix @ u + B.T @ rho_current) | |
# Compute the optimal value function for this timestep. | |
a_closed_loop = A + B @ state_feedback_matrix | |
p_prior = ( | |
self._state_cost_matrix | |
+ self._state_trust_region_cost_matrix | |
+ state_feedback_matrix.T @ self._input_cost_matrix @ state_feedback_matrix | |
+ state_feedback_matrix.T @ self._input_trust_region_cost_matrix @ state_feedback_matrix | |
+ a_closed_loop.T @ p_current @ a_closed_loop | |
) | |
rho_prior = ( | |
self._state_cost_matrix @ error | |
+ state_feedback_matrix.T @ self._input_cost_matrix @ (feedforward_input + u) | |
+ state_feedback_matrix.T @ self._input_trust_region_cost_matrix @ feedforward_input | |
+ a_closed_loop.T @ p_current @ B @ feedforward_input | |
+ a_closed_loop.T @ rho_current | |
) | |
p_current = p_prior | |
rho_current = rho_prior | |
state_feedback_matrices[i] = state_feedback_matrix | |
feedforward_inputs[i] = feedforward_input | |
lqr_input_policy = ILQRInputPolicy( | |
state_feedback_matrices=state_feedback_matrices, # type: ignore | |
feedforward_inputs=feedforward_inputs, # type: ignore | |
) | |
return lqr_input_policy | |
def _update_inputs_with_policy( | |
self, | |
current_iterate: ILQRIterate, | |
lqr_input_policy: ILQRInputPolicy, | |
) -> DoubleMatrix: | |
""" | |
Used to update an iterate of iLQR by applying a perturbation input policy for local cost improvement. | |
:param current_iterate: Contains the state and input trajectory about which we linearized. | |
:param lqr_input_policy: Contains the LQR policy to apply. | |
:return: The next input trajectory found by applying the LQR policy. | |
""" | |
state_trajectory = current_iterate.state_trajectory | |
input_trajectory = current_iterate.input_trajectory | |
# Trajectory of state perturbations while applying feedback policy. | |
# Starts with zero as the initial states match exactly, only later states might vary. | |
delta_state_trajectory = np.nan * np.ones((len(input_trajectory) + 1, self._n_states), dtype=np.float64) | |
delta_state_trajectory[0] = [0.0] * self._n_states | |
# This is the updated input trajectory we will return after applying the input perturbations. | |
input_next_trajectory = np.nan * np.ones_like(input_trajectory, dtype=np.float64) | |
zip_object = zip( | |
input_trajectory, | |
state_trajectory[:-1], | |
state_trajectory[1:], | |
lqr_input_policy.state_feedback_matrices, | |
lqr_input_policy.feedforward_inputs, | |
) | |
for input_idx, (input_lin, state_lin, state_lin_next, state_feedback_matrix, feedforward_input) in enumerate( | |
zip_object | |
): | |
# Compute locally optimal input perturbation. | |
delta_state = delta_state_trajectory[input_idx] | |
delta_input = state_feedback_matrix @ delta_state + feedforward_input | |
# Apply state and input perturbation. | |
input_perturbed = input_lin + delta_input | |
state_perturbed = state_lin + delta_state | |
state_perturbed[2] = principal_value(state_perturbed[2]) | |
# Run dynamics with perturbed state/inputs to get next state. | |
# We get the actually applied input since it might have been clipped/modified to satisfy constraints. | |
state_perturbed_next, input_perturbed, _, _ = self._dynamics_and_jacobian(state_perturbed, input_perturbed) | |
# Compute next state perturbation given next state. | |
delta_state_next = state_perturbed_next - state_lin_next | |
delta_state_next[2] = principal_value(delta_state_next[2]) | |
delta_state_trajectory[input_idx + 1] = delta_state_next | |
input_next_trajectory[input_idx] = input_perturbed | |
assert ~np.any(np.isnan(input_next_trajectory)), "All next inputs should be valid float values." | |
return input_next_trajectory # type: ignore |