Skip to content

Commit

Permalink
FEATURE: Upgrading ChaseTag env
Browse files Browse the repository at this point in the history
  • Loading branch information
vikashplus committed Dec 31, 2023
1 parent 89fe9b2 commit 9f32f15
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 27 deletions.
100 changes: 80 additions & 20 deletions robohive/envs/myo/myochallenge/chasetag_v0.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@
================================================= """

import collections
from robohive.utils.import_utils import gym
from robohive.utils import gym
import numpy as np
import pink
import os
from enum import Enum
from typing import Optional

from robohive.envs.myo.base_v0 import BaseV0
from robohive.envs.myo.myobase.walk_v0 import WalkEnvV0
Expand Down Expand Up @@ -199,6 +200,17 @@ def __init__(self,
self.relief_range = relief_range
self._populate_patches()

def flatten_agent_patch(self, qpos):
"""
Turn terrain in the patch around the agent to flat.
"""
# convert position to map position
pos = self.cart2map(qpos[:2])
# get patch that belongs to the position
i = pos[0] // self.patch_size
j = pos[1] // self.patch_size
self._fill_patch(i, j, terrain_type=TerrainTypes.FLAT)

def _compute_patch_data(self, terrain_type):
if terrain_type.name == 'FLAT':
return np.zeros((self.patch_size, self.patch_size))
Expand All @@ -223,32 +235,48 @@ def _populate_patches(self):
self._fill_patch(i, j, terrain_type)
# put special terrain only once in 20% of episodes
if self.rng.uniform() < 0.2:
i = self.rng.choice(range(self.patches_per_side))
j = self.rng.choice(range(self.patches_per_side))
i, j = np.random.randint(0, self.patches_per_side, size=2)
self._fill_patch(i, j, SpecialTerrains.RELIEF)

def _fill_patch(self, i, j, terrain_type='FLAT'):
def _fill_patch(self, i, j, terrain_type=TerrainTypes.FLAT):
"""
Fill patch at position <i> ,<j> with terrain <type>
"""
self.hfield.data[i * self.patch_size: i*self.patch_size + self.patch_size,
j * self.patch_size: j * self.patch_size + self.patch_size] = self._compute_patch_data(terrain_type)

def get_heightmap_obs(self):
"""
Get heightmap observation.
"""
if self.heightmap_window is None:
self.heightmap_window = np.zeros((10, 10))
self._measure_height()
return self.heightmap_window[:].flatten().copy()

def cart2map(self, pos):
def cart2map(self,
points_1: list,
points_2: Optional[list] = None):
"""
Transform cartesian position [m * m] to rounded map position [nrow * ncol]
If only points_1 is given: Expects cartesian positions in [x, y] format.
If also points_2 is given: Expects points_1 = [x1, x2, ...] points_2 = [y1, y2, ...]
"""
delta_map = self.real_length / self.nrow
offset = self.hfield.data.shape[0] / 2
return pos[:] / delta_map + offset
# x, y needs to be switched to match hfield.
if points_2 is None:
return np.array(points_1[::-1] / delta_map + offset, dtype=np.int16)
else:
ret1 = np.array(points_1[:] / delta_map + offset, dtype=np.int16)
ret2 = np.array(points_2[:] / delta_map + offset, dtype=np.int16)
return ret2, ret1

def sample(self, rng=None):
"""
Sample an entire heightfield for the episode.
Update geom in viewer if rendering.
"""
if not rng is None:
self.rng = rng
self._populate_patches()
Expand All @@ -257,19 +285,28 @@ def sample(self, rng=None):

# Patch types ---------------
def _compute_rough_terrain(self):
"""
Compute data for a random noise rough terrain.
"""
rough = self.rng.uniform(low=-1.0, high=1.0, size=(self.patch_size, self.patch_size))
normalized_data = (rough - np.min(rough)) / (np.max(rough) - np.min(rough))
scalar, offset = .08, .02
scalar = self.rng.uniform(low=self.rough_range[0], high=self.rough_range[1])
return normalized_data * scalar - offset

def _compute_relief_terrain(self):
"""
Compute data for a special logo terrain.
"""
curr_dir = os.path.dirname(__file__)
relief = np.load(os.path.join(curr_dir, '../assets/myo_relief.npy'))
normalized_data = (relief - np.min(relief)) / (np.max(relief) - np.min(relief))
return np.flipud(normalized_data) * self.rng.uniform(self.relief_range[0], self.relief_range[1])

def _compute_hilly_terrain(self):
"""
Compute data for a terrain with smooth hills.
"""
frequency = 10
scalar = self.rng.uniform(low=self.hills_range[0], high=self.hills_range[1])
data = np.sin(np.linspace(0, frequency * np.pi, self.patch_size * self.patch_size) + np.pi / 2) - 1
Expand All @@ -280,7 +317,7 @@ def _compute_hilly_terrain(self):
return normalized_data

def _init_height_points(self):
""" Compute points at which height measurments are sampled (in base frame)
""" Compute grid points at which height measurements are sampled (in base frame)
Saves the points in ndarray of shape (self.num_height_points, 3)
"""
measured_points_x = [-0.4, -0.3, -0.2, -0.1, 0., 0.1, 0.2, 0.3, 0.4, 0.5]
Expand All @@ -296,10 +333,14 @@ def _init_height_points(self):
self.height_points = points

def _measure_height(self):
"""
Update heights at grid points around
model.
"""
rot_direction = quat2euler(self.sim.data.qpos[3:7])[2]
rot_mat = euler2mat([0, 0, rot_direction])
# rotate points around z-direction to match model
points = self.height_points @ rot_mat
points = np.einsum("ij,kj->ik", self.height_points, rot_mat)
# increase point spacing
points = (points * self.view_distance)
# translate points to model frame
Expand All @@ -308,20 +349,17 @@ def _measure_height(self):
px = self.points[:, 0]
py = self.points[:, 1]
# get map_index coordinates of points
px = np.asarray(self.cart2map(px), dtype=np.int16)
py = np.asarray(self.cart2map(py), dtype=np.int16)
px, py = self.cart2map(px, py)
# avoid out-of-bounds by clipping indices to map boundaries
# -2 because we go one further and shape is 1 longer than map index
px = np.clip(px, 0, self.hfield.data.shape[0] - 2)
py = np.clip(py, 0, self.hfield.data.shape[1] - 2)
# switch x and y here because of array indexing
heights = self.hfield.data[py, px]

heights = self.hfield.data[px, py]
if not hasattr(self, 'length'):
self.length = 0
self.length += 1
# align with egocentric view of model
self.heightmap_window[:] = np.rot90((heights).reshape(10, 10))
self.heightmap_window[:] = np.flipud(np.rot90(heights.reshape(10, 10), axes=(1,0)))

@property
def size(self):
Expand Down Expand Up @@ -376,7 +414,6 @@ def __init__(self, model_path, obsd_model_path=None, seed=None, **kwargs):
# first construct the inheritance chain, which is just __init__ calls all the way down, with env_base
# creating the sim / sim_obsd instances. Next we run through "setup" which relies on sim / sim_obsd
# created in __init__ to complete the setup.
# base().__init__(model_path=model_path, obsd_model_path=obsd_model_path, seed=seed)
BaseV0.__init__(self, model_path=model_path, obsd_model_path=obsd_model_path, seed=seed, env_credits=self.MYO_CREDIT)
self._setup(**kwargs)

Expand Down Expand Up @@ -519,22 +556,32 @@ def get_metrics(self, paths):

def step(self, *args, **kwargs):
self.opponent.update_opponent_state()
obs, reward, done, info = super().step(*args, **kwargs)
return obs, reward, done, info
results = super().step(*args, **kwargs)
return results

def reset(self):
def reset(self, **kwargs):
# randomized terrain types
self._maybe_sample_terrain()
# randomized tasks
self._sample_task()
# randomized initial state
qpos, qvel = self._get_reset_state()
self._maybe_flatten_agent_patch(qpos)
self.robot.sync_sims(self.sim, self.sim_obsd)
obs = super(WalkEnvV0, self).reset(reset_qpos=qpos, reset_qvel=qvel)
obs = super(WalkEnvV0, self).reset(reset_qpos=qpos, reset_qvel=qvel, **kwargs)
self.opponent.reset_opponent(player_task=self.current_task.name, rng=self.np_random)
self.sim.forward()
return obs

def _maybe_flatten_agent_patch(self, qpos):
"""
Ensure that initial state patch is flat.
"""
if self.heightfield is not None:
self.heightfield.flatten_agent_patch(qpos)
if hasattr(self.sim, 'renderer') and not self.sim.renderer._window is None:
self.sim.renderer._window.update_hfield(0)

def _sample_task(self):
if self.task_choice == 'random':
self.current_task = self.np_random.choice(Task)
Expand All @@ -560,6 +607,8 @@ def _randomize_position_orientation(self, qpos, qvel):
euler_angle = quat2euler(qpos[3:7])
euler_angle[-1] = orientation
qpos[3:7] = euler2quat(euler_angle)
# rotate original velocity with unit direction vector
qvel[:2] = np.array([np.cos(orientation), np.sin(orientation)]) * np.linalg.norm(qvel[:2])
return qpos, qvel

def _get_reset_state(self):
Expand All @@ -571,6 +620,17 @@ def _get_reset_state(self):
else:
return self.sim.model.key_qpos[0], self.sim.model.key_qvel[0]

def _maybe_adjust_height(self, qpos, qvel):
"""
Currently not used.
"""
if self.heightfield is not None:
map_i, map_j = self.heightfield.cart2map(qpos[:2])
hfield_val = self.heightfield.hfield.data[map_i, map_j]
if hfield_val > 0.05:
qpos[2] += hfield_val
return qpos, qvel

def viewer_setup(self, *args, **kwargs):
"""
Setup the default camera
Expand Down Expand Up @@ -749,4 +809,4 @@ def _get_fallen_condition(self):
if head[2] - mean[2] < 0.2:
return 1
else:
return 0
return 0
11 changes: 7 additions & 4 deletions robohive/logger/examine_reference.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,21 @@
"""

@click.command(help=DESC)
@click.option('-e', '--env_name', type=str, help='environment to load', default="AdroitBananaPass-v0")
@click.option('-e', '--env_name', type=str, help='environment to load', default="MyoHandBananaPass-v0")
@click.option('-h', '--horizon', type=int, help='playback horizon', default=-1)
@click.option('-n', '--num_playback', type=int, help='Number of time to loop playback', default=1)
@click.option('-r', '--render', type=click.Choice(['onscreen', 'none']), help='visualize onscreen?', default='onscreen')
def examine_reference(env_name, horizon, num_playback, render):
env = gym.make(env_name)

# fixed or random reference
if horizon==1:
horizon = env.spec.max_episode_steps

# infer reference horizon
env = env.unwrapped
if horizon==-1:
horizon = env.env.ref.horizon
if horizon==1: # fixed or random reference
horizon = env.env.horizon
horizon = env.ref.horizon

# Start playback loops
print(f"Rending reference motion (total frames: {horizon})")
Expand Down
4 changes: 1 addition & 3 deletions robohive/tests/test_myo.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,4 @@ def no_test_myomimic(self):


if __name__ == '__main__':
unittest.main()


unittest.main()

0 comments on commit 9f32f15

Please sign in to comment.