[RLlib] APPO+new-stack (Atari benchmark) - Preparatory PR 03 - PyTorc…
sven1977 committed May 5, 2023
1 parent 9ec5050 commit adfdbbd
Showing 56 changed files with 1,371 additions and 655 deletions.
5 changes: 2 additions & 3 deletions rllib/BUILD
@@ -174,7 +174,7 @@ py_test(
py_test(
name = "learning_tests_cartpole_appo_w_rl_modules_and_learner",
main = "tests/run_regression_tests.py",
tags = ["team:rllib", "exclusive", "learning_tests", "learning_tests_cartpole", "learning_tests_discrete", "tf_only", "no_tf_static_graph"],
tags = ["team:rllib", "exclusive", "learning_tests", "learning_tests_cartpole", "learning_tests_discrete", "no_tf_static_graph"],
size = "medium", # bazel may complain about it being too long sometimes - medium is on purpose as some frameworks take longer
srcs = ["tests/run_regression_tests.py"],
data = ["tuned_examples/appo/cartpole-appo-w-rl-modules-and-learner.yaml"],
@@ -599,7 +599,6 @@ py_test(
args = ["--dir=tuned_examples/ppo"]
)

# TODO (Sven): Enable tf2 for this test.
py_test(
name = "learning_tests_pendulum_ppo_with_rl_module",
main = "tests/run_regression_tests.py",
@@ -930,7 +929,7 @@ py_test(
name = "test_appo_learner",
tags = ["team:rllib", "algorithms_dir"],
size = "medium",
srcs = ["algorithms/appo/tests/tf/test_appo_learner.py"]
srcs = ["algorithms/appo/tests/test_appo_learner.py"]
)

# ARS
8 changes: 4 additions & 4 deletions rllib/algorithms/algorithm.py
@@ -723,10 +723,10 @@ def setup(self, config: AlgorithmConfig) -> None:
self.learner_group = None
if self.config._enable_learner_api:
# TODO (Kourosh): This is an interim solution where policies and modules
# co-exist. In this world we have both policy_map and MARLModule that need
# to be consistent with one another. To make a consistent parity between
# the two we need to loop through the policy modules and create a simple
# MARLModule from the RLModule within each policy.
# co-exist. In this world we have both policy_map and MARLModule that need
# to be consistent with one another. To make a consistent parity between
# the two we need to loop through the policy modules and create a simple
# MARLModule from the RLModule within each policy.
local_worker = self.workers.local_worker()
module_spec = local_worker.marl_module_spec
learner_group_config = self.config.get_learner_group_config(module_spec)
14 changes: 13 additions & 1 deletion rllib/algorithms/algorithm_config.py
@@ -7,6 +7,7 @@
Callable,
Container,
Dict,
List,
Mapping,
Optional,
Tuple,
@@ -315,6 +316,7 @@ def __init__(self, algo_class=None):
# `self.training()`
self.gamma = 0.99
self.lr = 0.001
self.lr_schedule = None
self.grad_clip = None
self.grad_clip_by = "global_norm"
self.train_batch_size = 32
@@ -1588,6 +1590,7 @@ def training(
*,
gamma: Optional[float] = NotProvided,
lr: Optional[float] = NotProvided,
lr_schedule: Optional[List[List[Union[int, float]]]] = NotProvided,
grad_clip: Optional[float] = NotProvided,
grad_clip_by: Optional[str] = NotProvided,
train_batch_size: Optional[int] = NotProvided,
@@ -1602,6 +1605,10 @@
Args:
gamma: Float specifying the discount factor of the Markov Decision process.
lr: The default learning rate.
lr_schedule: Learning rate schedule. In the format of
[[timestep, lr-value], [timestep, lr-value], ...]
Intermediate timesteps will be assigned interpolated learning rate
values. A schedule should normally start from timestep 0.
grad_clip: The value to use for gradient clipping. Depending on the
`grad_clip_by` setting, gradients will either be clipped by value,
norm, or global_norm (see docstring on `grad_clip_by` below for more
@@ -1653,6 +1660,8 @@ def training(
self.gamma = gamma
if lr is not NotProvided:
self.lr = lr
if lr_schedule is not NotProvided:
self.lr_schedule = lr_schedule
if grad_clip is not NotProvided:
self.grad_clip = grad_clip
if grad_clip_by is not NotProvided:
@@ -3129,6 +3138,9 @@ def get_learner_group_config(self, module_spec: ModuleSpec) -> LearnerGroupConfi
.learner(
learner_class=self.learner_class,
# TODO (Kourosh): optimizer config can now be more complicated.
# TODO (Sven): Shouldn't optimizer config be part of learner HPs?
# E.g. if we have a lr schedule, this will have to be managed by
# the learner, NOT the optimizer directly.
optimizer_config={
"lr": self.lr,
"grad_clip": self.grad_clip,
@@ -3159,7 +3171,7 @@ def get_learner_hyperparameters(self) -> LearnerHyperparameters:
Note that LearnerHyperparameters should always be derived directly from an
AlgorithmConfig object's own settings and considered frozen/read-only.
"""
return LearnerHyperparameters()
return LearnerHyperparameters(lr_schedule=self.lr_schedule)

def __setattr__(self, key, value):
"""Gatekeeper in case we are in frozen state and need to error."""
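For illustration, a minimal usage sketch of the new lr_schedule setting (assumed usage, not part of this commit; the environment name and concrete values are made up):

from ray.rllib.algorithms.appo import APPOConfig

# Anneal the learning rate piecewise-linearly: 5e-4 at timestep 0,
# 5e-5 at timestep 1,000,000. Per the docstring above, intermediate
# timesteps are assigned interpolated values.
config = (
    APPOConfig()
    .environment("CartPole-v1")
    .training(
        lr=5e-4,
        lr_schedule=[[0, 5e-4], [1_000_000, 5e-5]],
    )
)
# On the new stack, the schedule reaches the Learner via
# LearnerHyperparameters(lr_schedule=...), as returned by
# get_learner_hyperparameters() above.
algo = config.build()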
1 change: 0 additions & 1 deletion rllib/algorithms/alpha_star/alpha_star.py
@@ -138,7 +138,6 @@ def __init__(self, algo_class=None):

# Override some of APPOConfig's default values with AlphaStar-specific
# values.
self.vtrace_drop_last_ts = False
self.min_time_s_per_iteration = 2
self.policies = None
self.simple_optimizer = True
91 changes: 43 additions & 48 deletions rllib/algorithms/appo/appo.py
@@ -28,8 +28,6 @@
NUM_AGENT_STEPS_SAMPLED,
NUM_ENV_STEPS_SAMPLED,
NUM_TARGET_UPDATES,
NUM_ENV_STEPS_TRAINED,
NUM_AGENT_STEPS_TRAINED,
)
from ray.rllib.utils.metrics import ALL_MODULES, LEARNER_STATS_KEY
from ray.rllib.utils.typing import (
@@ -213,7 +211,13 @@ def training(

@override(ImpalaConfig)
def get_default_learner_class(self):
if self.framework_str == "tf2":
if self.framework_str == "torch":
from ray.rllib.algorithms.appo.torch.appo_torch_learner import (
APPOTorchLearner,
)

return APPOTorchLearner
elif self.framework_str == "tf2":
from ray.rllib.algorithms.appo.tf.appo_tf_learner import APPOTfLearner

return APPOTfLearner
@@ -222,16 +226,21 @@ def get_default_learner_class(self):

@override(ImpalaConfig)
def get_default_rl_module_spec(self) -> SingleAgentRLModuleSpec:
if self.framework_str == "tf2":
from ray.rllib.algorithms.appo.appo_catalog import APPOCatalog
from ray.rllib.algorithms.appo.tf.appo_tf_rl_module import APPOTfRLModule

return SingleAgentRLModuleSpec(
module_class=APPOTfRLModule, catalog_class=APPOCatalog
if self.framework_str == "torch":
from ray.rllib.algorithms.appo.torch.appo_torch_rl_module import (
APPOTorchRLModule as RLModule,
)
elif self.framework_str == "tf2":
from ray.rllib.algorithms.appo.tf.appo_tf_rl_module import (
APPOTfRLModule as RLModule,
)
else:
raise ValueError(f"The framework {self.framework_str} is not supported.")

from ray.rllib.algorithms.appo.appo_catalog import APPOCatalog

return SingleAgentRLModuleSpec(module_class=RLModule, catalog_class=APPOCatalog)

@override(ImpalaConfig)
def get_learner_hyperparameters(self) -> AppoHyperparameters:
base_hps = super().get_learner_hyperparameters()
@@ -241,6 +250,9 @@ def get_learner_hyperparameters(self) -> AppoHyperparameters:
kl_coeff=self.kl_coeff,
clip_param=self.clip_param,
tau=self.tau,
target_update_frequency_ts=(
self.train_batch_size * self.num_sgd_iter * self.target_update_frequency
),
**dataclasses.asdict(base_hps),
)
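To illustrate the new framework switch above, a small sketch (assumed usage, not part of this commit) of how the torch defaults are resolved; the class names are taken from the diff:

from ray.rllib.algorithms.appo import APPOConfig

# Selecting the torch framework now resolves the new-stack defaults added in
# this commit: APPOTorchLearner plus APPOTorchRLModule / APPOCatalog.
config = APPOConfig().framework("torch")

learner_cls = config.get_default_learner_class()
module_spec = config.get_default_rl_module_spec()
print(learner_cls)               # -> APPOTorchLearner
print(module_spec.module_class)  # -> APPOTorchRLModule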

@@ -289,43 +301,14 @@ def after_train_step(self, train_results: ResultDict) -> None:
training step.
"""

last_update = self._counters[LAST_TARGET_UPDATE_TS]

if self.config._enable_learner_api and train_results:
# using steps trained here instead of sampled ... I'm not sure why the
# other implementation uses sampled.
# To be quite frank, I'm not sure if I understand how their target update
# freq would work. The difference in steps sampled/trained is pretty
# much always going to be larger than self.config.num_sgd_iter *
# self.config.minibatch_buffer_size unless the number of steps collected
# is really small. The thing is that the default rollout fragment length
# is 50, so the minibatch buffer size * num_sgd_iter is going to
# have to be 50 to even meet the threshold of having delayed target
# updates.
# We should instead have the target / kl threshold update be based off
# of the train_batch_size * some target update frequency * num_sgd_iter.
cur_ts = self._counters[
NUM_ENV_STEPS_TRAINED
if self.config.count_steps_by == "env_steps"
else NUM_AGENT_STEPS_TRAINED
]
target_update_steps_freq = (
self.config.train_batch_size
* self.config.num_sgd_iter
* self.config.target_update_frequency
)
if (cur_ts - last_update) >= target_update_steps_freq:
kls_to_update = {}
for module_id, module_results in train_results.items():
if module_id != ALL_MODULES:
kls_to_update[module_id] = module_results[LEARNER_STATS_KEY][
LEARNER_RESULTS_KL_KEY
]
self._counters[NUM_TARGET_UPDATES] += 1
self._counters[LAST_TARGET_UPDATE_TS] = cur_ts
self.learner_group.additional_update(sampled_kls=kls_to_update)

if self.config._enable_learner_api:
if NUM_TARGET_UPDATES in train_results:
self._counters[NUM_TARGET_UPDATES] += train_results[NUM_TARGET_UPDATES]
self._counters[LAST_TARGET_UPDATE_TS] = train_results[
LAST_TARGET_UPDATE_TS
]
else:
last_update = self._counters[LAST_TARGET_UPDATE_TS]
cur_ts = self._counters[
NUM_AGENT_STEPS_SAMPLED
if self.config.count_steps_by == "agent_steps"
@@ -367,6 +350,17 @@ def update(pi, pi_id):
# Worker.
self.workers.local_worker().foreach_policy_to_train(update)

@override(Impala)
def _get_additional_update_kwargs(self, train_results) -> dict:
return dict(
last_update=self._counters[LAST_TARGET_UPDATE_TS],
mean_kl_loss_per_module={
mid: r[LEARNER_STATS_KEY][LEARNER_RESULTS_KL_KEY]
for mid, r in train_results.items()
if mid != ALL_MODULES
},
)

@override(Impala)
def training_step(self) -> ResultDict:
train_results = super().training_step()
@@ -388,10 +382,11 @@ def get_default_policy_class(
) -> Optional[Type[Policy]]:
if config["framework"] == "torch":
if config._enable_rl_module_api:
raise ValueError(
"APPO with the torch backend is not yet supported by "
" the RLModule and Learner API."
from ray.rllib.algorithms.appo.torch.appo_torch_policy_rlm import (
APPOTorchPolicyWithRLModule,
)

return APPOTorchPolicyWithRLModule
else:
from ray.rllib.algorithms.appo.appo_torch_policy import APPOTorchPolicy

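The target-update bookkeeping above boils down to a simple cadence, summarized here in a standalone sketch (hypothetical helper, not RLlib code): on the new stack, the Learner updates the target networks once at least target_update_frequency_ts = train_batch_size * num_sgd_iter * target_update_frequency training timesteps have passed since the last update, and it reports NUM_TARGET_UPDATES / LAST_TARGET_UPDATE_TS back in the results so after_train_step() only aggregates counters:

def should_update_target(
    timestep: int,
    last_update: int,
    train_batch_size: int,
    num_sgd_iter: int,
    target_update_frequency: int,
) -> bool:
    # Mirrors AppoHyperparameters.target_update_frequency_ts (built in
    # get_learner_hyperparameters() above) and the comparison performed in
    # AppoLearner.additional_update_per_module() (see the next file).
    target_update_frequency_ts = (
        train_batch_size * num_sgd_iter * target_update_frequency
    )
    return (timestep - last_update) >= target_update_frequency_ts

# E.g. with train_batch_size=500, num_sgd_iter=1, target_update_frequency=1,
# the target networks are updated every >= 500 trained timesteps:
assert should_update_target(500, 0, 500, 1, 1)
assert not should_update_target(499, 0, 500, 1, 1)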
70 changes: 49 additions & 21 deletions rllib/algorithms/appo/appo_learner.py
@@ -3,15 +3,13 @@
from dataclasses import dataclass
from typing import Any, Dict, Mapping

import numpy as np

from ray.rllib.algorithms.impala.impala_learner import (
ImpalaLearner,
ImpalaHyperparameters,
)
from ray.rllib.core.rl_module.marl_module import ModuleID
from ray.rllib.utils.annotations import override
from ray.rllib.utils.framework import get_variable
from ray.rllib.utils.metrics import LAST_TARGET_UPDATE_TS, NUM_TARGET_UPDATES


LEARNER_RESULTS_KL_KEY = "mean_kl_loss"
@@ -35,6 +33,7 @@ class to configure your algorithm.
kl_target: float = None
clip_param: float = None
tau: float = None
target_update_frequency_ts: int = None


class AppoLearner(ImpalaLearner):
@@ -46,34 +45,63 @@ class AppoLearner(ImpalaLearner):

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# Create framework-specific variables (simple python vars for torch).
self.kl_coeffs = defaultdict(
lambda: get_variable(
self._hps.kl_coeff,
framework=self.framework,
trainable=False,
dtype=np.float32,
)

# We need to make sure kl_coeff are available as framework tensors that are
# registered as part of the graph so that upon update the graph can be updated
# (e.g. in TF with eager tracing).
self.curr_kl_coeffs_per_module = defaultdict(
lambda: self._get_tensor_variable(self.hps.kl_coeff)
)

@override(ImpalaLearner)
def remove_module(self, module_id: str):
super().remove_module(module_id)
self.kl_coeffs.pop(module_id)
self.curr_kl_coeffs_per_module.pop(module_id)

@override(ImpalaLearner)
def additional_update_per_module(
self, module_id: ModuleID, sampled_kls: Dict[ModuleID, float], **kwargs
self,
module_id: ModuleID,
*,
last_update: int,
mean_kl_loss_per_module: dict,
timestep: int,
**kwargs,
) -> Mapping[str, Any]:
"""Updates the target networks and KL loss coefficients (per module).
Args:
module_id:
"""
self._update_module_target_networks(module_id)
if self._hps.use_kl_loss:
self._update_module_kl_coeff(module_id, sampled_kls)
return {}
# TODO (avnish) Using steps trained here instead of sampled ... I'm not sure
# why the other implementation uses sampled.
# The difference in steps sampled/trained is pretty
# much always going to be larger than self.config.num_sgd_iter *
# self.config.minibatch_buffer_size unless the number of steps collected
# is really small. The thing is that the default rollout fragment length
# is 50, so the minibatch buffer size * num_sgd_iter is going to
# have to be 50 to even meet the threshold of having delayed target
# updates.
# We should instead have the target / kl threshold update be based off
# of the train_batch_size * some target update frequency * num_sgd_iter.
results = super().additional_update_per_module(module_id, timestep=timestep)

if (timestep - last_update) >= self.hps.target_update_frequency_ts:
self._update_module_target_networks(module_id)
results[NUM_TARGET_UPDATES] = 1
results[LAST_TARGET_UPDATE_TS] = timestep
else:
results[NUM_TARGET_UPDATES] = 0
results[LAST_TARGET_UPDATE_TS] = last_update

if self.hps.use_kl_loss and module_id in mean_kl_loss_per_module:
results.update(
self._update_module_kl_coeff(
module_id, mean_kl_loss_per_module[module_id]
)
)

return results

@abc.abstractmethod
def _update_module_target_networks(self, module_id: ModuleID) -> None:
@@ -88,7 +116,7 @@ def _update_module_target_networks(self, module_id: ModuleID) -> None:
@abc.abstractmethod
def _update_module_kl_coeff(
self, module_id: ModuleID, sampled_kls: Dict[ModuleID, float]
) -> None:
) -> Mapping[str, Any]:
"""Dynamically update the KL loss coefficients of each module with.
The update is completed using the mean KL divergence between the action
@@ -97,7 +125,7 @@ def _update_module_kl_coeff(
Args:
module_id: The module whose KL loss coefficient to update.
sampled_kls: The KL divergence between the action distributions of
the current policy and old policy of each module.
sampled_kls: Mapping from Module ID to this module's KL divergence between
the action distributions of the current (most recently updated) module
and the old module version.
"""
File renamed without changes.
