Gymnasium Environment

This notebook showcases the integration of navground with Gymnasium environments.

To start, let us load one of navground's benchmark scenarios

[1]:
import warnings
warnings.filterwarnings("ignore")
[2]:
%%writefile scenario.yaml
type: Cross
agent_margin: 0.1
side: 4
target_margin: 0.1
tolerance: 0.5
groups:
  -
    type: thymio
    number: 20
    radius: 0.1
    control_period: 0.1
    speed_tolerance: 0.02
    color: gray
    kinematics:
      type: 2WDiff
      wheel_axis: 0.094
      max_speed: 0.12
    behavior:
      type: HL
      optimal_speed: 0.12
      horizon: 5.0
      tau: 0.25
      eta: 0.5
      safety_margin: 0.1
    state_estimation:
      type: Bounded
      range: 5.0
Overwriting scenario.yaml
[3]:
from navground import sim

with open('scenario.yaml') as f:
    scenario = sim.load_scenario(f.read())

in which 20 agents move between two pairs of waypoints, crossing in the middle.

[4]:
from navground.sim.ui.video import display_video

world = scenario.make_world()
display_video(world, time_step=0.1, duration=100, factor=10, display_width=400)
[4]:

We want to set up a Gymnasium environment in which one of these agents navigates using an ML policy and a sensor. We select a high-level sensor that detects the 5 nearest neighbors and returns them as arrays of positions, velocities, and radii.

[5]:
%%writefile sensor.yaml
type: Discs
number: 5
range: 5.0
max_speed: 0.12
max_radius: 0.1
Overwriting sensor.yaml
[6]:
with open('sensor.yaml') as f:
    sensor = sim.load_state_estimation(f.read())
[7]:
from navground.learning import DefaultObservationConfig, ControlActionConfig

observation_config = DefaultObservationConfig(include_target_direction=True, include_target_distance=True)
action_config = ControlActionConfig()
observation_config, action_config
[7]:
(DefaultObservationConfig(), ControlActionConfig())
[8]:
import gymnasium as gym
from navground.learning.rewards import SocialReward

env = gym.make('navground.learning.env:navground',
               scenario=scenario,
               sensor=sensor,
               action=action_config,
               observation=observation_config,
               reward=SocialReward(),
               time_step=0.1,
               max_episode_steps=600)

When we import the environment implicitly, as we have just done by importing the reward, or explicitly, e.g., with import navground.learning.env, we can shorten the reference to just its name.

[9]:
env = gym.make('navground',
               scenario=scenario,
               sensor=sensor,
               action=action_config,
               observation=observation_config,
               reward=SocialReward(),
               time_step=0.1,
               max_episode_steps=600)

Let’s check its observation and action spaces.

The action space

[10]:
env.action_space
[10]:
Box(-1.0, 1.0, (2,), float32)

is composed of linear and angular speeds (normalized by their maximal values)

[11]:
env.unwrapped.action_config.max_speed, env.unwrapped.action_config.max_angular_speed
[11]:
(0.11999999731779099, 2.5531914234161377)

which in this case have been inherited from the scenario specification but could have been set explicitly in ControlActionConfig.
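For example, a minimal sketch of an explicit configuration, assuming that ControlActionConfig (imported above) accepts max_speed and max_angular_speed as keyword arguments matching the attributes just printed:

# Hypothetical explicit configuration: fix the limits instead of inheriting
# them from the scenario (values chosen to match the Thymio kinematics above).
explicit_action_config = ControlActionConfig(max_speed=0.12, max_angular_speed=2.55)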

The observation space

[12]:
env.observation_space
[12]:
Dict('position': Box(-5.0, 5.0, (5, 2), float32), 'radius': Box(0.0, 0.1, (5,), float32), 'valid': Box(0, 1, (5,), uint8), 'velocity': Box(-0.12, 0.12, (5, 2), float32), 'ego_target_direction': Box(-1.0, 1.0, (2,), float32), 'ego_target_distance': Box(0.0, inf, (1,), float32))

is composed of the agent's own state (ego_...) and the readings from the sensor, in this case the position, radius, and velocity of the 5 nearest neighbors, consistently with the sensor's description:

[13]:
sensor.description
[13]:
{'position': BufferDescription(shape=(5, 2), type=dtype('float32'), low=-5.0, high=5.0, categorical=False),
 'radius': BufferDescription(shape=(5,), type=dtype('float32'), low=0.0, high=0.10000000149011612, categorical=False),
 'valid': BufferDescription(shape=(5,), type=dtype('uint8'), low=0.0, high=1.0, categorical=False),
 'velocity': BufferDescription(shape=(5, 2), type=dtype('float32'), low=-0.11999999731779099, high=0.11999999731779099, categorical=False)}

The info map returned by env.reset(...) and env.step(...) contains the action computed by the original navground behavior, in this case HL:

[14]:
observation, info = env.reset()
print(f"Observation: {observation}")
print(f"Info {info}")
Observation: {'ego_target_distance': array([1.3484901], dtype=float32), 'ego_target_direction': array([ 1.0000000e+00, -1.5725663e-08], dtype=float32), 'position': array([[-0.00738173, -0.30817246],
       [-0.38925827,  0.01894906],
       [-0.46368217, -0.4778133 ],
       [ 0.15306982, -0.6674728 ],
       [ 0.5088892 , -0.62434775]], dtype=float32), 'radius': array([0.1, 0.1, 0.1, 0.1, 0.1], dtype=float32), 'valid': array([1, 1, 1, 1, 1], dtype=uint8), 'velocity': array([[0., 0.],
       [0., 0.],
       [0., 0.],
       [0., 0.],
       [0., 0.]], dtype=float32)}
Info {'navground_action': array([0.32967997, 0.        ], dtype=float32)}

which we can use to define a policy that simply asks the agent to actuate the action it has already computed.
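A minimal sketch of such a policy, written as a plain function (not tied to any specific policy API):

# Minimal sketch: ignore the observation and return the action that the
# navground behavior has already computed, as passed through the info map.
def navground_expert_policy(observation, info):
    return info['navground_action']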

Let us collect the reward from the navground policy in this way. To understand the scale: the reward assigned at each step is maximal (=0) when the agent moves straight towards the goal at optimal speed. When the entire safety margin is violated, the agent gets a penalty of at most -1, the same value it gets if it stays in place, while moving away from the target at optimal speed yields -2. Therefore, we can expect an average reward between -1 and 0, and close to 0 for a well-performing navigation behavior.

[15]:
import numpy as np

rewards = []
for n in range(1000):
    action = info['navground_action']
    observation, reward, terminated, truncated, info = env.step(action)
    rewards.append(reward)
    if terminated or truncated:
        print(f'reset after {n} steps')
        observation, info = env.reset()

print(f'mean reward {np.mean(rewards):.3f}')
reset after 599 steps
mean reward -0.289

We compare it (just for fun) with the reward from a random policy, which we expect to be around -1, the average between moving towards the target (=0) and away from it (-2).

[16]:
observation, info = env.reset()
rewards = []
for n in range(1000):
    action = env.action_space.sample()
    observation, reward, terminated, truncated, info = env.step(action)
    rewards.append(reward)
    if terminated or truncated:
        print(f'reset after {n} steps')
        observation, info = env.reset()

print(f'mean reward {np.mean(rewards):.3f}')
reset after 599 steps
mean reward -0.912

In general, we want to use a policy to generate the action, i.e., a function that maps observations to actions. In this case, let us try again with a random policy.

[17]:
from navground.learning.policies.random_policy import RandomPolicy

policy = RandomPolicy(observation_space=env.observation_space, action_space=env.action_space)

We export this policy to ONNX for later use in the Behavior notebook.

[18]:
import warnings
from navground.learning import onnx

with warnings.catch_warnings():
    warnings.filterwarnings('ignore')
    onnx.export(policy, "policy.onnx")

Policies that follow the Stable-Baselines3 API, like this one, output a tuple (action, state). Therefore the loop becomes

[19]:
observation, info = env.reset()
rewards = []
for n in range(1000):
    action, state = policy.predict(observation)
    observation, reward, terminated, truncated, info = env.step(action)
    rewards.append(reward)
    if terminated or truncated:
        print(f'reset after {n} steps')
        observation, info = env.reset()

print(f'mean reward {np.mean(rewards):.3f}')
reset after 599 steps
mean reward -0.984