Different speeds#

[1]:
import warnings

warnings.filterwarnings('ignore')
%config InlineBackend.figure_formats = ['svg']

Scenario#

Unlike in the previous notebook, in this notebook agents are assigned, at initialization, individual target speeds sampled uniformly from [0.03, 0.12] m/s.

In order to set consistent target angular and linear speeds, we add an initializer to the scenario that sets the behavior’s optimal_angular_speed to the maximal angular speed achievable when the linear wheel speed is limited to optimal_speed.

[2]:
from navground import sim

scenario = sim.load_scenario("""
type: CrossTorus
agent_margin: 0.1
side: 2
groups:
  -
    type: thymio
    number: 10
    radius: 0.1
    control_period: 0.1
    speed_tolerance: 0.02
    color: [red, green, blue, yellow]
    kinematics:
      type: 2WDiff
      wheel_axis: 0.094
      max_speed: 0.12
    behavior:
      type: HL
      horizon: 5.0
      tau: 0.25
      eta: 0.5
      safety_margin: 0.1
      barrier_angle: 1.0
      optimal_speed:
        sampler: uniform
        from: 0.03
        to: 0.12
    state_estimation:
      type: Bounded
      range: 1.0
""")

def set_optimal_angular_speed(world: sim.World, seed: int | None = None) -> None:
    for agent in world.agents:
        agent.behavior.optimal_angular_speed = 2 * agent.behavior.optimal_speed / agent.kinematics.wheel_axis

scenario.set_init('angular_speed', set_optimal_angular_speed)
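
As a quick, illustrative sanity check (assuming, as in the video below, that make_world applies the registered initializers), we can sample a world and inspect each agent’s target linear and angular speeds; for instance, an agent at the maximal speed of 0.12 m/s gets an optimal angular speed of 2 · 0.12 / 0.094 ≈ 2.55 rad/s.

world = scenario.make_world(seed=0)
# print the sampled target speed and the derived target angular speed of each agent
for agent in world.agents:
    print(f"{agent.behavior.optimal_speed:.3f} m/s -> "
          f"{agent.behavior.optimal_angular_speed:.2f} rad/s")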

The video below shows HL agents navigating in this scenario. Agents are colored by their target speed (red=lower), which is sampled at the beginning of the run and then kept constant.

[3]:
from navground.sim.ui.video import display_video_from_run, display_video
from navground.sim.ui import svg_color

def decorate(agent):
    f = agent.behavior.optimal_speed / 0.12
    return {'fill': svg_color(1 - f, f, 0.1)}

world = scenario.make_world(seed=1)
display_video(world, time_step=0.1, duration=60.0, factor=5, display_width=400, decorate=decorate)
[3]:

Environment#

In order to make the agents respect their target speed, we add a term to the reward that penalizes excessive speeds, while lower speeds are already penalized by a decrease in efficacy.

[4]:
import dataclasses as dc
from navground.learning.rewards import SocialReward
import numpy as np


@dc.dataclass
class RewardWithSpeeding(SocialReward):
    gamma: float = 1.0
    """The weight of speeding"""

    def __call__(self, agent: sim.Agent, world: sim.World,
                 time_step: float) -> float:
        reward = super().__call__(agent, world, time_step)
        if agent.behavior:
            target_speed = agent.behavior.get_target_speed()
            target_angular_speed = agent.behavior.get_target_angular_speed()
            speed = np.linalg.norm(agent.velocity)
            angular_speed = abs(agent.angular_speed)
            if speed > target_speed:
                reward += self.gamma * (target_speed - speed) / target_speed
            if angular_speed > target_angular_speed:
                reward += self.gamma * (target_angular_speed - angular_speed) / target_angular_speed
        return reward
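
To illustrate the new term in isolation (not the full reward, which also includes the base SocialReward terms): with gamma = 2, an agent whose target speed is 0.06 m/s but which moves at 0.09 m/s accrues an extra penalty of 2 · (0.06 − 0.09) / 0.06 = −1 per step. A minimal computation of just this speeding term, with made-up values:

# illustrative computation of the speeding term alone (values are made up)
gamma, target_speed, speed = 2.0, 0.06, 0.09
penalty = gamma * (target_speed - speed) / target_speed if speed > target_speed else 0.0
print(f"{penalty:.2f}")  # -1.00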

We also include the target speed in the observation.

Apart from these changes, we use the same configuration as in the previous notebook.

[5]:
from navground.learning import ControlActionConfig, DefaultObservationConfig
from navground.learning.parallel_env import shared_parallel_env, make_vec_from_penv
from navground.learning import io

sensor = sim.load_state_estimation("""
type: Discs
number: 5
range: 1.0
max_speed: 0.12
max_radius: 0
""")

reward = RewardWithSpeeding(safety_margin=0.1, beta=2.0, gamma=2.0)
action_config = ControlActionConfig(max_acceleration=1.0, max_angular_acceleration=10.0,
                                    use_acceleration_action=True)
observation_config = DefaultObservationConfig(include_target_direction=True, include_target_speed=True,
                                              include_velocity=True, include_angular_speed=True, flat=True)
penv = shared_parallel_env(scenario=scenario, sensor=sensor, action=action_config,
                           observation=observation_config, reward=reward,
                           time_step=0.1, max_duration=120)
venv = make_vec_from_penv(penv)
io.save_env(penv, "different_speed_env.yaml")
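
Before evaluating, we can check that the flat observation space now accounts for the target speed. A minimal sketch, assuming shared_parallel_env follows the PettingZoo parallel API:

# inspect one agent's spaces: the flat observation should cover the target
# direction, target speed, velocity, angular speed and the Discs sensor readings
index = penv.possible_agents[0]
print(penv.observation_space(index))
print(penv.action_space(index))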

We compute the performance of HL in this environment.

[6]:
from navground.learning.evaluation import evaluate_policy

mean = {}
stddev = {}

mean['HL'], stddev['HL'] = evaluate_policy(penv.get_policy(0), venv, n_eval_episodes=100)
print(f"HL reward: {mean['HL'] / 1200: .3f} ± {stddev['HL'] / 1200: .3f}")
HL reward: -0.097 ±  0.034

Training#

[7]:
from stable_baselines3 import SAC
from stable_baselines3.common.vec_env import VecMonitor
from stable_baselines3.common.logger import configure
from datetime import datetime as dt

stamp = dt.now().strftime("%Y%m%d_%H%M%S")
train_venv = VecMonitor(venv)
sac = SAC("MlpPolicy", train_venv, policy_kwargs={'net_arch': [128, 128]})
sac.set_logger(configure(f'logs/DifferentSpeed/SAC/{stamp}', ["tensorboard", "csv"]))
[8]:
sac.learn(total_timesteps=3_000_000, progress_bar=True, log_interval=25, reset_num_timesteps=False)
sac.save("DifferentSpeed/SAC/model")
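
If a model has already been trained and saved, it can be reloaded with the standard Stable-Baselines3 API instead of repeating the training (a sketch, assuming the path used above):

# reload the saved policy; passing the environment keeps it usable for evaluation
sac = SAC.load("DifferentSpeed/SAC/model", env=train_venv)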

The policy is learned in about 2.5M steps (i.e., 3.5 h of simulated time).

[9]:
import pandas as pd

df = pd.read_csv(f'{sac.logger.get_dir()}/progress.csv')
df.rolling(window=5).mean().plot(x='time/total_timesteps', y='rollout/ep_rew_mean', figsize=(8, 3));
[figure: rolling mean of rollout/ep_rew_mean vs. time/total_timesteps]

with the following performance:

[10]:
mean['SAC'], stddev['SAC'] = evaluate_policy(sac.policy, venv, n_eval_episodes=100)
print(f"SAC reward: {mean['SAC'] / 1200: .3f} ± {stddev['SAC'] / 1200: .3f}")
SAC reward: -0.211 ±  0.062
[11]:
pd.set_option("display.precision", 3)
rewards = pd.DataFrame({"mean": mean, "std dev": stddev})
rewards.index = rewards.index.set_names(['algorithm'])
rewards /= 1200
rewards.to_csv("different_speed_rewards.csv")
rewards
[11]:
algorithm    mean   std dev
HL         -0.097     0.034
SAC        -0.211     0.062

Let us visualize one episode, as usual,

[12]:
from navground.learning.evaluation import make_experiment_with_env

exp = make_experiment_with_env(penv, policy=sac.policy, record_reward=False)
exp.record_config.pose = True
exp.number_of_runs = 1
exp.run_index = 3
exp.steps = 1200
exp.run()
display_video_from_run(exp.runs[3], factor=5, relative_margin=0,
                       display_width=400, display_shape=False, decorate=decorate)
[12]:

and plot the distributions of episode rewards

[13]:
exp = make_experiment_with_env(penv, policy=sac.policy)
exp.number_of_runs = 30
exp.run()
rewards = np.asarray([run.get_record("reward") for run in exp.runs.values()])
sac_rewards = np.mean(rewards, axis=1)
exp = make_experiment_with_env(penv)
exp.number_of_runs = 30
exp.run()
rewards = np.asarray([run.get_record("reward") for run in exp.runs.values()])
hl_rewards = np.mean(rewards, axis=1)
[14]:
from matplotlib import pyplot as plt

plt.figure(figsize=(8, 3))
bins = np.linspace(-0.5, 0, 30)
plt.hist(sac_rewards.flatten(), bins=bins, density=True, label="SAC", alpha=0.5);
plt.hist(hl_rewards.flatten(), bins=bins, density=True, label="HL", alpha=0.5);
plt.xlabel('average reward')
plt.ylabel('probability');
plt.title('Individual target speed: episode rewards distribution')
plt.legend();
[figure: Individual target speed: episode rewards distribution (SAC vs. HL histograms)]

Varying target speed at runtime#

The policy seems to perform well, but is it really respecting the target speed?

We run a simple test where we change the target speed at run-time, setting it to the same value for all agents, using a sinusoidal modulation of the agents’ optimal speed

[30]:
from navground import core
from typing import Callable

class VaryOptimalSpeed(core.BehaviorModulation):

    def __init__(self, get_time: Callable[[], float]):
        super().__init__()
        self.get_time = get_time

    def pre(self, behavior: core.Behavior, time_step: float) -> None:
        behavior.optimal_speed = (np.sin(self.get_time() / 10) + 1) * 0.06
        behavior.optimal_angular_speed = 2 * behavior.optimal_speed / behavior.kinematics.wheel_axis

def init_speed_modulation(world: sim.World, seed: int | None = None) -> None:
    for agent in world.agents:
        agent.behavior.add_modulation(VaryOptimalSpeed(lambda: world.time))

and add it to all agents in the scenario

[31]:
from navground.learning.evaluation import InitPolicyBehavior
from navground.learning import GroupConfig

test_scenario = sim.load_scenario("""
type: CrossTorus
agent_margin: 0.1
side: 2
groups:
  -
    type: thymio
    number: 10
    radius: 0.1
    control_period: 0.1
    speed_tolerance: 0.02
    kinematics:
      type: 2WDiff
      wheel_axis: 0.094
      max_speed: 0.12
    behavior:
      type: 'HL'
""")

group = GroupConfig(policy=sac.policy, sensor=sensor,
                    action=action_config, observation=observation_config)
init_policy = InitPolicyBehavior(groups=[group])
[32]:
world = test_scenario.make_world()
init_policy(world)
init_speed_modulation(world)
display_video(world, time_step=0.1, duration=120, factor=5, relative_margin=0,
              display_width=400, decorate=decorate)
[32]:
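
For reference, the modulation defined above makes the common target speed oscillate between 0 and 0.12 m/s with a period of 20π ≈ 63 s; a quick, illustrative plot of this profile over the 120 s episode:

import numpy as np
from matplotlib import pyplot as plt

# plot the target speed imposed by VaryOptimalSpeed as a function of time
t = np.linspace(0, 120, 500)
plt.figure(figsize=(8, 2))
plt.plot(t, (np.sin(t / 10) + 1) * 0.06)
plt.xlabel('time [s]')
plt.ylabel('target speed [m/s]');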