Spaces:
Paused
Paused
import collections | |
import numpy as np | |
import torch | |
from enum import IntEnum | |
from scipy.interpolate import interp1d | |
def interp_lanes(lane): | |
""" generate interpolants for lanes | |
Args: | |
lane (np.array()): [Nx3] | |
Returns: | |
""" | |
ds = np.cumsum( | |
np.hstack([0., np.linalg.norm(lane[1:, :2]-lane[:-1, :2], axis=-1)])) | |
return interp1d(ds, lane, fill_value="extrapolate", assume_sorted=True, axis=0), lane[0] | |
def batch_proj(x, line): | |
# x:[batch,3], line:[batch,N,3] | |
line_length = line.shape[-2] | |
batch_dim = x.ndim - 1 | |
if isinstance(x, torch.Tensor): | |
delta = line[..., 0:2] - torch.unsqueeze(x[..., 0:2], dim=-2).repeat( | |
*([1] * batch_dim), line_length, 1 | |
) | |
dis = torch.linalg.norm(delta, axis=-1) | |
idx0 = torch.argmin(dis, dim=-1) | |
idx = idx0.view(*line.shape[:-2], 1, 1).repeat( | |
*([1] * (batch_dim + 1)), line.shape[-1] | |
) | |
line_min = torch.squeeze(torch.gather(line, -2, idx), dim=-2) | |
dx = x[..., None, 0] - line[..., 0] | |
dy = x[..., None, 1] - line[..., 1] | |
delta_y = -dx * torch.sin(line_min[..., None, 2]) + dy * torch.cos( | |
line_min[..., None, 2] | |
) | |
delta_x = dx * torch.cos(line_min[..., None, 2]) + dy * torch.sin( | |
line_min[..., None, 2] | |
) | |
ref_pts = torch.stack( | |
[ | |
line_min[..., 0] + delta_x * torch.cos(line_min[..., 2]), | |
line_min[..., 1] + delta_x * torch.sin(line_min[..., 2]), | |
line_min[..., 2], | |
], | |
dim=-1, | |
) | |
delta_psi = round_2pi(x[..., 2] - line_min[..., 2]) | |
return ( | |
delta_x, | |
delta_y, | |
torch.unsqueeze(delta_psi, dim=-1), | |
ref_pts, | |
) | |
elif isinstance(x, np.ndarray): | |
delta = line[..., 0:2] - np.repeat( | |
x[..., np.newaxis, 0:2], line_length, axis=-2 | |
) | |
dis = np.linalg.norm(delta, axis=-1) | |
idx0 = np.argmin(dis, axis=-1) | |
idx = idx0.reshape(*line.shape[:-2], 1, | |
1).repeat(line.shape[-1], axis=-1) | |
line_min = np.squeeze(np.take_along_axis(line, idx, axis=-2), axis=-2) | |
dx = x[..., None, 0] - line[..., 0] | |
dy = x[..., None, 1] - line[..., 1] | |
delta_y = -dx * np.sin(line_min[..., None, 2]) + dy * np.cos( | |
line_min[..., None, 2] | |
) | |
delta_x = dx * np.cos(line_min[..., None, 2]) + dy * np.sin( | |
line_min[..., None, 2] | |
) | |
line_min[..., 0] += delta_x * np.cos(line_min[..., 2]) | |
line_min[..., 1] += delta_x * np.sin(line_min[..., 2]) | |
delta_psi = round_2pi(x[..., 2] - line_min[..., 2]) | |
return ( | |
delta_x, | |
delta_y, | |
np.expand_dims(delta_psi, axis=-1), | |
line_min, | |
) | |
def round_2pi(x): | |
return (x + np.pi) % (2 * np.pi) - np.pi | |
def get_box_world_coords(pos, yaw, extent): | |
corners = (torch.tensor([[-1, -1], [-1, 1], [1, 1], [1, -1]]) * 0.5).to(pos.device) * ( | |
extent.unsqueeze(-2) | |
) | |
s = torch.sin(yaw).unsqueeze(-1) | |
c = torch.cos(yaw).unsqueeze(-1) | |
rotM = torch.cat((torch.cat((c, s), dim=-1), | |
torch.cat((-s, c), dim=-1)), dim=-2) | |
rotated_corners = (corners + pos.unsqueeze(-2)) @ rotM | |
return rotated_corners | |
def get_upright_box(pos, extent): | |
yaws = torch.zeros(*pos.shape[:-1], 1).to(pos.device) | |
boxes = get_box_world_coords(pos, yaws, extent) | |
upright_boxes = boxes[..., [0, 2], :] | |
return upright_boxes | |
def batch_nd_transform_points(points, Mat): | |
ndim = Mat.shape[-1] - 1 | |
Mat = torch.transpose(Mat, -1, -2) | |
return (points.unsqueeze(-2) @ Mat[..., :ndim, :ndim]).squeeze(-2) + Mat[ | |
..., -1:, :ndim | |
].squeeze(-2) | |
def transform_points_tensor( | |
points: torch.Tensor, transf_matrix: torch.Tensor | |
) -> torch.Tensor: | |
""" | |
Transform a set of 2D/3D points using the given transformation matrix. | |
Assumes row major ordering of the input points. The transform function has 3 modes: | |
- points (N, F), transf_matrix (F+1, F+1) | |
all points are transformed using the matrix and the output points have shape (N, F). | |
- points (B, N, F), transf_matrix (F+1, F+1) | |
all sequences of points are transformed using the same matrix and the output points have shape (B, N, F). | |
transf_matrix is broadcasted. | |
- points (B, N, F), transf_matrix (B, F+1, F+1) | |
each sequence of points is transformed using its own matrix and the output points have shape (B, N, F). | |
Note this function assumes points.shape[-1] == matrix.shape[-1] - 1, which means that last | |
rows in the matrices do not influence the final results. | |
For 2D points only the first 2x3 parts of the matrices will be used. | |
:param points: Input points of shape (N, F) or (B, N, F) | |
with F = 2 or 3 depending on input points are 2D or 3D points. | |
:param transf_matrix: Transformation matrix of shape (F+1, F+1) or (B, F+1, F+1) with F = 2 or 3. | |
:return: Transformed points of shape (N, F) or (B, N, F) depending on the dimensions of the input points. | |
""" | |
points_log = f" received points with shape {points.shape} " | |
matrix_log = f" received matrices with shape {transf_matrix.shape} " | |
assert points.ndim in [ | |
2, 3], f"points should have ndim in [2,3],{points_log}" | |
assert transf_matrix.ndim in [ | |
2, | |
3, | |
], f"matrix should have ndim in [2,3],{matrix_log}" | |
assert ( | |
points.ndim >= transf_matrix.ndim | |
), f"points ndim should be >= than matrix,{points_log},{matrix_log}" | |
points_feat = points.shape[-1] | |
assert points_feat in [ | |
2, 3], f"last points dimension must be 2 or 3,{points_log}" | |
assert ( | |
transf_matrix.shape[-1] == transf_matrix.shape[-2] | |
), f"matrix should be a square matrix,{matrix_log}" | |
matrix_feat = transf_matrix.shape[-1] | |
assert matrix_feat in [ | |
3, 4], f"last matrix dimension must be 3 or 4,{matrix_log}" | |
assert ( | |
points_feat == matrix_feat - 1 | |
), f"points last dim should be one less than matrix,{points_log},{matrix_log}" | |
def _transform(points: torch.Tensor, transf_matrix: torch.Tensor) -> torch.Tensor: | |
num_dims = transf_matrix.shape[-1] - 1 | |
transf_matrix = torch.permute(transf_matrix, (0, 2, 1)) | |
return ( | |
points @ transf_matrix[:, :num_dims, :num_dims] | |
+ transf_matrix[:, -1:, :num_dims] | |
) | |
if points.ndim == transf_matrix.ndim == 2: | |
points = torch.unsqueeze(points, 0) | |
transf_matrix = torch.unsqueeze(transf_matrix, 0) | |
return _transform(points, transf_matrix)[0] | |
elif points.ndim == transf_matrix.ndim == 3: | |
return _transform(points, transf_matrix) | |
elif points.ndim == 3 and transf_matrix.ndim == 2: | |
transf_matrix = torch.unsqueeze(transf_matrix, 0) | |
return _transform(points, transf_matrix) | |
else: | |
raise NotImplementedError( | |
f"unsupported case!{points_log},{matrix_log}") | |
def PED_PED_collision(p1, p2, S1, S2): | |
if isinstance(p1, torch.Tensor): | |
return ( | |
torch.linalg.norm(p1[..., 0:2] - p2[..., 0:2], dim=-1) | |
- (S1[..., 0] + S2[..., 0]) / 2 | |
) | |
elif isinstance(p1, np.ndarray): | |
return ( | |
np.linalg.norm(p1[..., 0:2] - p2[..., 0:2], axis=-1) | |
- (S1[..., 0] + S2[..., 0]) / 2 | |
) | |
else: | |
raise NotImplementedError | |
def batch_rotate_2D(xy, theta): | |
if isinstance(xy, torch.Tensor): | |
x1 = xy[..., 0] * torch.cos(theta) - xy[..., 1] * torch.sin(theta) | |
y1 = xy[..., 1] * torch.cos(theta) + xy[..., 0] * torch.sin(theta) | |
return torch.stack([x1, y1], dim=-1) | |
elif isinstance(xy, np.ndarray): | |
x1 = xy[..., 0] * np.cos(theta) - xy[..., 1] * np.sin(theta) | |
y1 = xy[..., 1] * np.cos(theta) + xy[..., 0] * np.sin(theta) | |
return np.concatenate((x1[..., None], y1[..., None]), axis=-1) | |
def VEH_VEH_collision( | |
p1, p2, S1, S2, alpha=5, return_dis=False, offsetX=1.0, offsetY=0.3 | |
): | |
if isinstance(p1, torch.Tensor): | |
cornersX = torch.kron( | |
S1[..., 0] + | |
offsetX, torch.tensor([0.5, 0.5, -0.5, -0.5]).to(p1.device) | |
) | |
cornersY = torch.kron( | |
S1[..., 1] + | |
offsetY, torch.tensor([0.5, -0.5, 0.5, -0.5]).to(p1.device) | |
) | |
corners = torch.stack([cornersX, cornersY], dim=-1) | |
theta1 = p1[..., 2] | |
theta2 = p2[..., 2] | |
dx = (p1[..., 0:2] - p2[..., 0:2]).repeat_interleave(4, dim=-2) | |
delta_x1 = batch_rotate_2D( | |
corners, theta1.repeat_interleave(4, dim=-1)) + dx | |
delta_x2 = batch_rotate_2D( | |
delta_x1, -theta2.repeat_interleave(4, dim=-1)) | |
dis = torch.maximum( | |
torch.abs(delta_x2[..., 0]) - 0.5 * | |
S2[..., 0].repeat_interleave(4, dim=-1), | |
torch.abs(delta_x2[..., 1]) - 0.5 * | |
S2[..., 1].repeat_interleave(4, dim=-1), | |
).view(*S1.shape[:-1], 4) | |
min_dis, _ = torch.min(dis, dim=-1) | |
return min_dis | |
elif isinstance(p1, np.ndarray): | |
cornersX = np.kron(S1[..., 0] + offsetX, | |
np.array([0.5, 0.5, -0.5, -0.5])) | |
cornersY = np.kron(S1[..., 1] + offsetY, | |
np.array([0.5, -0.5, 0.5, -0.5])) | |
corners = np.concatenate((cornersX, cornersY), axis=-1) | |
theta1 = p1[..., 2] | |
theta2 = p2[..., 2] | |
dx = (p1[..., 0:2] - p2[..., 0:2]).repeat(4, axis=-2) | |
delta_x1 = batch_rotate_2D(corners, theta1.repeat(4, axis=-1)) + dx | |
delta_x2 = batch_rotate_2D(delta_x1, -theta2.repeat(4, axis=-1)) | |
dis = np.maximum( | |
np.abs(delta_x2[..., 0]) - 0.5 * S2[..., 0].repeat(4, axis=-1), | |
np.abs(delta_x2[..., 1]) - 0.5 * S2[..., 1].repeat(4, axis=-1), | |
).reshape(*S1.shape[:-1], 4) | |
min_dis = np.min(dis, axis=-1) | |
return min_dis | |
else: | |
raise NotImplementedError | |
def VEH_PED_collision(p1, p2, S1, S2): | |
if isinstance(p1, torch.Tensor): | |
mask = torch.logical_or( | |
torch.abs(p1[..., 2]) > 0.1, torch.linalg.norm( | |
p2[..., 2:4], dim=-1) > 0.1 | |
).detach() | |
theta = p1[..., 2] | |
dx = batch_rotate_2D(p2[..., 0:2] - p1[..., 0:2], -theta) | |
return torch.maximum( | |
torch.abs(dx[..., 0]) - S1[..., 0] / 2 - S2[..., 0] / 2, | |
torch.abs(dx[..., 1]) - S1[..., 1] / 2 - S2[..., 0] / 2, | |
) | |
elif isinstance(p1, np.ndarray): | |
theta = p1[..., 2] | |
dx = batch_rotate_2D(p2[..., 0:2] - p1[..., 0:2], -theta) | |
return np.maximum( | |
np.abs(dx[..., 0]) - S1[..., 0] / 2 - S2[..., 0] / 2, | |
np.abs(dx[..., 1]) - S1[..., 1] / 2 - S2[..., 0] / 2, | |
) | |
else: | |
raise NotImplementedError | |
def PED_VEH_collision(p1, p2, S1, S2): | |
return VEH_PED_collision(p2, p1, S2, S1) | |
def batch_proj(x, line): | |
# x:[batch,3], line:[batch,N,3] | |
line_length = line.shape[-2] | |
batch_dim = x.ndim - 1 | |
if isinstance(x, torch.Tensor): | |
delta = line[..., 0:2] - torch.unsqueeze(x[..., 0:2], dim=-2).repeat( | |
*([1] * batch_dim), line_length, 1 | |
) | |
dis = torch.linalg.norm(delta, axis=-1) | |
idx0 = torch.argmin(dis, dim=-1) | |
idx = idx0.view(*line.shape[:-2], 1, 1).repeat( | |
*([1] * (batch_dim + 1)), line.shape[-1] | |
) | |
line_min = torch.squeeze(torch.gather(line, -2, idx), dim=-2) | |
dx = x[..., None, 0] - line[..., 0] | |
dy = x[..., None, 1] - line[..., 1] | |
delta_y = -dx * torch.sin(line_min[..., None, 2]) + dy * torch.cos( | |
line_min[..., None, 2] | |
) | |
delta_x = dx * torch.cos(line_min[..., None, 2]) + dy * torch.sin( | |
line_min[..., None, 2] | |
) | |
# ref_pts = torch.stack( | |
# [ | |
# line_min[..., 0] + delta_x * torch.cos(line_min[..., 2]), | |
# line_min[..., 1] + delta_x * torch.sin(line_min[..., 2]), | |
# line_min[..., 2], | |
# ], | |
# dim=-1, | |
# ) | |
delta_psi = round_2pi(x[..., 2] - line_min[..., 2]) | |
return ( | |
delta_x, | |
delta_y, | |
torch.unsqueeze(delta_psi, dim=-1), | |
) | |
elif isinstance(x, np.ndarray): | |
delta = line[..., 0:2] - np.repeat( | |
x[..., np.newaxis, 0:2], line_length, axis=-2 | |
) | |
dis = np.linalg.norm(delta, axis=-1) | |
idx0 = np.argmin(dis, axis=-1) | |
idx = idx0.reshape(*line.shape[:-2], 1, | |
1).repeat(line.shape[-1], axis=-1) | |
line_min = np.squeeze(np.take_along_axis(line, idx, axis=-2), axis=-2) | |
dx = x[..., None, 0] - line[..., 0] | |
dy = x[..., None, 1] - line[..., 1] | |
delta_y = -dx * np.sin(line_min[..., None, 2]) + dy * np.cos( | |
line_min[..., None, 2] | |
) | |
delta_x = dx * np.cos(line_min[..., None, 2]) + dy * np.sin( | |
line_min[..., None, 2] | |
) | |
# line_min[..., 0] += delta_x * np.cos(line_min[..., 2]) | |
# line_min[..., 1] += delta_x * np.sin(line_min[..., 2]) | |
delta_psi = round_2pi(x[..., 2] - line_min[..., 2]) | |
return ( | |
delta_x, | |
delta_y, | |
np.expand_dims(delta_psi, axis=-1), | |
) | |
class CollisionType(IntEnum): | |
"""This enum defines the three types of collisions: front, rear and side.""" | |
FRONT = 0 | |
REAR = 1 | |
SIDE = 2 | |
def detect_collision( | |
ego_pos: np.ndarray, | |
ego_yaw: np.ndarray, | |
ego_extent: np.ndarray, | |
other_pos: np.ndarray, | |
other_yaw: np.ndarray, | |
other_extent: np.ndarray, | |
): | |
""" | |
Computes whether a collision occured between ego and any another agent. | |
Also computes the type of collision: rear, front, or side. | |
For this, we compute the intersection of ego's four sides with a target | |
agent and measure the length of this intersection. A collision | |
is classified into a class, if the corresponding length is maximal, | |
i.e. a front collision exhibits the longest intersection with | |
egos front edge. | |
.. note:: please note that this funciton will stop upon finding the first | |
colision, so it won't return all collisions but only the first | |
one found. | |
:param ego_pos: predicted centroid | |
:param ego_yaw: predicted yaw | |
:param ego_extent: predicted extent | |
:param other_pos: target agents | |
:return: None if not collision was found, and a tuple with the | |
collision type and the agent track_id | |
""" | |
from l5kit.planning import utils | |
ego_bbox = utils._get_bounding_box( | |
centroid=ego_pos, yaw=ego_yaw, extent=ego_extent) | |
# within_range_mask = utils.within_range(ego_pos, ego_extent, other_pos, other_extent) | |
for i in range(other_pos.shape[0]): | |
agent_bbox = utils._get_bounding_box( | |
other_pos[i], other_yaw[i], other_extent[i]) | |
if ego_bbox.intersects(agent_bbox): | |
front_side, rear_side, left_side, right_side = utils._get_sides( | |
ego_bbox) | |
intersection_length_per_side = np.asarray( | |
[ | |
agent_bbox.intersection(front_side).length, | |
agent_bbox.intersection(rear_side).length, | |
agent_bbox.intersection(left_side).length, | |
agent_bbox.intersection(right_side).length, | |
] | |
) | |
argmax_side = np.argmax(intersection_length_per_side) | |
# Remap here is needed because there are two sides that are | |
# mapped to the same collision type CollisionType.SIDE | |
max_collision_types = max(CollisionType).value | |
remap_argmax = min(argmax_side, max_collision_types) | |
collision_type = CollisionType(remap_argmax) | |
return collision_type, i | |
return None | |
def calc_distance_map(road_flag, max_dis=10): | |
"""mark the image with manhattan distance to the drivable area | |
Args: | |
road_flag (torch.Tensor[B,W,H]): an image with 1 channel, 1 for drivable area, 0 for non-drivable area | |
max_dis (int, optional): maximum distance that the result saturates to. Defaults to 10. | |
""" | |
out = torch.zeros_like(road_flag, dtype=torch.float) | |
out[road_flag == 0] = max_dis | |
out[road_flag == 1] = 0 | |
for i in range(max_dis-1): | |
out[..., 1:, :] = torch.min(out[..., 1:, :], out[..., :-1, :]+1) | |
out[..., :-1, :] = torch.min(out[..., :-1, :], out[..., 1:, :]+1) | |
out[..., :, 1:] = torch.min(out[..., :, 1:], out[..., :, :-1]+1) | |
out[..., :, :-1] = torch.min(out[..., :, :-1], out[..., :, 1:]+1) | |
return out | |