Spaces:
Starting
on
T4
Starting
on
T4
import math | |
import random | |
import numpy as np | |
from trajdata.maps import VectorMap | |
from submodules.Pplan.Sampling.spline_planner import SplinePlanner | |
import torch | |
import time | |
import math | |
from copy import deepcopy | |
from utils.dynamic_utils import unicycle | |
def constant_tracking(state, path, dt): | |
''' | |
Args: | |
state: current state of the vehicle, of size [x, y, yaw, speed] | |
path: the path to follow, of size (N, [x, y, yaw]) | |
dt: time duration | |
''' | |
# find the nearest point in the path | |
dists = torch.norm(path[:, :2] - state[None, :2], dim=1) | |
nearest_index = torch.argmin(dists) | |
# find the target point | |
lookahead_distance = state[3] * dt | |
target = path[-1] | |
is_end = True | |
for i in range(nearest_index + 1, len(path)): | |
if torch.norm(path[i, :2] - state[:2]) > lookahead_distance: | |
target = path[i] | |
is_end = False | |
break | |
# compute the new state | |
target_distance = torch.norm(target[:2] - state[:2]) | |
ratio = lookahead_distance / target_distance.clamp(min=1e-6) | |
ratio = ratio.clamp(max=1.0) | |
new_state = deepcopy(state) | |
new_state[:2] = state[:2] + ratio * (target[:2] - state[:2]) | |
new_state[2] = torch.atan2( | |
state[2].sin() + ratio * (target[2].sin() - state[2].sin()), | |
state[2].cos() + ratio * (target[2].cos() - state[2].cos()) | |
) | |
if is_end: | |
new_state[3] = 0 | |
return new_state | |
def constant_headaway(states, num_steps, dt): | |
''' | |
Args: | |
states: current states of a batch of vehicles, of size (num_agents, [x, y, yaw, speed]) | |
num_steps: number of steps to move forward | |
dt: time duration | |
Return: | |
trajs: the trajectories of the vehicles, of size (num_agents, num_steps, [x, y, yaw, speed]) | |
''' | |
# state: [x, y, yaw, speed] | |
x = states[:, 0] | |
y = states[:, 1] | |
yaw = states[:, 2] | |
speed = states[:, 3] | |
# Generate time steps | |
t_steps = torch.arange(num_steps) * dt | |
# Calculate dx and dy for each step | |
dx = torch.outer(speed * torch.sin(yaw), t_steps) | |
dy = torch.outer(speed * torch.cos(yaw), t_steps) | |
# Update x and y positions | |
x_traj = x.unsqueeze(1) + dx | |
y_traj = y.unsqueeze(1) + dy | |
# Replicate the yaw and speed for each time step | |
yaw_traj = yaw.unsqueeze(1).repeat(1, num_steps) | |
speed_traj = speed.unsqueeze(1).repeat(1, num_steps) | |
# Stack the x, y, yaw, and speed components to form the trajectory | |
trajs = torch.stack((x_traj, y_traj, yaw_traj, speed_traj), dim=-1) | |
return trajs | |
class IDM: | |
def __init__( | |
self, v0=30.0, s0=5.0, T=2.0, a=2.0, b=4.0, delta=4.0, | |
lookahead_path_length=100, lead_distance_threshold=1.0 | |
): | |
''' | |
Args: | |
v0: desired speed | |
s0: minimum gap | |
T: safe time headway | |
a: max acceleration | |
b: comfortable deceleration | |
delta: acceleration exponent | |
lookahead_path_length: the length of path to look ahead | |
lead_distance_threshold: the distance to consider a vehicle as a lead vehicle | |
''' | |
self.v0 = v0 | |
self.s0 = s0 | |
self.T = T | |
self.a = a | |
self.b = b | |
self.delta = delta | |
self.lookahead_path_length = lookahead_path_length | |
self.lead_distance_threshold = lead_distance_threshold | |
def update(self, state, path, dt, neighbors): | |
''' | |
Args: | |
state: current state of the vehicle, of size [x, y, yaw, speed] | |
path: the path to follow, of size (N, [x, y, yaw]) | |
dt: time duration | |
neighbors: the future states of the neighbors, of size (K, T, [x, y, yaw, speed]) | |
''' | |
if path is None: | |
return deepcopy(state) | |
# find the nearest point in the path | |
dists = torch.norm(path[:, :2] - state[None, :2], dim=1) | |
nearest_index = torch.argmin(dists) | |
# lookahead_distance = state[3] * dt | |
# lookahead_targe = state[:2] + np.array([np.sin(state[2]) * lookahead_distance, np.cos(state[2]) * lookahead_distance]) | |
# # target = path[-1] | |
# is_end = False | |
# target_idx = torch.argmin(torch.norm(path[:, :2] - lookahead_targe, dim=-1)) | |
# target = path[target_idx] | |
# find the target point | |
lookahead_distance = state[3] * dt | |
target = path[-1] | |
is_end = True | |
for i in range(nearest_index + 1, len(path)): | |
if torch.norm(path[i, :2] - state[:2]) > lookahead_distance: | |
target = path[i] | |
is_end = False | |
break | |
# distance between neighbors and the path | |
lookahead_path = path[nearest_index + 1:][:self.lookahead_path_length] | |
lookahead_neighbors = neighbors[..., None, :].expand( | |
-1, -1, lookahead_path.shape[0], -1 | |
) # (K, T, n, 4) | |
dists_neighbors = torch.norm( | |
lookahead_neighbors[..., :2] - lookahead_path[None, None, :, :2], dim=-1 | |
) # (K, T, n) | |
indices_neighbors = torch.arange( | |
lookahead_path.shape[0] | |
)[None, None].expand_as(dists_neighbors) | |
# determine lead vehicles | |
is_lead = (dists_neighbors < self.lead_distance_threshold) | |
if is_lead.any(): | |
# compute lead distance | |
indices_lead = indices_neighbors[is_lead] # (num_lead) | |
lookahead_lengths = torch.cumsum(torch.norm( | |
lookahead_path[1:, :2] - lookahead_path[:-1, :2], dim=1 | |
), dim=0) | |
lookahead_lengths = torch.cat([lookahead_lengths, lookahead_lengths[-1:]]) | |
lead_distance = lookahead_lengths[indices_lead] | |
# compute lead speed | |
states_lead = lookahead_neighbors[is_lead] # (num_lead, 4) | |
ori_speed_lead = states_lead[:, 3] | |
yaw_lead = states_lead[:, 2] | |
yaw_path = lookahead_path[indices_lead, 2] | |
lead_speed = ori_speed_lead * (yaw_lead - yaw_path).cos() | |
# compute acceleration | |
ego_speed = state[3] | |
delta_v = ego_speed - lead_speed | |
s_star = self.s0 + \ | |
(ego_speed * self.T + ego_speed * delta_v / (2 * math.sqrt(self.a * self.b))).clamp(min=0) | |
acceleration = self.a * (1 - (ego_speed / self.v0) ** self.delta - (s_star / lead_distance) ** 2) | |
acceleration = acceleration.min() | |
else: | |
acceleration = self.a * (1 - (state[3] / self.v0) ** self.delta) | |
# compute the new state | |
target_distance = torch.norm(target[:2] - state[:2]) | |
ratio = lookahead_distance / target_distance.clamp(min=1e-6) | |
ratio = ratio.clamp(max=1.0) | |
new_state = deepcopy(state) | |
new_state[:2] = state[:2] + ratio * (target[:2] - state[:2]) | |
new_state[2] = torch.atan2( | |
state[2].sin() + ratio * (target[2].sin() - state[2].sin()), | |
state[2].cos() + ratio * (target[2].cos() - state[2].cos()) | |
) | |
if is_end: | |
new_state[3] = 0 | |
else: | |
new_state[3] = (state[3] + acceleration * dt).clamp(min=0) | |
return new_state | |
class AttackPlanner: | |
def __init__(self, pred_steps=20, ATTACK_FREQ = 3, best_k=1, device='cpu'): | |
self.device = device | |
self.predict_steps = pred_steps | |
self.best_k = best_k | |
self.planner = SplinePlanner( | |
device, | |
N_seg=self.predict_steps, | |
acce_grid=torch.linspace(-2, 5, 10).to(self.device), | |
acce_bound=[-6, 5], | |
vbound=[-2, 50] | |
) | |
self.planner.psi_bound = [-math.pi * 2, math.pi * 2] | |
self.exec_traj = None | |
self.exec_pointer = 1 | |
def update( | |
self, state, unified_map, dt, | |
neighbors, attacked_states, | |
new_plan=True | |
): | |
''' | |
Args: | |
state: current state of the vehicle, of size [x, y, yaw, speed] | |
vector_map: the vector map | |
attacked_states: future states of the attacked agent, of size (T, [x, y, yaw, speed]) | |
neighbors: future states of the neighbors, of size (K, T, [x, y, yaw, speed]) | |
new_plan: whether to generate a new plan | |
''' | |
assert self.exec_pointer > 0 | |
# directly execute the current plan | |
if not new_plan: | |
if self.exec_traj is not None and \ | |
self.exec_pointer < self.exec_traj.shape[0]: | |
next_state = self.exec_traj[self.exec_pointer] | |
self.exec_pointer += 1 | |
return next_state | |
else: | |
new_plan = True | |
assert attacked_states.shape[0] == self.predict_steps | |
# state: [x, y, yaw, speed] | |
x, y, yaw, speed = state | |
# query vector map to get lanes | |
query_xyzr = np.array([x, y, 0, yaw + np.pi / 2]) | |
# query_xyzr = unified_map.xyzr_local2world(np.array([x, y, 0, yaw])) | |
# lanes = unified_map.vector_map.get_lanes_within(query_xyzr[:3], dist=30) | |
# lanes = [unified_map.batch_xyzr_world2local(l.center.xyzh)[:, [0,1,3]] for l in lanes] | |
# lanes = [l.center.xyzh[:, [0,1,3]] for l in lanes] | |
lanes = None | |
# for lane in lanes: | |
# plt.plot(lane[:, 0], lane[:, 1], 'k--', linewidth=0.5, alpha=0.5) | |
# generate spline trajectories | |
x0 = torch.tensor([query_xyzr[0], query_xyzr[1], speed, query_xyzr[3]], device=self.device) | |
possible_trajs, xf_set = self.planner.gen_trajectories(x0, self.predict_steps * dt, lanes, | |
dyn_filter=True) # (num_trajs, T-1, [x, y, v, a, yaw, r, t]) | |
if possible_trajs.shape[0] == 0: | |
trajs = constant_headaway(state[None], self.predict_steps, dt) # (1, T, [x, y, yaw, speed]) | |
else: | |
trajs = torch.cat([ | |
state[None, None].expand(possible_trajs.shape[0], -1, -1), | |
possible_trajs[..., [0, 1, 4, 2]] | |
], dim=1) | |
# select the best trajectory | |
attack_distance = torch.norm(attacked_states[None, :, :2] - trajs[..., :2], dim=-1) | |
cost_attack = attack_distance.min(dim=1).values | |
cost_collision = ( | |
torch.norm(neighbors[None, ..., :2] - trajs[:, None, :, :2], dim=-1).min(dim=-1).values < 2.0).sum( | |
dim=-1) | |
cost = cost_attack + 0.1 * cost_collision | |
values, indices = torch.topk(cost, self.best_k, largest=False) | |
random_index = torch.randint(0, self.best_k, (1,)).item() | |
selected_index = indices[random_index] | |
traj_best = trajs[selected_index] | |
# produce next state | |
self.exec_traj = traj_best | |
self.exec_traj[:, 2] -= np.pi / 2 | |
self.exec_pointer = 1 | |
next_state = self.exec_traj[self.exec_pointer] | |
# next_state[0] = -next_state[0] | |
self.exec_pointer += 1 | |
return next_state | |
class ConstantPlanner: | |
def __init__(self): | |
return | |
def update(self, state, dt): | |
a, b, yaw, v = state | |
a = a - v * np.sin(yaw) * dt | |
b = b + v * np.cos(yaw) * dt | |
return torch.tensor([a, b, yaw, v]) | |
class UnicyclePlanner: | |
def __init__(self, uc_path, speed=1.0): | |
self.uc_model = unicycle.restore(torch.load(uc_path, weights_only=False)) | |
self.t = 0 | |
self.speed = speed | |
def update(self, dt): | |
self.t += dt * self.speed | |
a, b, v, pitchroll, yaw, h = self.uc_model.forward(self.t) | |
# return torch.tensor([a, b, yaw, v]), pitchroll.detach().cpu(), h.item() | |
return torch.tensor([a, b, yaw, v]) | |