|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
"""Multimodal block environments for the XArm.""" |
|
|
|
import einops |
|
import collections |
|
import logging |
|
import math |
|
from typing import Dict, List |
|
|
|
from gym import spaces |
|
from gym.envs import registration |
|
from . import block_pushing |
|
from .utils import utils_pybullet |
|
from .utils.pose3d import Pose3d |
|
from .utils.utils_pybullet import ObjState |
|
from .utils.utils_pybullet import XarmState |
|
import numpy as np |
|
from scipy.spatial import transform |
|
import pybullet |
|
import pybullet_utils.bullet_client as bullet_client |
|
|
|
import torch |
|
|
|
|
|
BLOCK2_URDF_PATH = "third_party/py/envs/assets/block2.urdf" |
|
ZONE2_URDF_PATH = "third_party/py/envs/assets/zone2.urdf" |
|
|
|
|
|
MIN_BLOCK_DIST = 0.1 |
|
MIN_TARGET_DIST = 0.12 |
|
|
|
NUM_RESET_ATTEMPTS = 1000 |
|
|
|
|
|
RANDOM_X_SHIFT = 0.1 |
|
RANDOM_Y_SHIFT = 0.15 |
|
|
|
logging.basicConfig( |
|
level="INFO", |
|
format="%(asctime)s [%(levelname)s] %(message)s", |
|
filemode="w", |
|
) |
|
logger = logging.getLogger() |
|
|
|
|
|
def build_env_name(task, shared_memory, use_image_obs): |
|
"""Construct the env name from parameters.""" |
|
del task |
|
env_name = "BlockPushMultimodal" |
|
|
|
if use_image_obs: |
|
env_name = env_name + "Rgb" |
|
|
|
if shared_memory: |
|
env_name = "Shared" + env_name |
|
|
|
env_name = env_name + "-v0" |
|
|
|
return env_name |
|
|
|
|
|
class BlockPushMultimodal(block_pushing.BlockPush): |
|
"""2 blocks, 2 targets.""" |
|
|
|
def __init__( |
|
self, |
|
control_frequency=10.0, |
|
task=block_pushing.BlockTaskVariant.PUSH, |
|
image_size=(224, 224), |
|
shared_memory=False, |
|
seed=None, |
|
goal_dist_tolerance=0.05, |
|
): |
|
"""Creates an env instance. |
|
|
|
Args: |
|
control_frequency: Control frequency for the arm. Each env step will |
|
advance the simulation by 1/control_frequency seconds. |
|
task: enum for which task, see BlockTaskVariant enum. |
|
image_size: Optional image size (height, width). If None, no image |
|
observations will be used. |
|
shared_memory: If True `pybullet.SHARED_MEMORY` is used to connect to |
|
pybullet. Useful to debug. |
|
seed: Optional seed for the environment. |
|
goal_dist_tolerance: float, how far away from the goal to terminate. |
|
""" |
|
self._target_ids = None |
|
self._target_poses = None |
|
super(BlockPushMultimodal, self).__init__( |
|
control_frequency=control_frequency, |
|
task=task, |
|
image_size=image_size, |
|
shared_memory=shared_memory, |
|
seed=seed, |
|
goal_dist_tolerance=goal_dist_tolerance, |
|
) |
|
self._init_distance = [-1.0, -1.0] |
|
self._in_target = [[-1.0, -1.0], [-1.0, -1.0]] |
|
self._first_move = [-1, -1] |
|
self._step_num = 0 |
|
self.moved = 0 |
|
self.entered = 0 |
|
|
|
@property |
|
def target_poses(self): |
|
return self._target_poses |
|
|
|
def get_goal_translation(self): |
|
"""Return the translation component of the goal (2D).""" |
|
if self._target_poses: |
|
return [i.translation for i in self._target_poses] |
|
else: |
|
return None |
|
|
|
def _setup_pybullet_scene(self): |
|
self._pybullet_client = bullet_client.BulletClient(self._connection_mode) |
|
|
|
|
|
pybullet.configureDebugVisualizer(pybullet.COV_ENABLE_RENDERING, 0) |
|
|
|
self._setup_workspace_and_robot() |
|
|
|
self._target_ids = [ |
|
utils_pybullet.load_urdf(self._pybullet_client, i, useFixedBase=True) |
|
for i in [block_pushing.ZONE_URDF_PATH, ZONE2_URDF_PATH] |
|
] |
|
self._block_ids = [] |
|
for i in [block_pushing.BLOCK_URDF_PATH, BLOCK2_URDF_PATH]: |
|
self._block_ids.append( |
|
utils_pybullet.load_urdf(self._pybullet_client, i, useFixedBase=False) |
|
) |
|
|
|
|
|
pybullet.configureDebugVisualizer(pybullet.COV_ENABLE_RENDERING, 1) |
|
|
|
self.step_simulation_to_stabilize() |
|
|
|
def _reset_block_poses(self, workspace_center_x): |
|
"""Resets block poses.""" |
|
|
|
|
|
def _reset_block_pose(idx, add=0.0, avoid=None): |
|
def _get_random_translation(): |
|
block_x = ( |
|
workspace_center_x |
|
+ add |
|
+ self._rng.uniform(low=-RANDOM_X_SHIFT, high=RANDOM_X_SHIFT) |
|
) |
|
block_y = -0.2 + self._rng.uniform( |
|
low=-RANDOM_Y_SHIFT, high=RANDOM_Y_SHIFT |
|
) |
|
block_translation = np.array([block_x, block_y, 0]) |
|
return block_translation |
|
|
|
if avoid is None: |
|
block_translation = _get_random_translation() |
|
else: |
|
|
|
for _ in range(NUM_RESET_ATTEMPTS): |
|
block_translation = _get_random_translation() |
|
dist = np.linalg.norm(block_translation[0] - avoid[0]) |
|
|
|
if dist > MIN_BLOCK_DIST: |
|
break |
|
block_sampled_angle = self._rng.uniform(math.pi) |
|
block_rotation = transform.Rotation.from_rotvec([0, 0, block_sampled_angle]) |
|
self._pybullet_client.resetBasePositionAndOrientation( |
|
self._block_ids[idx], |
|
block_translation.tolist(), |
|
block_rotation.as_quat().tolist(), |
|
) |
|
return block_translation |
|
|
|
|
|
for _ in range(NUM_RESET_ATTEMPTS): |
|
|
|
b0_translation = _reset_block_pose(0) |
|
|
|
b1_translation = _reset_block_pose(1, avoid=b0_translation) |
|
dist = np.linalg.norm(b0_translation[0] - b1_translation[0]) |
|
if dist > MIN_BLOCK_DIST: |
|
break |
|
else: |
|
raise ValueError("could not find matching block") |
|
assert dist > MIN_BLOCK_DIST |
|
|
|
def _reset_target_poses(self, workspace_center_x): |
|
"""Resets target poses.""" |
|
|
|
def _reset_target_pose(idx, add=0.0, avoid=None): |
|
def _get_random_translation(): |
|
|
|
target_x = ( |
|
workspace_center_x |
|
+ add |
|
+ self._rng.uniform( |
|
low=-0.05 * RANDOM_X_SHIFT, high=0.05 * RANDOM_X_SHIFT |
|
) |
|
) |
|
target_y = 0.2 + self._rng.uniform( |
|
low=-0.05 * RANDOM_Y_SHIFT, high=0.05 * RANDOM_Y_SHIFT |
|
) |
|
target_translation = np.array([target_x, target_y, 0.020]) |
|
return target_translation |
|
|
|
if avoid is None: |
|
target_translation = _get_random_translation() |
|
else: |
|
|
|
for _ in range(NUM_RESET_ATTEMPTS): |
|
target_translation = _get_random_translation() |
|
dist = np.linalg.norm(target_translation[0] - avoid[0]) |
|
|
|
if dist > MIN_TARGET_DIST: |
|
break |
|
target_sampled_angle = math.pi + self._rng.uniform( |
|
low=-math.pi / 30, high=math.pi / 30 |
|
) |
|
target_rotation = transform.Rotation.from_rotvec( |
|
[0, 0, target_sampled_angle] |
|
) |
|
self._pybullet_client.resetBasePositionAndOrientation( |
|
self._target_ids[idx], |
|
target_translation.tolist(), |
|
target_rotation.as_quat().tolist(), |
|
) |
|
self._target_poses[idx] = Pose3d( |
|
rotation=target_rotation, translation=target_translation |
|
) |
|
|
|
if self._target_poses is None: |
|
self._target_poses = [None for _ in range(len(self._target_ids))] |
|
|
|
for _ in range(NUM_RESET_ATTEMPTS): |
|
|
|
add = 0.12 * np.random.choice([-1, 1]) |
|
|
|
_reset_target_pose(0, add=add) |
|
_reset_target_pose(1, add=-add, avoid=self._target_poses[0].translation) |
|
dist = np.linalg.norm( |
|
self._target_poses[0].translation[0] |
|
- self._target_poses[1].translation[0] |
|
) |
|
if dist > MIN_TARGET_DIST: |
|
break |
|
else: |
|
raise ValueError("could not find matching target") |
|
assert dist > MIN_TARGET_DIST |
|
|
|
def _reset_object_poses(self, workspace_center_x, workspace_center_y): |
|
|
|
self._reset_block_poses(workspace_center_x) |
|
|
|
|
|
self._reset_target_poses(workspace_center_x) |
|
|
|
self._init_distance = [-1.0, -1.0] |
|
self._in_target = [[-1.0, -1.0], [-1.0, -1.0]] |
|
self._step_num = 0 |
|
|
|
def reset(self, reset_poses=True): |
|
workspace_center_x = 0.4 |
|
workspace_center_y = 0.0 |
|
self.moved = 0 |
|
self.entered = 0 |
|
|
|
if reset_poses: |
|
self._pybullet_client.restoreState(self._saved_state) |
|
|
|
rotation = transform.Rotation.from_rotvec([0, math.pi, 0]) |
|
translation = np.array([0.3, -0.4, block_pushing.EFFECTOR_HEIGHT]) |
|
starting_pose = Pose3d(rotation=rotation, translation=translation) |
|
self._set_robot_target_effector_pose(starting_pose) |
|
self._reset_object_poses(workspace_center_x, workspace_center_y) |
|
|
|
else: |
|
self._target_poses = [ |
|
self._get_target_pose(idx) for idx in self._target_ids |
|
] |
|
|
|
if reset_poses: |
|
self.step_simulation_to_stabilize() |
|
|
|
state = self._compute_state() |
|
self._previous_state = state |
|
return state |
|
|
|
def _get_target_pose(self, idx): |
|
( |
|
target_translation, |
|
target_orientation_quat, |
|
) = self._pybullet_client.getBasePositionAndOrientation(idx) |
|
target_rotation = transform.Rotation.from_quat(target_orientation_quat) |
|
target_translation = np.array(target_translation) |
|
return Pose3d(rotation=target_rotation, translation=target_translation) |
|
|
|
def _compute_reach_target(self, state): |
|
xy_block = state["block_translation"] |
|
xy_target = state["target_translation"] |
|
|
|
xy_block_to_target = xy_target - xy_block |
|
xy_dir_block_to_target = (xy_block_to_target) / np.linalg.norm( |
|
xy_block_to_target |
|
) |
|
self.reach_target_translation = xy_block + -1 * xy_dir_block_to_target * 0.05 |
|
|
|
def _compute_state(self): |
|
effector_pose = self._robot.forward_kinematics() |
|
|
|
def _get_block_pose(idx): |
|
block_position_and_orientation = ( |
|
self._pybullet_client.getBasePositionAndOrientation( |
|
self._block_ids[idx] |
|
) |
|
) |
|
block_pose = Pose3d( |
|
rotation=transform.Rotation.from_quat( |
|
block_position_and_orientation[1] |
|
), |
|
translation=block_position_and_orientation[0], |
|
) |
|
return block_pose |
|
|
|
block_poses = [_get_block_pose(i) for i in range(len(self._block_ids))] |
|
|
|
def _yaw_from_pose(pose): |
|
return np.array([pose.rotation.as_euler("xyz", degrees=False)[-1]]) |
|
|
|
obs = collections.OrderedDict( |
|
block_translation=block_poses[0].translation[0:2], |
|
block_orientation=_yaw_from_pose(block_poses[0]), |
|
block2_translation=block_poses[1].translation[0:2], |
|
block2_orientation=_yaw_from_pose(block_poses[1]), |
|
effector_translation=effector_pose.translation[0:2], |
|
effector_target_translation=self._target_effector_pose.translation[0:2], |
|
target_translation=self._target_poses[0].translation[0:2], |
|
target_orientation=_yaw_from_pose(self._target_poses[0]), |
|
target2_translation=self._target_poses[1].translation[0:2], |
|
target2_orientation=_yaw_from_pose(self._target_poses[1]), |
|
) |
|
|
|
for i in range(2): |
|
new_distance = np.linalg.norm( |
|
block_poses[i].translation[0:2] |
|
) |
|
if self._init_distance[i] == -1: |
|
self._init_distance[i] = new_distance |
|
else: |
|
if self._init_distance[i] != 100: |
|
if np.abs(new_distance - self._init_distance[i]) > 1e-3: |
|
logger.info(f"Block {i} moved on step {self._step_num}") |
|
self.moved += 1 |
|
self._init_distance[i] = 100 |
|
|
|
self._step_num += 1 |
|
return obs |
|
|
|
def step(self, action): |
|
self._step_robot_and_sim(action) |
|
|
|
state = self._compute_state() |
|
done = False |
|
reward = self._get_reward(state) |
|
if reward >= 0.5: |
|
|
|
done = True |
|
return state, reward, done, {} |
|
|
|
def _get_reward(self, state): |
|
|
|
targets = ["target", "target2"] |
|
|
|
def _block_target_dist(block, target): |
|
return np.linalg.norm( |
|
state["%s_translation" % block] - state["%s_translation" % target] |
|
) |
|
|
|
def _closest_target(block): |
|
|
|
dists = [_block_target_dist(block, t) for t in targets] |
|
|
|
closest_target = targets[np.argmin(dists)] |
|
closest_dist = np.min(dists) |
|
|
|
in_target = closest_dist < self.goal_dist_tolerance |
|
return closest_target, in_target |
|
|
|
blocks = ["block", "block2"] |
|
|
|
reward = 0.0 |
|
|
|
for t_i, t in enumerate(targets): |
|
for b_i, b in enumerate(blocks): |
|
if self._in_target[t_i][b_i] == -1: |
|
dist = _block_target_dist(b, t) |
|
if dist < self.goal_dist_tolerance: |
|
self._in_target[t_i][b_i] = 0 |
|
logger.info( |
|
f"Block {b_i} entered target {t_i} on step {self._step_num}" |
|
) |
|
self.entered += 1 |
|
reward += 0.49 |
|
|
|
b0_closest_target, b0_in_target = _closest_target("block") |
|
b1_closest_target, b1_in_target = _closest_target("block2") |
|
|
|
if b0_in_target and b1_in_target and (b0_closest_target != b1_closest_target): |
|
reward = 0.51 |
|
return reward |
|
|
|
def _compute_goal_distance(self, state): |
|
blocks = ["block", "block2"] |
|
|
|
def _target_block_dist(target, block): |
|
return np.linalg.norm( |
|
state["%s_translation" % block] - state["%s_translation" % target] |
|
) |
|
|
|
def _closest_block_dist(target): |
|
dists = [_target_block_dist(target, b) for b in blocks] |
|
closest_dist = np.min(dists) |
|
return closest_dist |
|
|
|
t0_closest_dist = _closest_block_dist("target") |
|
t1_closest_dist = _closest_block_dist("target2") |
|
return np.mean([t0_closest_dist, t1_closest_dist]) |
|
|
|
@property |
|
def succeeded(self): |
|
state = self._compute_state() |
|
reward = self._get_reward(state) |
|
if reward >= 0.5: |
|
return True |
|
return False |
|
|
|
def _create_observation_space(self, image_size): |
|
pi2 = math.pi * 2 |
|
|
|
obs_dict = collections.OrderedDict( |
|
block_translation=spaces.Box(low=-5, high=5, shape=(2,)), |
|
block_orientation=spaces.Box(low=-pi2, high=pi2, shape=(1,)), |
|
block2_translation=spaces.Box(low=-5, high=5, shape=(2,)), |
|
block2_orientation=spaces.Box(low=-pi2, high=pi2, shape=(1,)), |
|
effector_translation=spaces.Box( |
|
low=block_pushing.WORKSPACE_BOUNDS[0] - 0.1, |
|
high=block_pushing.WORKSPACE_BOUNDS[1] + 0.1, |
|
), |
|
effector_target_translation=spaces.Box( |
|
low=block_pushing.WORKSPACE_BOUNDS[0] - 0.1, |
|
high=block_pushing.WORKSPACE_BOUNDS[1] + 0.1, |
|
), |
|
target_translation=spaces.Box(low=-5, high=5, shape=(2,)), |
|
target_orientation=spaces.Box( |
|
low=-pi2, |
|
high=pi2, |
|
shape=(1,), |
|
), |
|
target2_translation=spaces.Box(low=-5, high=5, shape=(2,)), |
|
target2_orientation=spaces.Box( |
|
low=-pi2, |
|
high=pi2, |
|
shape=(1,), |
|
), |
|
) |
|
if image_size is not None: |
|
obs_dict["rgb"] = spaces.Box( |
|
low=0, high=255, shape=(image_size[0], image_size[1], 3), dtype=np.uint8 |
|
) |
|
return spaces.Dict(obs_dict) |
|
|
|
def get_pybullet_state(self): |
|
"""Save pybullet state of the scene. |
|
|
|
Returns: |
|
dict containing 'robots', 'robot_end_effectors', 'targets', 'objects', |
|
each containing a list of ObjState. |
|
""" |
|
state: Dict[str, List[ObjState]] = {} |
|
|
|
state["robots"] = [ |
|
XarmState.get_bullet_state( |
|
self._pybullet_client, |
|
self.robot.xarm, |
|
target_effector_pose=self._target_effector_pose, |
|
goal_translation=None, |
|
) |
|
] |
|
|
|
state["robot_end_effectors"] = [] |
|
if self.robot.end_effector: |
|
state["robot_end_effectors"].append( |
|
ObjState.get_bullet_state( |
|
self._pybullet_client, self.robot.end_effector |
|
) |
|
) |
|
|
|
state["targets"] = [] |
|
if self._target_ids: |
|
for target_id in self._target_ids: |
|
state["targets"].append( |
|
ObjState.get_bullet_state(self._pybullet_client, target_id) |
|
) |
|
|
|
state["objects"] = [] |
|
for obj_id in self.get_obj_ids(): |
|
state["objects"].append( |
|
ObjState.get_bullet_state(self._pybullet_client, obj_id) |
|
) |
|
|
|
return state |
|
|
|
def set_pybullet_state(self, state): |
|
"""Restore pyullet state. |
|
|
|
WARNING: py_environment wrapper assumes environments aren't reset in their |
|
constructor and will often reset the environment unintentionally. It is |
|
always recommeneded that you call env.reset on the tfagents wrapper before |
|
playback (replaying pybullet_state). |
|
|
|
Args: |
|
state: dict containing 'robots', 'robot_end_effectors', 'targets', |
|
'objects', each containing a list of ObjState. |
|
""" |
|
|
|
assert isinstance(state["robots"][0], XarmState) |
|
xarm_state: XarmState = state["robots"][0] |
|
xarm_state.set_bullet_state(self._pybullet_client, self.robot.xarm) |
|
self._set_robot_target_effector_pose(xarm_state.target_effector_pose) |
|
|
|
def _set_state_safe(obj_state, obj_id): |
|
if obj_state is not None: |
|
assert obj_id is not None, "Cannot set state for missing object." |
|
obj_state.set_bullet_state(self._pybullet_client, obj_id) |
|
else: |
|
assert obj_id is None, f"No state found for obj_id {obj_id}" |
|
|
|
robot_end_effectors = state["robot_end_effectors"] |
|
_set_state_safe( |
|
None if not robot_end_effectors else robot_end_effectors[0], |
|
self.robot.end_effector, |
|
) |
|
|
|
for target_state, target_id in zip(state["targets"], self._target_ids): |
|
_set_state_safe(target_state, target_id) |
|
|
|
obj_ids = self.get_obj_ids() |
|
assert len(state["objects"]) == len(obj_ids), "State length mismatch" |
|
for obj_state, obj_id in zip(state["objects"], obj_ids): |
|
_set_state_safe(obj_state, obj_id) |
|
|
|
self.reset(reset_poses=False) |
|
|
|
|
|
class BlockPushMultimodalMultiview(BlockPushMultimodal): |
|
def __init__(self, id=None, *args, **kwargs): |
|
super().__init__(*args, **kwargs) |
|
self.observation_space = spaces.Box( |
|
low=0, high=1, shape=(2, 3, self._image_size[0], self._image_size[1]) |
|
) |
|
self._step = 0 |
|
|
|
def _get_obs(self): |
|
|
|
view0 = self._render_camera(self._image_size, view=0) |
|
view1 = self._render_camera(self._image_size, view=1) |
|
obs = np.stack([view0, view1], axis=0) |
|
return einops.rearrange(obs, "V H W C -> V C H W") |
|
|
|
def step(self, action): |
|
action = action * 0.03 |
|
self._step_robot_and_sim(action) |
|
|
|
state = self._compute_state() |
|
reward = self._get_reward(state) |
|
|
|
obs = self._get_obs() |
|
image = einops.rearrange(obs, "V C H W -> H (V W) C") |
|
obs = obs / 255.0 |
|
self._step += 1 |
|
done = (reward >= 0.5) or (self._step >= 300) |
|
|
|
return ( |
|
obs, |
|
reward, |
|
done, |
|
{ |
|
"state": state, |
|
"image": image, |
|
"all_completions_ids": [], |
|
"moved": self.moved, |
|
"entered": self.entered, |
|
}, |
|
) |
|
|
|
def reset(self, reset_poses=True, *args, **kwargs): |
|
print("resetting env") |
|
self._step = 0 |
|
state = super().reset(reset_poses=reset_poses) |
|
obs = self._get_obs() |
|
obs = obs / 255.0 |
|
return obs |
|
|
|
def set_state(self, state: collections.OrderedDict): |
|
robot_t = np.array( |
|
[*state["effector_translation"], block_pushing.EFFECTOR_HEIGHT] |
|
) |
|
robot_r = transform.Rotation.from_rotvec([0, np.pi, 0]) |
|
robot_pose = Pose3d(rotation=robot_r, translation=robot_t) |
|
self._set_robot_target_effector_pose(robot_pose) |
|
self.step_simulation_to_stabilize() |
|
|
|
block_t = [*state["block_translation"], 0] |
|
block_r = transform.Rotation.from_rotvec([0, 0, state["block_orientation"]]) |
|
self._pybullet_client.resetBasePositionAndOrientation( |
|
self._block_ids[0], |
|
block_t, |
|
block_r.as_quat().tolist(), |
|
) |
|
|
|
block2_t = [*state["block2_translation"], 0] |
|
block2_r = transform.Rotation.from_rotvec([0, 0, state["block2_orientation"]]) |
|
self._pybullet_client.resetBasePositionAndOrientation( |
|
self._block_ids[1], |
|
block2_t, |
|
block2_r.as_quat().tolist(), |
|
) |
|
|
|
target_t = [*state["target_translation"], 0.02] |
|
target_r = transform.Rotation.from_rotvec([0, 0, state["target_orientation"]]) |
|
self._pybullet_client.resetBasePositionAndOrientation( |
|
self._target_ids[0], |
|
target_t, |
|
target_r.as_quat().tolist(), |
|
) |
|
|
|
target2_t = [*state["target2_translation"], 0.02] |
|
target2_r = transform.Rotation.from_rotvec([0, 0, state["target2_orientation"]]) |
|
self._pybullet_client.resetBasePositionAndOrientation( |
|
self._target_ids[1], |
|
target2_t, |
|
target2_r.as_quat().tolist(), |
|
) |
|
|
|
self.step_simulation_to_stabilize() |
|
|
|
|
|
class BlockPushHorizontalMultimodal(BlockPushMultimodal): |
|
def _reset_object_poses(self, workspace_center_x, workspace_center_y): |
|
|
|
self._reset_block_poses(workspace_center_y) |
|
|
|
|
|
self._reset_target_poses(workspace_center_y) |
|
|
|
def _reset_block_poses(self, workspace_center_y): |
|
"""Resets block poses.""" |
|
|
|
|
|
def _reset_block_pose(idx, add=0.0, avoid=None): |
|
def _get_random_translation(): |
|
block_x = 0.35 + 0.5 * self._rng.uniform( |
|
low=-RANDOM_X_SHIFT, high=RANDOM_X_SHIFT |
|
) |
|
block_y = ( |
|
workspace_center_y |
|
+ add |
|
+ 0.5 * self._rng.uniform(low=-RANDOM_Y_SHIFT, high=RANDOM_Y_SHIFT) |
|
) |
|
block_translation = np.array([block_x, block_y, 0]) |
|
return block_translation |
|
|
|
if avoid is None: |
|
block_translation = _get_random_translation() |
|
else: |
|
|
|
for _ in range(NUM_RESET_ATTEMPTS): |
|
block_translation = _get_random_translation() |
|
dist = np.linalg.norm(block_translation[0] - avoid[0]) |
|
|
|
if dist > MIN_BLOCK_DIST: |
|
break |
|
block_sampled_angle = self._rng.uniform(math.pi) |
|
block_rotation = transform.Rotation.from_rotvec([0, 0, block_sampled_angle]) |
|
self._pybullet_client.resetBasePositionAndOrientation( |
|
self._block_ids[idx], |
|
block_translation.tolist(), |
|
block_rotation.as_quat().tolist(), |
|
) |
|
return block_translation |
|
|
|
|
|
for _ in range(NUM_RESET_ATTEMPTS): |
|
|
|
add = 0.2 * np.random.choice([-1, 1]) |
|
b0_translation = _reset_block_pose(0, add=add) |
|
|
|
b1_translation = _reset_block_pose(1, add=-add, avoid=b0_translation) |
|
dist = np.linalg.norm(b0_translation[0] - b1_translation[0]) |
|
if dist > MIN_BLOCK_DIST: |
|
break |
|
else: |
|
raise ValueError("could not find matching block") |
|
assert dist > MIN_BLOCK_DIST |
|
|
|
def _reset_target_poses(self, workspace_center_y): |
|
"""Resets target poses.""" |
|
|
|
def _reset_target_pose(idx, add=0.0, avoid=None): |
|
def _get_random_translation(): |
|
|
|
target_x = 0.5 + self._rng.uniform( |
|
low=-0.05 * RANDOM_X_SHIFT, high=0.05 * RANDOM_X_SHIFT |
|
) |
|
target_y = ( |
|
workspace_center_y |
|
+ add |
|
+ self._rng.uniform( |
|
low=-0.05 * RANDOM_Y_SHIFT, high=0.05 * RANDOM_Y_SHIFT |
|
) |
|
) |
|
target_translation = np.array([target_x, target_y, 0.020]) |
|
return target_translation |
|
|
|
if avoid is None: |
|
target_translation = _get_random_translation() |
|
else: |
|
|
|
for _ in range(NUM_RESET_ATTEMPTS): |
|
target_translation = _get_random_translation() |
|
dist = np.linalg.norm(target_translation[0] - avoid[0]) |
|
|
|
if dist > MIN_TARGET_DIST: |
|
break |
|
target_sampled_angle = math.pi + self._rng.uniform( |
|
low=-math.pi / 30, high=math.pi / 30 |
|
) |
|
target_rotation = transform.Rotation.from_rotvec( |
|
[0, 0, target_sampled_angle] |
|
) |
|
self._pybullet_client.resetBasePositionAndOrientation( |
|
self._target_ids[idx], |
|
target_translation.tolist(), |
|
target_rotation.as_quat().tolist(), |
|
) |
|
self._target_poses[idx] = Pose3d( |
|
rotation=target_rotation, translation=target_translation |
|
) |
|
|
|
if self._target_poses is None: |
|
self._target_poses = [None for _ in range(len(self._target_ids))] |
|
|
|
for _ in range(NUM_RESET_ATTEMPTS): |
|
|
|
add = 0.2 * np.random.choice([-1, 1]) |
|
|
|
_reset_target_pose(0, add=add) |
|
_reset_target_pose(1, add=-add, avoid=self._target_poses[0].translation) |
|
dist = np.linalg.norm( |
|
self._target_poses[0].translation[0] |
|
- self._target_poses[1].translation[0] |
|
) |
|
break |
|
|
|
|
|
else: |
|
raise ValueError("could not find matching target") |
|
|
|
|
|
|
|
if "BlockPushMultimodal-v0" in registration.registry.env_specs: |
|
del registration.registry["BlockPushMultimodal-v0"] |
|
|
|
registration.register( |
|
id="BlockPushMultimodal-v0", entry_point=BlockPushMultimodal, max_episode_steps=350 |
|
) |
|
|
|
registration.register( |
|
id="BlockPushMultimodalFlipped-v0", |
|
entry_point=BlockPushHorizontalMultimodal, |
|
max_episode_steps=25, |
|
) |
|
|
|
registration.register( |
|
id="SharedBlockPushMultimodal-v0", |
|
entry_point=BlockPushMultimodal, |
|
kwargs=dict(shared_memory=True), |
|
max_episode_steps=350, |
|
) |
|
registration.register( |
|
id="BlockPushMultimodalRgb-v0", |
|
entry_point=BlockPushMultimodal, |
|
max_episode_steps=350, |
|
kwargs=dict(image_size=(block_pushing.IMAGE_HEIGHT, block_pushing.IMAGE_WIDTH)), |
|
) |
|
registration.register( |
|
id="BlockPushMultimodalMultiview-v0", |
|
entry_point=BlockPushMultimodalMultiview, |
|
max_episode_steps=350, |
|
kwargs=dict(image_size=(block_pushing.IMAGE_HEIGHT, block_pushing.IMAGE_WIDTH)), |
|
) |
|
|