|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
"""Oracle for multimodal pushing task.""" |
|
import random |
|
|
|
import envs.block_pushing.oracles.oriented_push_oracle as oriented_push_oracle_module |
|
import numpy as np |
|
from tf_agents.trajectories import policy_step |
|
from tf_agents.trajectories import time_step as ts |
|
from tf_agents.typing import types |
|
|
|
|
|
import pybullet |
|
|
|
|
|
class MultimodalOrientedPushOracle(oriented_push_oracle_module.OrientedPushOracle):
    """Oracle for multimodal pushing task.

    Handles two block/target pairs one after the other.  Which block goes to
    which target, and which pair is handled first, is drawn at random at the
    start of every episode (see `_choose_goal_order`).
    """

    def __init__(self, env, goal_dist_tolerance=0.04, action_noise_std=0.0):
        """Creates the oracle.

        Args:
          env: Environment, forwarded to the parent oracle's constructor.
          goal_dist_tolerance: Distance between the first block and its target
            below which the oracle switches over to the second block.
          action_noise_std: Scale applied to the `randn(2)` noise that is added
            to every action; 0.0 disables the noise.
        """
        super().__init__(env)
        self._action_noise_std = action_noise_std
        self._goal_dist_tolerance = goal_dist_tolerance

    def reset(self):
        """Forgets the remembered positions and restarts the phase machine."""
        self.phase = "move_to_pre_block"
        self.first_preblock = None
        self.origin = None

    def _get_move_to_preblock(self, xy_pre_block, xy_ee):
        """Steers from `xy_ee` toward the pre-block position.

        Once `xy_ee` is within 0.001 of `xy_pre_block`, the phase advances to
        "move_to_block" and, if none has been stored yet, a copy of this
        pre-block position is remembered in `self.first_preblock`.

        Args:
          xy_pre_block: Position to reach before approaching the block.
          xy_ee: Current position (the end effector, going by the name).

        Returns:
          Tuple `(xy_delta, max_step_velocity)`: the unclipped offset from
          `xy_ee` to `xy_pre_block`, and the velocity limit (0.3) for this
          phase.
        """
        offset = xy_pre_block - xy_ee
        if np.linalg.norm(offset) < 0.001:
            self.phase = "move_to_block"
            if self.first_preblock is None:
                self.first_preblock = np.copy(xy_pre_block)
        return offset, 0.3

    def _get_action_for_block_target(self, time_step, block="block", target="target"):
        """Computes one clipped xy step for pushing `block` toward `target`.

        The method is a phase machine driven by `self.phase`.  The phase tests
        are independent `if` statements evaluated in a fixed order (not an
        `elif` chain), so a transition made by one branch can already be acted
        on by a later branch within the same call.

        Args:
          time_step: Current time step, forwarded to `_get_action_info`.
          block: Name of the block to push, forwarded to `_get_action_info`.
          target: Name of the target, forwarded to `_get_action_info`.

        Returns:
          The xy displacement for this step, with optional noise added and its
          norm limited to `max_step_velocity / control_frequency`.
        """
        # The limit is a velocity; it is converted to a per-step distance at
        # the end using the environment's control frequency.
        max_step_velocity = 0.35

        info = self._get_action_info(time_step, block, target)
        ee_xy = info.xy_ee

        if self.origin is None:
            # First call since reset(): remember the starting position.
            self.origin = np.copy(ee_xy)

        if self.phase == "move_to_pre_block":
            xy_delta, max_step_velocity = self._get_move_to_preblock(
                info.xy_pre_block, ee_xy
            )

        if self.phase == "return_to_first_preblock":
            max_step_velocity = 0.3
            if self.first_preblock is None:
                # No pre-block position was ever recorded; use the start.
                self.first_preblock = self.origin
            xy_delta = self.first_preblock - ee_xy
            if np.linalg.norm(xy_delta) < 0.001:
                self.phase = "return_to_origin"

        if self.phase == "return_to_origin":
            max_step_velocity = 0.3
            xy_delta = self.origin - ee_xy
            if np.linalg.norm(xy_delta) < 0.001:
                # "move_to_pre_block" is tested earlier in this chain, so this
                # transition only takes effect on the next call.
                self.phase = "move_to_pre_block"

        if self.phase == "move_to_block":
            xy_delta = self._get_move_to_block(
                info.xy_delta_to_nexttoblock,
                info.theta_threshold_to_orient,
                info.theta_error,
            )

        if self.phase == "push_block":
            xy_delta = self._get_push_block(
                info.theta_error,
                info.theta_threshold_to_orient,
                info.xy_delta_to_touchingblock,
            )

        if self.phase in ("orient_block_left", "orient_block_right"):
            # Both orienting phases use the same slower velocity limit and the
            # same arguments.
            max_step_velocity = 0.15
            orient_circle_diameter = 0.025
            orient_args = (
                info.xy_dir_block_to_ee,
                orient_circle_diameter,
                info.xy_block,
                ee_xy,
                info.theta_error,
                info.theta_threshold_flat_enough,
            )
            # Kept as two separate tests: the phase is re-read after the
            # "left" helper has run.
            if self.phase == "orient_block_left":
                xy_delta = self._get_orient_block_left(*orient_args)
            if self.phase == "orient_block_right":
                xy_delta = self._get_orient_block_right(*orient_args)

        if self._action_noise_std != 0.0:
            # Added in place, exactly as before.
            xy_delta += self._np_random_state.randn(2) * self._action_noise_std

        # Limit the step length to what the velocity limit allows per step.
        step_limit = max_step_velocity * (1 / self._env.get_control_frequency())
        magnitude = np.linalg.norm(xy_delta)
        if magnitude > step_limit:
            xy_delta = (xy_delta / magnitude) * step_limit
        return xy_delta

    def _choose_goal_order(self):
        """Chooses block->target order for multimodal pushing.

        Returns:
          A pair `((first_block, first_target), (second_block, second_target))`
          of names, drawn with `random.choice` (the module-level generator)
          from the four possible assignments.
        """
        straight_a, straight_b = ("block", "target"), ("block2", "target2")
        crossed_a, crossed_b = ("block", "target2"), ("block2", "target")
        candidates = [
            (straight_a, straight_b),
            (crossed_a, crossed_b),
            (crossed_b, crossed_a),
            (straight_b, straight_a),
        ]
        return random.choice(candidates)

    def _action(self, time_step, policy_state):
        """Returns the oracle action for `time_step`.

        On the first step of an episode the oracle is reset and a new goal
        order is drawn.  As soon as the first block is closer than
        `goal_dist_tolerance` to its target, the oracle switches (once) to the
        second block/target pair and enters the "return_to_first_preblock"
        phase.

        Args:
          time_step: Current time step; its observation must contain a
            "<name>_translation" entry for the first block and first target.
          policy_state: Unused.

        Returns:
          A `PolicyStep` whose action is the xy displacement as float32.
        """
        if time_step.is_first():
            self.reset()
            first_pair, second_pair = self._choose_goal_order()
            self._first_block, self._first_target = first_pair
            self._second_block, self._second_target = second_pair
            self._current_block = self._first_block
            self._current_target = self._first_target
            self._has_switched = False

        observation = time_step.observation
        first_gap = np.linalg.norm(
            observation["%s_translation" % self._first_block]
            - observation["%s_translation" % self._first_target]
        )

        if first_gap < self._goal_dist_tolerance and not self._has_switched:
            # First block delivered: hand over to the second pair exactly once.
            self._current_block = self._second_block
            self._current_target = self._second_target
            self._has_switched = True
            self.phase = "return_to_first_preblock"

        xy_delta = self._get_action_for_block_target(
            time_step, block=self._current_block, target=self._current_target
        )

        return policy_step.PolicyStep(action=np.asarray(xy_delta, dtype=np.float32))
|
|