# dynamo_ssl/envs/block_pushing/oracles/multimodal_push_oracle.py
# jeffacce: initial commit (393d3de)
# coding=utf-8
# Copyright 2022 The Reach ML Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Oracle for multimodal pushing task."""
import random
import envs.block_pushing.oracles.oriented_push_oracle as oriented_push_oracle_module
import numpy as np
from tf_agents.trajectories import policy_step
from tf_agents.trajectories import time_step as ts
from tf_agents.typing import types
# Only used for debug visualization.
import pybullet # pylint: disable=unused-import
class MultimodalOrientedPushOracle(oriented_push_oracle_module.OrientedPushOracle):
    """Scripted oracle for the two-block, two-target pushing task.

    On the first step of an episode a (block, target) ordering is drawn at
    random. The oracle works on the first pair until that block lies within
    ``goal_dist_tolerance`` of its target, then retraces its path (first
    pre-block waypoint, then its starting position) and repeats the
    procedure for the second pair.

    The motion itself is a phase machine stored in ``self.phase``. Phases
    specific to this class are handled here; ``move_to_block``,
    ``push_block`` and the two ``orient_block_*`` phases are delegated to
    handlers that are not defined in this class (they come from the base
    oracle).
    """

    def __init__(self, env, goal_dist_tolerance=0.04, action_noise_std=0.0):
        """Builds the oracle.

        Args:
          env: Environment handed to the base oracle; later queried for its
            control frequency.
          goal_dist_tolerance: Block-to-target distance below which the first
            block counts as delivered.
          action_noise_std: Scale of the random perturbation added to every
            action; ``0.0`` disables it.
        """
        super().__init__(env)
        # Assigned after the base initialiser so these values win over
        # anything it may have stored under the same names.
        self._goal_dist_tolerance = goal_dist_tolerance
        self._action_noise_std = action_noise_std

    def reset(self):
        """Forgets the recorded waypoints and rewinds the phase machine."""
        self.phase = "move_to_pre_block"
        self.origin = None
        self.first_preblock = None

    def _get_move_to_preblock(self, xy_pre_block, xy_ee):
        """Handles the ``move_to_pre_block`` phase.

        Args:
          xy_pre_block: Waypoint to reach before approaching the block.
          xy_ee: Current position being steered (presumably the end effector;
            confirm against ``_get_action_info``).

        Returns:
          ``(xy_delta, max_step_velocity)``: the full, unclipped offset to
          the waypoint and the speed limit for this phase.
        """
        offset = xy_pre_block - xy_ee
        arrived = np.linalg.norm(offset) < 0.001
        if arrived:
            self.phase = "move_to_block"
            # Remember only the very first waypoint reached in the episode;
            # it is revisited when switching to the second block.
            if self.first_preblock is None:
                self.first_preblock = np.copy(xy_pre_block)
        return offset, 0.3

    def _get_action_for_block_target(self, time_step, block="block", target="target"):
        """Computes one clipped xy step for pushing ``block`` towards ``target``.

        The phase checks below are intentionally independent ``if``
        statements evaluated in a fixed order: when a handler advances
        ``self.phase`` to a phase checked further down, that phase's handler
        also runs within the same call and its result replaces the earlier
        one.

        Args:
          time_step: Current time step, forwarded to ``_get_action_info``.
          block: Name of the block to push.
          target: Name of the target to push it to.

        Returns:
          The xy displacement for this control step, with its length limited
          to the active phase's speed limit times the control period.
        """
        # Limits are expressed as velocities so that they do not depend on
        # the control frequency.
        speed_limit = 0.35
        info = self._get_action_info(time_step, block, target)

        # The position seen on the first call after a reset is the "origin".
        if self.origin is None:
            self.origin = np.copy(info.xy_ee)

        def head_to(waypoint, next_phase):
            """Returns the offset to `waypoint`; advances the phase on arrival."""
            offset = waypoint - info.xy_ee
            if np.linalg.norm(offset) < 0.001:
                self.phase = next_phase
            return offset

        def orient_args():
            """Positional arguments shared by both orientation handlers."""
            return (
                info.xy_dir_block_to_ee,
                0.025,  # Orient circle diameter.
                info.xy_block,
                info.xy_ee,
                info.theta_error,
                info.theta_threshold_flat_enough,
            )

        if self.phase == "move_to_pre_block":
            step, speed_limit = self._get_move_to_preblock(
                info.xy_pre_block, info.xy_ee
            )

        if self.phase == "return_to_first_preblock":
            speed_limit = 0.3
            # No pre-block waypoint was ever reached: fall back to the origin.
            if self.first_preblock is None:
                self.first_preblock = self.origin
            step = head_to(self.first_preblock, "return_to_origin")

        if self.phase == "return_to_origin":
            speed_limit = 0.3
            # Head back to where the episode started before beginning the
            # approach to the next block.
            step = head_to(self.origin, "move_to_pre_block")

        if self.phase == "move_to_block":
            step = self._get_move_to_block(
                info.xy_delta_to_nexttoblock,
                info.theta_threshold_to_orient,
                info.theta_error,
            )

        if self.phase == "push_block":
            step = self._get_push_block(
                info.theta_error,
                info.theta_threshold_to_orient,
                info.xy_delta_to_touchingblock,
            )

        # The slower orientation limit is chosen from the phase as it stands
        # here, before either orientation handler runs.
        if self.phase in ("orient_block_left", "orient_block_right"):
            speed_limit = 0.15

        if self.phase == "orient_block_left":
            step = self._get_orient_block_left(*orient_args())

        if self.phase == "orient_block_right":
            step = self._get_orient_block_right(*orient_args())

        if self._action_noise_std != 0.0:
            step += self._np_random_state.randn(2) * self._action_noise_std

        # Clip the step so it never exceeds what the speed limit allows in
        # one control period.
        control_period = 1 / self._env.get_control_frequency()
        longest_step = speed_limit * control_period
        step_length = np.linalg.norm(step)
        if step_length > longest_step:
            step = (step / step_length) * longest_step
        return step

    def _choose_goal_order(self):
        """Draws one of the four possible block/target orderings at random.

        Returns:
          ``((first_block, first_target), (second_block, second_target))``.
        """
        orderings = (
            (("block", "target"), ("block2", "target2")),
            (("block", "target2"), ("block2", "target")),
            (("block2", "target"), ("block", "target2")),
            (("block2", "target2"), ("block", "target")),
        )
        return random.choice(orderings)

    def _action(self, time_step, policy_state):
        """Produces the oracle action for ``time_step``.

        Args:
          time_step: Current time step; its observation must hold a
            ``"<name>_translation"`` entry for each block and target.
          policy_state: Unused.

        Returns:
          A ``PolicyStep`` whose action is the xy step as a float32 array.
        """
        if time_step.is_first():
            # New episode: clear state and commit to a fresh goal ordering.
            self.reset()
            first_pair, second_pair = self._choose_goal_order()
            self._first_block, self._first_target = first_pair
            self._second_block, self._second_target = second_pair
            self._current_block = self._first_block
            self._current_target = self._first_target
            self._has_switched = False

        observation = time_step.observation
        first_gap = np.linalg.norm(
            observation["%s_translation" % self._first_block]
            - observation["%s_translation" % self._first_target]
        )
        if first_gap < self._goal_dist_tolerance and not self._has_switched:
            # First block delivered: retarget to the second pair (once only)
            # and start retracing the path.
            self._current_block = self._second_block
            self._current_target = self._second_target
            self._has_switched = True
            self.phase = "return_to_first_preblock"

        step = self._get_action_for_block_target(
            time_step, block=self._current_block, target=self._current_target
        )
        return policy_step.PolicyStep(action=np.asarray(step, dtype=np.float32))