import collections import numpy as np import torch from enum import IntEnum from scipy.interpolate import interp1d def interp_lanes(lane): """ generate interpolants for lanes Args: lane (np.array()): [Nx3] Returns: """ ds = np.cumsum( np.hstack([0., np.linalg.norm(lane[1:, :2]-lane[:-1, :2], axis=-1)])) return interp1d(ds, lane, fill_value="extrapolate", assume_sorted=True, axis=0), lane[0] def batch_proj(x, line): # x:[batch,3], line:[batch,N,3] line_length = line.shape[-2] batch_dim = x.ndim - 1 if isinstance(x, torch.Tensor): delta = line[..., 0:2] - torch.unsqueeze(x[..., 0:2], dim=-2).repeat( *([1] * batch_dim), line_length, 1 ) dis = torch.linalg.norm(delta, axis=-1) idx0 = torch.argmin(dis, dim=-1) idx = idx0.view(*line.shape[:-2], 1, 1).repeat( *([1] * (batch_dim + 1)), line.shape[-1] ) line_min = torch.squeeze(torch.gather(line, -2, idx), dim=-2) dx = x[..., None, 0] - line[..., 0] dy = x[..., None, 1] - line[..., 1] delta_y = -dx * torch.sin(line_min[..., None, 2]) + dy * torch.cos( line_min[..., None, 2] ) delta_x = dx * torch.cos(line_min[..., None, 2]) + dy * torch.sin( line_min[..., None, 2] ) ref_pts = torch.stack( [ line_min[..., 0] + delta_x * torch.cos(line_min[..., 2]), line_min[..., 1] + delta_x * torch.sin(line_min[..., 2]), line_min[..., 2], ], dim=-1, ) delta_psi = round_2pi(x[..., 2] - line_min[..., 2]) return ( delta_x, delta_y, torch.unsqueeze(delta_psi, dim=-1), ref_pts, ) elif isinstance(x, np.ndarray): delta = line[..., 0:2] - np.repeat( x[..., np.newaxis, 0:2], line_length, axis=-2 ) dis = np.linalg.norm(delta, axis=-1) idx0 = np.argmin(dis, axis=-1) idx = idx0.reshape(*line.shape[:-2], 1, 1).repeat(line.shape[-1], axis=-1) line_min = np.squeeze(np.take_along_axis(line, idx, axis=-2), axis=-2) dx = x[..., None, 0] - line[..., 0] dy = x[..., None, 1] - line[..., 1] delta_y = -dx * np.sin(line_min[..., None, 2]) + dy * np.cos( line_min[..., None, 2] ) delta_x = dx * np.cos(line_min[..., None, 2]) + dy * np.sin( line_min[..., None, 2] ) line_min[..., 0] += delta_x * np.cos(line_min[..., 2]) line_min[..., 1] += delta_x * np.sin(line_min[..., 2]) delta_psi = round_2pi(x[..., 2] - line_min[..., 2]) return ( delta_x, delta_y, np.expand_dims(delta_psi, axis=-1), line_min, ) def round_2pi(x): return (x + np.pi) % (2 * np.pi) - np.pi def get_box_world_coords(pos, yaw, extent): corners = (torch.tensor([[-1, -1], [-1, 1], [1, 1], [1, -1]]) * 0.5).to(pos.device) * ( extent.unsqueeze(-2) ) s = torch.sin(yaw).unsqueeze(-1) c = torch.cos(yaw).unsqueeze(-1) rotM = torch.cat((torch.cat((c, s), dim=-1), torch.cat((-s, c), dim=-1)), dim=-2) rotated_corners = (corners + pos.unsqueeze(-2)) @ rotM return rotated_corners def get_upright_box(pos, extent): yaws = torch.zeros(*pos.shape[:-1], 1).to(pos.device) boxes = get_box_world_coords(pos, yaws, extent) upright_boxes = boxes[..., [0, 2], :] return upright_boxes def batch_nd_transform_points(points, Mat): ndim = Mat.shape[-1] - 1 Mat = torch.transpose(Mat, -1, -2) return (points.unsqueeze(-2) @ Mat[..., :ndim, :ndim]).squeeze(-2) + Mat[ ..., -1:, :ndim ].squeeze(-2) def transform_points_tensor( points: torch.Tensor, transf_matrix: torch.Tensor ) -> torch.Tensor: """ Transform a set of 2D/3D points using the given transformation matrix. Assumes row major ordering of the input points. The transform function has 3 modes: - points (N, F), transf_matrix (F+1, F+1) all points are transformed using the matrix and the output points have shape (N, F). - points (B, N, F), transf_matrix (F+1, F+1) all sequences of points are transformed using the same matrix and the output points have shape (B, N, F). transf_matrix is broadcasted. - points (B, N, F), transf_matrix (B, F+1, F+1) each sequence of points is transformed using its own matrix and the output points have shape (B, N, F). Note this function assumes points.shape[-1] == matrix.shape[-1] - 1, which means that last rows in the matrices do not influence the final results. For 2D points only the first 2x3 parts of the matrices will be used. :param points: Input points of shape (N, F) or (B, N, F) with F = 2 or 3 depending on input points are 2D or 3D points. :param transf_matrix: Transformation matrix of shape (F+1, F+1) or (B, F+1, F+1) with F = 2 or 3. :return: Transformed points of shape (N, F) or (B, N, F) depending on the dimensions of the input points. """ points_log = f" received points with shape {points.shape} " matrix_log = f" received matrices with shape {transf_matrix.shape} " assert points.ndim in [ 2, 3], f"points should have ndim in [2,3],{points_log}" assert transf_matrix.ndim in [ 2, 3, ], f"matrix should have ndim in [2,3],{matrix_log}" assert ( points.ndim >= transf_matrix.ndim ), f"points ndim should be >= than matrix,{points_log},{matrix_log}" points_feat = points.shape[-1] assert points_feat in [ 2, 3], f"last points dimension must be 2 or 3,{points_log}" assert ( transf_matrix.shape[-1] == transf_matrix.shape[-2] ), f"matrix should be a square matrix,{matrix_log}" matrix_feat = transf_matrix.shape[-1] assert matrix_feat in [ 3, 4], f"last matrix dimension must be 3 or 4,{matrix_log}" assert ( points_feat == matrix_feat - 1 ), f"points last dim should be one less than matrix,{points_log},{matrix_log}" def _transform(points: torch.Tensor, transf_matrix: torch.Tensor) -> torch.Tensor: num_dims = transf_matrix.shape[-1] - 1 transf_matrix = torch.permute(transf_matrix, (0, 2, 1)) return ( points @ transf_matrix[:, :num_dims, :num_dims] + transf_matrix[:, -1:, :num_dims] ) if points.ndim == transf_matrix.ndim == 2: points = torch.unsqueeze(points, 0) transf_matrix = torch.unsqueeze(transf_matrix, 0) return _transform(points, transf_matrix)[0] elif points.ndim == transf_matrix.ndim == 3: return _transform(points, transf_matrix) elif points.ndim == 3 and transf_matrix.ndim == 2: transf_matrix = torch.unsqueeze(transf_matrix, 0) return _transform(points, transf_matrix) else: raise NotImplementedError( f"unsupported case!{points_log},{matrix_log}") def PED_PED_collision(p1, p2, S1, S2): if isinstance(p1, torch.Tensor): return ( torch.linalg.norm(p1[..., 0:2] - p2[..., 0:2], dim=-1) - (S1[..., 0] + S2[..., 0]) / 2 ) elif isinstance(p1, np.ndarray): return ( np.linalg.norm(p1[..., 0:2] - p2[..., 0:2], axis=-1) - (S1[..., 0] + S2[..., 0]) / 2 ) else: raise NotImplementedError def batch_rotate_2D(xy, theta): if isinstance(xy, torch.Tensor): x1 = xy[..., 0] * torch.cos(theta) - xy[..., 1] * torch.sin(theta) y1 = xy[..., 1] * torch.cos(theta) + xy[..., 0] * torch.sin(theta) return torch.stack([x1, y1], dim=-1) elif isinstance(xy, np.ndarray): x1 = xy[..., 0] * np.cos(theta) - xy[..., 1] * np.sin(theta) y1 = xy[..., 1] * np.cos(theta) + xy[..., 0] * np.sin(theta) return np.concatenate((x1[..., None], y1[..., None]), axis=-1) def VEH_VEH_collision( p1, p2, S1, S2, alpha=5, return_dis=False, offsetX=1.0, offsetY=0.3 ): if isinstance(p1, torch.Tensor): cornersX = torch.kron( S1[..., 0] + offsetX, torch.tensor([0.5, 0.5, -0.5, -0.5]).to(p1.device) ) cornersY = torch.kron( S1[..., 1] + offsetY, torch.tensor([0.5, -0.5, 0.5, -0.5]).to(p1.device) ) corners = torch.stack([cornersX, cornersY], dim=-1) theta1 = p1[..., 2] theta2 = p2[..., 2] dx = (p1[..., 0:2] - p2[..., 0:2]).repeat_interleave(4, dim=-2) delta_x1 = batch_rotate_2D( corners, theta1.repeat_interleave(4, dim=-1)) + dx delta_x2 = batch_rotate_2D( delta_x1, -theta2.repeat_interleave(4, dim=-1)) dis = torch.maximum( torch.abs(delta_x2[..., 0]) - 0.5 * S2[..., 0].repeat_interleave(4, dim=-1), torch.abs(delta_x2[..., 1]) - 0.5 * S2[..., 1].repeat_interleave(4, dim=-1), ).view(*S1.shape[:-1], 4) min_dis, _ = torch.min(dis, dim=-1) return min_dis elif isinstance(p1, np.ndarray): cornersX = np.kron(S1[..., 0] + offsetX, np.array([0.5, 0.5, -0.5, -0.5])) cornersY = np.kron(S1[..., 1] + offsetY, np.array([0.5, -0.5, 0.5, -0.5])) corners = np.concatenate((cornersX, cornersY), axis=-1) theta1 = p1[..., 2] theta2 = p2[..., 2] dx = (p1[..., 0:2] - p2[..., 0:2]).repeat(4, axis=-2) delta_x1 = batch_rotate_2D(corners, theta1.repeat(4, axis=-1)) + dx delta_x2 = batch_rotate_2D(delta_x1, -theta2.repeat(4, axis=-1)) dis = np.maximum( np.abs(delta_x2[..., 0]) - 0.5 * S2[..., 0].repeat(4, axis=-1), np.abs(delta_x2[..., 1]) - 0.5 * S2[..., 1].repeat(4, axis=-1), ).reshape(*S1.shape[:-1], 4) min_dis = np.min(dis, axis=-1) return min_dis else: raise NotImplementedError def VEH_PED_collision(p1, p2, S1, S2): if isinstance(p1, torch.Tensor): mask = torch.logical_or( torch.abs(p1[..., 2]) > 0.1, torch.linalg.norm( p2[..., 2:4], dim=-1) > 0.1 ).detach() theta = p1[..., 2] dx = batch_rotate_2D(p2[..., 0:2] - p1[..., 0:2], -theta) return torch.maximum( torch.abs(dx[..., 0]) - S1[..., 0] / 2 - S2[..., 0] / 2, torch.abs(dx[..., 1]) - S1[..., 1] / 2 - S2[..., 0] / 2, ) elif isinstance(p1, np.ndarray): theta = p1[..., 2] dx = batch_rotate_2D(p2[..., 0:2] - p1[..., 0:2], -theta) return np.maximum( np.abs(dx[..., 0]) - S1[..., 0] / 2 - S2[..., 0] / 2, np.abs(dx[..., 1]) - S1[..., 1] / 2 - S2[..., 0] / 2, ) else: raise NotImplementedError def PED_VEH_collision(p1, p2, S1, S2): return VEH_PED_collision(p2, p1, S2, S1) def batch_proj(x, line): # x:[batch,3], line:[batch,N,3] line_length = line.shape[-2] batch_dim = x.ndim - 1 if isinstance(x, torch.Tensor): delta = line[..., 0:2] - torch.unsqueeze(x[..., 0:2], dim=-2).repeat( *([1] * batch_dim), line_length, 1 ) dis = torch.linalg.norm(delta, axis=-1) idx0 = torch.argmin(dis, dim=-1) idx = idx0.view(*line.shape[:-2], 1, 1).repeat( *([1] * (batch_dim + 1)), line.shape[-1] ) line_min = torch.squeeze(torch.gather(line, -2, idx), dim=-2) dx = x[..., None, 0] - line[..., 0] dy = x[..., None, 1] - line[..., 1] delta_y = -dx * torch.sin(line_min[..., None, 2]) + dy * torch.cos( line_min[..., None, 2] ) delta_x = dx * torch.cos(line_min[..., None, 2]) + dy * torch.sin( line_min[..., None, 2] ) # ref_pts = torch.stack( # [ # line_min[..., 0] + delta_x * torch.cos(line_min[..., 2]), # line_min[..., 1] + delta_x * torch.sin(line_min[..., 2]), # line_min[..., 2], # ], # dim=-1, # ) delta_psi = round_2pi(x[..., 2] - line_min[..., 2]) return ( delta_x, delta_y, torch.unsqueeze(delta_psi, dim=-1), ) elif isinstance(x, np.ndarray): delta = line[..., 0:2] - np.repeat( x[..., np.newaxis, 0:2], line_length, axis=-2 ) dis = np.linalg.norm(delta, axis=-1) idx0 = np.argmin(dis, axis=-1) idx = idx0.reshape(*line.shape[:-2], 1, 1).repeat(line.shape[-1], axis=-1) line_min = np.squeeze(np.take_along_axis(line, idx, axis=-2), axis=-2) dx = x[..., None, 0] - line[..., 0] dy = x[..., None, 1] - line[..., 1] delta_y = -dx * np.sin(line_min[..., None, 2]) + dy * np.cos( line_min[..., None, 2] ) delta_x = dx * np.cos(line_min[..., None, 2]) + dy * np.sin( line_min[..., None, 2] ) # line_min[..., 0] += delta_x * np.cos(line_min[..., 2]) # line_min[..., 1] += delta_x * np.sin(line_min[..., 2]) delta_psi = round_2pi(x[..., 2] - line_min[..., 2]) return ( delta_x, delta_y, np.expand_dims(delta_psi, axis=-1), ) class CollisionType(IntEnum): """This enum defines the three types of collisions: front, rear and side.""" FRONT = 0 REAR = 1 SIDE = 2 def detect_collision( ego_pos: np.ndarray, ego_yaw: np.ndarray, ego_extent: np.ndarray, other_pos: np.ndarray, other_yaw: np.ndarray, other_extent: np.ndarray, ): """ Computes whether a collision occured between ego and any another agent. Also computes the type of collision: rear, front, or side. For this, we compute the intersection of ego's four sides with a target agent and measure the length of this intersection. A collision is classified into a class, if the corresponding length is maximal, i.e. a front collision exhibits the longest intersection with egos front edge. .. note:: please note that this funciton will stop upon finding the first colision, so it won't return all collisions but only the first one found. :param ego_pos: predicted centroid :param ego_yaw: predicted yaw :param ego_extent: predicted extent :param other_pos: target agents :return: None if not collision was found, and a tuple with the collision type and the agent track_id """ from l5kit.planning import utils ego_bbox = utils._get_bounding_box( centroid=ego_pos, yaw=ego_yaw, extent=ego_extent) # within_range_mask = utils.within_range(ego_pos, ego_extent, other_pos, other_extent) for i in range(other_pos.shape[0]): agent_bbox = utils._get_bounding_box( other_pos[i], other_yaw[i], other_extent[i]) if ego_bbox.intersects(agent_bbox): front_side, rear_side, left_side, right_side = utils._get_sides( ego_bbox) intersection_length_per_side = np.asarray( [ agent_bbox.intersection(front_side).length, agent_bbox.intersection(rear_side).length, agent_bbox.intersection(left_side).length, agent_bbox.intersection(right_side).length, ] ) argmax_side = np.argmax(intersection_length_per_side) # Remap here is needed because there are two sides that are # mapped to the same collision type CollisionType.SIDE max_collision_types = max(CollisionType).value remap_argmax = min(argmax_side, max_collision_types) collision_type = CollisionType(remap_argmax) return collision_type, i return None def calc_distance_map(road_flag, max_dis=10): """mark the image with manhattan distance to the drivable area Args: road_flag (torch.Tensor[B,W,H]): an image with 1 channel, 1 for drivable area, 0 for non-drivable area max_dis (int, optional): maximum distance that the result saturates to. Defaults to 10. """ out = torch.zeros_like(road_flag, dtype=torch.float) out[road_flag == 0] = max_dis out[road_flag == 1] = 0 for i in range(max_dis-1): out[..., 1:, :] = torch.min(out[..., 1:, :], out[..., :-1, :]+1) out[..., :-1, :] = torch.min(out[..., :-1, :], out[..., 1:, :]+1) out[..., :, 1:] = torch.min(out[..., :, 1:], out[..., :, :-1]+1) out[..., :, :-1] = torch.min(out[..., :, :-1], out[..., :, 1:]+1) return out