|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
"""Metrics for the blocks environment.""" |
|
|
|
from typing import Optional, Text |
|
|
|
import numpy as np |
|
|
|
from tf_agents.metrics import py_metrics |
|
from tf_agents.typing import types |
|
from tf_agents.utils import nest_utils |
|
from tf_agents.utils import numpy_storage |
|
|
|
|
|
class AverageSuccessMetric(py_metrics.StreamingMetric): |
|
"""Computes the average success of the environment.""" |
|
|
|
def __init__( |
|
self, env, name="AverageSuccessMetric", buffer_size=10, batch_size=None |
|
): |
|
"""Creates an AverageReturnMetric.""" |
|
self._np_state = numpy_storage.NumpyState() |
|
self._env = env |
|
|
|
|
|
self._np_state.success = np.float64(0) |
|
super(AverageSuccessMetric, self).__init__( |
|
name, buffer_size=buffer_size, batch_size=batch_size |
|
) |
|
|
|
def _reset(self, batch_size): |
|
"""Resets stat gathering variables.""" |
|
self._np_state.success = np.zeros(shape=(batch_size,), dtype=np.float64) |
|
|
|
def _batched_call(self, trajectory): |
|
"""Processes the trajectory to update the metric. |
|
|
|
Args: |
|
trajectory: a tf_agents.trajectory.Trajectory. |
|
""" |
|
success = self._np_state.success |
|
|
|
is_first = np.where(trajectory.is_first()) |
|
success[is_first] = 0 |
|
|
|
success += self._env.succeeded |
|
|
|
is_last = np.where(trajectory.is_last()) |
|
self.add_to_buffer(success[is_last]) |
|
|
|
|
|
class AverageFinalGoalDistance(py_metrics.StreamingMetric): |
|
"""Computes the average success of the environment.""" |
|
|
|
def __init__( |
|
self, env, name="AverageFinalGoalDistance", buffer_size=10, batch_size=None |
|
): |
|
"""Creates an AverageReturnMetric.""" |
|
self._env = env |
|
super(AverageFinalGoalDistance, self).__init__( |
|
name, buffer_size=buffer_size, batch_size=batch_size |
|
) |
|
|
|
def _reset(self, batch_size): |
|
"""Resets stat gathering variables.""" |
|
pass |
|
|
|
def _batched_call(self, trajectory): |
|
"""Processes the trajectory to update the metric. |
|
|
|
Args: |
|
trajectory: a tf_agents.trajectory.Trajectory. |
|
""" |
|
lasts = trajectory.is_last() |
|
if np.any(lasts): |
|
is_last = np.where(lasts) |
|
goal_distance = np.asarray(self._env.goal_distance, np.float32) |
|
|
|
if goal_distance.shape is (): |
|
goal_distance = nest_utils.batch_nested_array(goal_distance) |
|
|
|
self.add_to_buffer(goal_distance[is_last]) |
|
|