from typing import Tuple import numpy as np import numpy.typing as npt DoubleMatrix = npt.NDArray[np.float64] def principal_value(angle, min_=-np.pi): """ Wrap heading angle in to specified domain (multiples of 2 pi alias), ensuring that the angle is between min_ and min_ + 2 pi. This function raises an error if the angle is infinite :param angle: rad :param min_: minimum domain for angle (rad) :return angle wrapped to [min_, min_ + 2 pi). """ assert np.all(np.isfinite(angle)), "angle is not finite" lhs = (angle - min_) % (2 * np.pi) + min_ return lhs def compute_steering_angle_feedback( pose_reference, pose_current, lookahead_distance, k_lateral_error ): """ Given pose information, determines the steering angle feedback value to address initial tracking error. This is based on the feedback controller developed in Section 2.2 of the following paper: https://ddl.stanford.edu/publications/design-feedback-feedforward-steering-controller-accurate-path-tracking-and-stability :param pose_reference: Contains the reference pose at the current timestep. :param pose_current: Contains the actual pose at the current timestep. :param lookahead_distance: [m] Distance ahead for which we should estimate lateral error based on a linear fit. :param k_lateral_error: Feedback gain for lateral error used to determine steering angle feedback. :return: [rad] The steering angle feedback to apply. """ assert pose_reference.shape == (3,), "We expect a single reference pose." assert pose_current.shape == (3,), "We expect a single current pose." assert lookahead_distance > 0.0, "Lookahead distance should be positive." assert k_lateral_error > 0.0, "Feedback gain for lateral error should be positive." x_reference, y_reference, heading_reference = pose_reference x_current, y_current, heading_current = pose_current x_error = x_current - x_reference y_error = y_current - y_reference heading_error = principal_value(heading_current - heading_reference) lateral_error = -x_error * np.sin(heading_reference) + y_error * np.cos(heading_reference) return float(-k_lateral_error * (lateral_error + lookahead_distance * heading_error)) def _convert_curvature_profile_to_steering_profile( curvature_profile: DoubleMatrix, discretization_time: float, wheel_base: float, ) -> Tuple[DoubleMatrix, DoubleMatrix]: """ Converts from a curvature profile to the corresponding steering profile. We assume a kinematic bicycle model where curvature = tan(steering_angle) / wheel_base. For simplicity, we just use finite differences to determine steering rate. :param curvature_profile: [rad] Curvature trajectory to convert. :param discretization_time: [s] Time discretization used for integration. :param wheel_base: [m] The vehicle wheelbase parameter required for conversion. :return: The [rad] steering angle and [rad/s] steering rate (derivative) profiles. """ assert discretization_time > 0.0, "Discretization time must be positive." assert wheel_base > 0.0, "The vehicle's wheelbase length must be positive." steering_angle_profile = np.arctan(wheel_base * curvature_profile) steering_rate_profile = np.diff(steering_angle_profile) / discretization_time return steering_angle_profile, steering_rate_profile def _get_xy_heading_displacements_from_poses(poses: DoubleMatrix) -> Tuple[DoubleMatrix, DoubleMatrix]: """ Returns position and heading displacements given a pose trajectory. :param poses: A trajectory of poses (x, y, heading). :return: Tuple of xy displacements with shape (num_poses-1, 2) and heading displacements with shape (num_poses-1,). """ assert len(poses.shape) == 2, "Expect a 2D matrix representing a trajectory of poses." assert poses.shape[0] > 1, "Cannot get displacements given an empty or single element pose trajectory." assert poses.shape[1] == 3, "Expect pose to have three elements (x, y, heading)." # Compute displacements that are used to complete the kinematic state and input. pose_differences = np.diff(poses, axis=0) xy_displacements = pose_differences[:, :2] heading_displacements = principal_value(pose_differences[:, 2]) return xy_displacements, heading_displacements def _make_banded_difference_matrix(number_rows: int) -> DoubleMatrix: """ Returns a banded difference matrix with specified number_rows. When applied to a vector [x_1, ..., x_N], it returns [x_2 - x_1, ..., x_N - x_{N-1}]. :param number_rows: The row dimension of the banded difference matrix (e.g. N-1 in the example above). :return: A banded difference matrix with shape (number_rows, number_rows+1). """ banded_matrix: DoubleMatrix = -1.0 * np.eye(number_rows + 1, dtype=np.float64)[:-1, :] for ind in range(len(banded_matrix)): banded_matrix[ind, ind + 1] = 1.0 return banded_matrix def _fit_initial_velocity_and_acceleration_profile( xy_displacements: DoubleMatrix, heading_profile: DoubleMatrix, discretization_time: float, jerk_penalty: float ) -> Tuple[float, DoubleMatrix]: """ Estimates initial velocity (v_0) and acceleration ({a_0, ...}) using least squares with jerk penalty regularization. :param xy_displacements: [m] Deviations in x and y occurring between M+1 poses, a M by 2 matrix. :param heading_profile: [rad] Headings associated to the starting timestamp for xy_displacements, a M-length vector. :param discretization_time: [s] Time discretization used for integration. :param jerk_penalty: A regularization parameter used to penalize acceleration differences. Should be positive. :return: Least squares solution for initial velocity (v_0) and acceleration profile ({a_0, ..., a_M-1}) for M displacement values. """ assert discretization_time > 0.0, "Discretization time must be positive." assert jerk_penalty > 0, "Should have a positive jerk_penalty." assert len(xy_displacements.shape) == 2, "Expect xy_displacements to be a matrix." assert xy_displacements.shape[1] == 2, "Expect xy_displacements to have 2 columns." num_displacements = len(xy_displacements) # aka M in the docstring assert heading_profile.shape == ( num_displacements, ), "Expect the length of heading_profile to match that of xy_displacements." # Core problem: minimize_x ||y-Ax||_2 y = xy_displacements.flatten() # Flatten to a vector, [delta x_0, delta y_0, ...] A: DoubleMatrix = np.zeros((2 * num_displacements, num_displacements), dtype=np.float64) for idx_timestep, heading in enumerate(heading_profile): start_row = 2 * idx_timestep # Which row of A corresponds to x-coordinate information at timestep k. # Related to v_0, initial velocity - column 0. # We fill in rows for measurements delta x_k, delta y_k. A[start_row : (start_row + 2), 0] = np.array( [ np.cos(heading) * discretization_time, np.sin(heading) * discretization_time, ], dtype=np.float64, ) if idx_timestep > 0: # Related to {a_0, ..., a_k-1}, acceleration profile - column 1 to k. # We fill in rows for measurements delta x_k, delta y_k. A[start_row : (start_row + 2), 1 : (1 + idx_timestep)] = np.array( [ [np.cos(heading) * discretization_time**2], [np.sin(heading) * discretization_time**2], ], dtype=np.float64, ) # Regularization using jerk penalty, i.e. difference of acceleration values. # If there are M displacements, then we have M - 1 acceleration values. # That means we have M - 2 jerk values, thus we make a banded difference matrix of that size. banded_matrix = _make_banded_difference_matrix(num_displacements - 2) R: DoubleMatrix = np.block([np.zeros((len(banded_matrix), 1)), banded_matrix]) # Compute regularized least squares solution. x = np.linalg.pinv(A.T @ A + jerk_penalty * R.T @ R) @ A.T @ y # Extract profile from solution. initial_velocity = x[0] acceleration_profile = x[1:] return initial_velocity, acceleration_profile def _generate_profile_from_initial_condition_and_derivatives( initial_condition: float, derivatives: DoubleMatrix, discretization_time: float ) -> DoubleMatrix: """ Returns the corresponding profile (i.e. trajectory) given an initial condition and derivatives at multiple timesteps by integration. :param initial_condition: The value of the variable at the initial timestep. :param derivatives: The trajectory of time derivatives of the variable at timesteps 0,..., N-1. :param discretization_time: [s] Time discretization used for integration. :return: The trajectory of the variable at timesteps 0,..., N. """ assert discretization_time > 0.0, "Discretization time must be positive." profile = initial_condition + np.insert(np.cumsum(derivatives * discretization_time), 0, 0.0) return profile # type: ignore def _fit_initial_curvature_and_curvature_rate_profile( heading_displacements: DoubleMatrix, velocity_profile: DoubleMatrix, discretization_time: float, curvature_rate_penalty: float, initial_curvature_penalty: float = 1e-10, ) -> Tuple[float, DoubleMatrix]: """ Estimates initial curvature (curvature_0) and curvature rate ({curvature_rate_0, ...}) using least squares with curvature rate regularization. :param heading_displacements: [rad] Angular deviations in heading occuring between timesteps. :param velocity_profile: [m/s] Estimated or actual velocities at the timesteps matching displacements. :param discretization_time: [s] Time discretization used for integration. :param curvature_rate_penalty: A regularization parameter used to penalize curvature_rate. Should be positive. :param initial_curvature_penalty: A regularization parameter to handle zero initial speed. Should be positive and small. :return: Least squares solution for initial curvature (curvature_0) and curvature rate profile (curvature_rate_0, ..., curvature_rate_{M-1}) for M heading displacement values. """ assert discretization_time > 0.0, "Discretization time must be positive." assert curvature_rate_penalty > 0.0, "Should have a positive curvature_rate_penalty." assert initial_curvature_penalty > 0.0, "Should have a positive initial_curvature_penalty." # Core problem: minimize_x ||y-Ax||_2 y = heading_displacements A: DoubleMatrix = np.tri(len(y), dtype=np.float64) # lower triangular matrix A[:, 0] = velocity_profile * discretization_time for idx, velocity in enumerate(velocity_profile): if idx == 0: continue A[idx, 1:] *= velocity * discretization_time**2 # Regularization on curvature rate. We add a small but nonzero weight on initial curvature too. # This is since the corresponding row of the A matrix might be zero if initial speed is 0, leading to singularity. # We guarantee that Q is positive definite such that the minimizer of the least squares problem is unique. Q: DoubleMatrix = curvature_rate_penalty * np.eye(len(y)) Q[0, 0] = initial_curvature_penalty # Compute regularized least squares solution. x = np.linalg.pinv(A.T @ A + Q) @ A.T @ y # Extract profile from solution. initial_curvature = x[0] curvature_rate_profile = x[1:] return initial_curvature, curvature_rate_profile def get_velocity_curvature_profiles_with_derivatives_from_poses( discretization_time: float, poses: DoubleMatrix, jerk_penalty: float, curvature_rate_penalty: float, ) -> Tuple[DoubleMatrix, DoubleMatrix, DoubleMatrix, DoubleMatrix]: """ Main function for joint estimation of velocity, acceleration, curvature, and curvature rate given N poses sampled at discretization_time. This is done by solving two least squares problems with the given penalty weights. :param discretization_time: [s] Time discretization used for integration. :param poses: A trajectory of N poses (x, y, heading). :param jerk_penalty: A regularization parameter used to penalize acceleration differences. Should be positive. :param curvature_rate_penalty: A regularization parameter used to penalize curvature_rate. Should be positive. :return: Profiles for velocity (N-1), acceleration (N-2), curvature (N-1), and curvature rate (N-2). """ xy_displacements, heading_displacements = _get_xy_heading_displacements_from_poses(poses) # Compute initial velocity + acceleration least squares solution and extract results. # Note: If we have M displacements, we require the M associated heading values. # Therefore, we exclude the last heading in the call below. initial_velocity, acceleration_profile = _fit_initial_velocity_and_acceleration_profile( xy_displacements=xy_displacements, heading_profile=poses[:-1, 2], discretization_time=discretization_time, jerk_penalty=jerk_penalty, ) velocity_profile = _generate_profile_from_initial_condition_and_derivatives( initial_condition=initial_velocity, derivatives=acceleration_profile, discretization_time=discretization_time, ) # Compute initial curvature + curvature rate least squares solution and extract results. It relies on velocity fit. initial_curvature, curvature_rate_profile = _fit_initial_curvature_and_curvature_rate_profile( heading_displacements=heading_displacements, velocity_profile=velocity_profile, discretization_time=discretization_time, curvature_rate_penalty=curvature_rate_penalty, ) curvature_profile = _generate_profile_from_initial_condition_and_derivatives( initial_condition=initial_curvature, derivatives=curvature_rate_profile, discretization_time=discretization_time, ) return velocity_profile, acceleration_profile, curvature_profile, curvature_rate_profile def complete_kinematic_state_and_inputs_from_poses( discretization_time: float, wheel_base: float, poses: DoubleMatrix, jerk_penalty: float, curvature_rate_penalty: float, ) -> Tuple[DoubleMatrix, DoubleMatrix]: """ Main function for joint estimation of velocity, acceleration, steering angle, and steering rate given poses sampled at discretization_time and the vehicle wheelbase parameter for curvature -> steering angle conversion. One caveat is that we can only determine the first N-1 kinematic states and N-2 kinematic inputs given N-1 displacement/difference values, so we need to extrapolate to match the length of poses provided. This is handled by repeating the last input and extrapolating the motion model for the last state. :param discretization_time: [s] Time discretization used for integration. :param wheel_base: [m] The wheelbase length for the kinematic bicycle model being used. :param poses: A trajectory of poses (x, y, heading). :param jerk_penalty: A regularization parameter used to penalize acceleration differences. Should be positive. :param curvature_rate_penalty: A regularization parameter used to penalize curvature_rate. Should be positive. :return: kinematic_states (x, y, heading, velocity, steering_angle) and corresponding kinematic_inputs (acceleration, steering_rate). """ ( velocity_profile, acceleration_profile, curvature_profile, curvature_rate_profile, ) = get_velocity_curvature_profiles_with_derivatives_from_poses( discretization_time=discretization_time, poses=poses, jerk_penalty=jerk_penalty, curvature_rate_penalty=curvature_rate_penalty, ) # Convert to steering angle given the wheelbase parameter. At this point, we don't need to worry about curvature. steering_angle_profile, steering_rate_profile = _convert_curvature_profile_to_steering_profile( curvature_profile=curvature_profile, discretization_time=discretization_time, wheel_base=wheel_base, ) # Extend input fits with a repeated element and extrapolate state fits to match length of poses. # This is since we fit with N-1 displacements but still have N poses at the end to deal with. acceleration_profile = np.append(acceleration_profile, acceleration_profile[-1]) steering_rate_profile = np.append(steering_rate_profile, steering_rate_profile[-1]) velocity_profile = np.append( velocity_profile, velocity_profile[-1] + acceleration_profile[-1] * discretization_time ) steering_angle_profile = np.append( steering_angle_profile, steering_angle_profile[-1] + steering_rate_profile[-1] * discretization_time ) # Collect completed state and input in matrices. kinematic_states: DoubleMatrix = np.column_stack((poses, velocity_profile, steering_angle_profile)) kinematic_inputs: DoubleMatrix = np.column_stack((acceleration_profile, steering_rate_profile)) return kinematic_states, kinematic_inputs