[RLlib] Remove deprecated execution plan API code. (ray-project#29941)
sven1977 committed Nov 2, 2022
1 parent 2511529 commit d9b92eb
Showing 24 changed files with 61 additions and 1,885 deletions.
7 changes: 0 additions & 7 deletions rllib/BUILD
@@ -2285,13 +2285,6 @@ py_test(
args = ["TestEagerSupportOffPolicy"]
)

py_test(
name = "tests/test_execution",
tags = ["team:rllib", "tests_dir"],
size = "medium",
srcs = ["tests/test_execution.py"]
)

py_test(
name = "tests/test_filters",
tags = ["team:rllib", "tests_dir"],
5 changes: 1 addition & 4 deletions rllib/algorithms/a2c/a2c.py
@@ -172,10 +172,7 @@ def setup(self, config: PartialAlgorithmConfigDict):
# Create a microbatch variable for collecting gradients on microbatches.
# These gradients will be accumulated on-the-fly and applied at once (once train
# batch size has been collected) to the model.
if (
self.config["_disable_execution_plan_api"] is True
and self.config["microbatch_size"]
):
if self.config["microbatch_size"]:
self._microbatches_grads = None
self._microbatches_counts = self._num_microbatches = 0

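Note: the a2c.py hunk above drops the old execution-plan guard but keeps the microbatching path described in the comment. As a rough, hypothetical sketch of the gradient-accumulation idea (illustrative PyTorch-style code, not the actual A2C implementation):

```python
def accumulate_and_apply(model, optimizer, loss_fn, microbatches, train_batch_size):
    """Sketch of microbatch gradient accumulation, assuming a torch model,
    an optimizer, and an iterable of (obs, targets) microbatches."""
    seen = 0
    optimizer.zero_grad()
    for obs, targets in microbatches:
        loss = loss_fn(model(obs), targets)
        loss.backward()  # gradients accumulate in the .grad buffers
        seen += obs.shape[0]
        # Apply the accumulated gradients once a full train batch is covered.
        if seen >= train_batch_size:
            optimizer.step()
            optimizer.zero_grad()
            seen = 0
```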
15 changes: 6 additions & 9 deletions rllib/algorithms/algorithm.py
@@ -164,9 +164,9 @@ class Algorithm(Trainable):
You can write your own Algorithm classes by sub-classing from `Algorithm`
or any of its built-in sub-classes.
This allows you to override the `execution_plan` method to implement
This allows you to override the `training_step` method to implement
your own algorithm logic. You can find the different built-in
algorithms' execution plans in their respective main py files,
algorithms' `training_step()` methods in their respective main .py files,
e.g. rllib.algorithms.dqn.dqn.py or rllib.algorithms.impala.impala.py.
The most important API methods an Algorithm exposes are `train()`,
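Note: a minimal sketch of the pattern the updated docstring describes, using a hypothetical `MyAlgo` subclass that overrides `training_step()` (not part of this commit; the helpers used are the same ones this diff imports elsewhere):

```python
from ray.rllib.algorithms.algorithm import Algorithm
from ray.rllib.execution.rollout_ops import synchronous_parallel_sample
from ray.rllib.execution.train_ops import train_one_step
from ray.rllib.utils.annotations import override
from ray.rllib.utils.typing import ResultDict


class MyAlgo(Algorithm):
    @override(Algorithm)
    def training_step(self) -> ResultDict:
        # One training iteration: sample synchronously from the rollout
        # workers, run a single training update, then broadcast weights.
        train_batch = synchronous_parallel_sample(
            worker_set=self.workers, max_env_steps=self.config["train_batch_size"]
        )
        train_results = train_one_step(self, train_batch)
        self.workers.sync_weights()
        return train_results
```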
@@ -2325,7 +2325,7 @@ def validate_config(

if simple_optim_setting is True:
pass
# Multi-GPU setting: Must use MultiGPUTrainOneStep.
# Multi-GPU setting: Must use multi_gpu_train_one_step.
elif config.get("num_gpus", 0) > 1:
# TODO: AlphaStar uses >1 GPUs differently (1 per policy actor), so this is
# ok for tf2 here.
@@ -2342,8 +2342,8 @@
)
config["simple_optimizer"] = False
# Auto-setting: Use simple-optimizer for tf-eager or multiagent,
# otherwise: MultiGPUTrainOneStep (if supported by the algo's execution
# plan).
# otherwise: multi_gpu_train_one_step (if supported by the algo's
# `training_step()` method).
elif simple_optim_setting == DEPRECATED_VALUE:
# tf-eager: Must use simple optimizer.
if framework not in ["tf", "torch"]:
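Note: a hedged sketch of how the `simple_optimizer` setting discussed in this hunk typically plays out inside a `training_step()` implementation (illustrative helper, not code from this commit):

```python
from ray.rllib.execution.train_ops import multi_gpu_train_one_step, train_one_step


def run_train_op(algorithm, train_batch):
    # Simple optimizer: single-step update on the local worker.
    if algorithm.config["simple_optimizer"]:
        return train_one_step(algorithm, train_batch)
    # Otherwise: load the batch onto the GPU buffers and train there.
    return multi_gpu_train_one_step(algorithm, train_batch)
```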
@@ -3137,10 +3137,7 @@ def should_stop(self, results):
# a (tolerable) failure.
return False

# Stopping criteria: Only when using the `training_iteration`
# API, b/c for the `exec_plan` API, the logic to stop is
# already built into the execution plans via the
# `StandardMetricsReporting` op.
# Stopping criteria.
elif self.algo.config["_disable_execution_plan_api"]:
if self.algo._by_agent_steps:
self.sampled = (
56 changes: 0 additions & 56 deletions rllib/algorithms/alpha_zero/alpha_zero.py
@@ -4,25 +4,13 @@
from ray.rllib.algorithms.callbacks import DefaultCallbacks
from ray.rllib.algorithms.algorithm import Algorithm
from ray.rllib.algorithms.algorithm_config import AlgorithmConfig
from ray.rllib.evaluation.worker_set import WorkerSet
from ray.rllib.execution.replay_ops import (
SimpleReplayBuffer,
Replay,
StoreToReplayBuffer,
WaitUntilTimestepsElapsed,
)
from ray.rllib.execution.rollout_ops import (
ParallelRollouts,
ConcatBatches,
synchronous_parallel_sample,
)
from ray.rllib.execution.concurrency_ops import Concurrently
from ray.rllib.execution.train_ops import (
multi_gpu_train_one_step,
train_one_step,
TrainOneStep,
)
from ray.rllib.execution.metric_ops import StandardMetricsReporting
from ray.rllib.models.catalog import ModelCatalog
from ray.rllib.models.modelv2 import restore_original_dimensions
from ray.rllib.models.torch.torch_action_dist import TorchCategorical
@@ -38,7 +26,6 @@
)
from ray.rllib.utils.replay_buffers.utils import validate_buffer_config
from ray.rllib.utils.typing import ResultDict, AlgorithmConfigDict
from ray.util.iter import LocalIterator

from ray.rllib.algorithms.alpha_zero.alpha_zero_policy import AlphaZeroPolicy
from ray.rllib.algorithms.alpha_zero.mcts import MCTS
@@ -406,49 +393,6 @@ def training_step(self) -> ResultDict:
# Return all collected metrics for the iteration.
return train_results

@staticmethod
@override(Algorithm)
def execution_plan(
workers: WorkerSet, config: AlgorithmConfigDict, **kwargs
) -> LocalIterator[dict]:
assert (
len(kwargs) == 0
), "Alpha zero execution_plan does NOT take any additional parameters"

rollouts = ParallelRollouts(workers, mode="bulk_sync")

if config["simple_optimizer"]:
train_op = rollouts.combine(
ConcatBatches(
min_batch_size=config["train_batch_size"],
count_steps_by=config["multiagent"]["count_steps_by"],
)
).for_each(TrainOneStep(workers, num_sgd_iter=config["num_sgd_iter"]))
else:
replay_buffer = SimpleReplayBuffer(config["buffer_size"])

store_op = rollouts.for_each(
StoreToReplayBuffer(local_buffer=replay_buffer)
)

replay_op = (
Replay(local_buffer=replay_buffer)
.filter(WaitUntilTimestepsElapsed(config["learning_starts"]))
.combine(
ConcatBatches(
min_batch_size=config["train_batch_size"],
count_steps_by=config["multiagent"]["count_steps_by"],
)
)
.for_each(TrainOneStep(workers, num_sgd_iter=config["num_sgd_iter"]))
)

train_op = Concurrently(
[store_op, replay_op], mode="round_robin", output_indexes=[1]
)

return StandardMetricsReporting(train_op, workers, config)


# Deprecated: Use ray.rllib.algorithms.alpha_zero.AlphaZeroConfig instead!
class _deprecated_default_config(dict):
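Note: the `execution_plan()` removed above is superseded by AlphaZero's `training_step()` (only partially visible in this hunk). As a hedged sketch of the equivalent replay-buffer loop under the `training_step()` API, assuming a `self.local_replay_buffer` created in `setup()` (illustrative only, not the exact code):

```python
from ray.rllib.execution.rollout_ops import synchronous_parallel_sample
from ray.rllib.execution.train_ops import train_one_step


def training_step(self):
    # Collect new episodes from the rollout workers and store them.
    new_batches = synchronous_parallel_sample(worker_set=self.workers, concat=False)
    for batch in new_batches:
        self.local_replay_buffer.add(batch)  # assumed buffer attribute
    # Replay a train batch and run one training update on it.
    train_batch = self.local_replay_buffer.sample(self.config["train_batch_size"])
    train_results = {}
    if train_batch is not None:
        train_results = train_one_step(self, train_batch)
    # Push the updated weights back to the rollout workers.
    self.workers.sync_weights()
    return train_results
```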
10 changes: 0 additions & 10 deletions rllib/algorithms/apex_ddpg/apex_ddpg.py
@@ -5,14 +5,12 @@
from ray.rllib.algorithms.algorithm_config import AlgorithmConfig
from ray.rllib.algorithms.apex_dqn.apex_dqn import ApexDQN
from ray.rllib.algorithms.ddpg.ddpg import DDPG, DDPGConfig
from ray.rllib.evaluation.worker_set import WorkerSet
from ray.rllib.utils.annotations import override
from ray.rllib.utils.deprecation import DEPRECATED_VALUE, Deprecated
from ray.rllib.utils.typing import (
PartialAlgorithmConfigDict,
ResultDict,
)
from ray.util.iter import LocalIterator


class ApexDDPGConfig(DDPGConfig):
@@ -200,14 +198,6 @@ def on_worker_failures(
)
self._sampling_actor_manager.add_workers(new_workers)

@staticmethod
@override(DDPG)
def execution_plan(
workers: WorkerSet, config: dict, **kwargs
) -> LocalIterator[dict]:
"""Use APEX-DQN's execution plan."""
return ApexDQN.execution_plan(workers, config, **kwargs)


# Deprecated: Use ray.rllib.algorithms.apex_ddpg.ApexDDPGConfig instead!
class _deprecated_default_config(dict):
9 changes: 1 addition & 8 deletions rllib/algorithms/appo/appo.py
@@ -207,16 +207,9 @@ def __init__(self, config, *args, **kwargs):

@override(Impala)
def setup(self, config: PartialAlgorithmConfigDict):
# Before init: Add the update target and kl hook.
# This hook is called explicitly after each learner step in the
# execution setup for IMPALA.
if config.get("_disable_execution_plan_api", True) is False:
config["after_train_step"] = UpdateTargetAndKL

super().setup(config)

if self.config["_disable_execution_plan_api"] is True:
self.update_kl = UpdateKL(self.workers)
self.update_kl = UpdateKL(self.workers)

def after_train_step(self, train_results: ResultDict) -> None:
"""Updates the target network and the KL coefficient for the APPO-loss.
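Note: `UpdateKL` (now constructed unconditionally in `setup()`) adjusts the KL coefficient after each train step. A hedged sketch of the standard adaptive-KL rule it is based on (the exact thresholds and factors used by APPO may differ):

```python
def update_kl_coeff(kl_coeff: float, sampled_kl: float, kl_target: float) -> float:
    # Increase the penalty if the sampled KL overshoots the target, relax it
    # if the policy barely moved, otherwise keep the coefficient unchanged.
    if sampled_kl > 2.0 * kl_target:
        return kl_coeff * 1.5
    elif sampled_kl < 0.5 * kl_target:
        return kl_coeff * 0.5
    return kl_coeff
```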
49 changes: 24 additions & 25 deletions rllib/algorithms/ddppo/ddppo.py
@@ -258,31 +258,30 @@ def setup(self, config: PartialAlgorithmConfigDict):
super().setup(config)

# Initialize torch process group for distributed training.
if self.config["_disable_execution_plan_api"] is True:
self._curr_learner_info = {}
ip = ray.get(self.workers.remote_workers()[0].get_node_ip.remote())
port = ray.get(self.workers.remote_workers()[0].find_free_port.remote())
address = "tcp:https://{ip}:{port}".format(ip=ip, port=port)
logger.info("Creating torch process group with leader {}".format(address))

# Get setup tasks in order to throw errors on failure.
ray.get(
[
worker.setup_torch_data_parallel.remote(
url=address,
world_rank=i,
world_size=len(self.workers.remote_workers()),
backend=self.config["torch_distributed_backend"],
)
for i, worker in enumerate(self.workers.remote_workers())
]
)
logger.info("Torch process group init completed")
self._ddppo_worker_manager = AsyncRequestsManager(
self.workers.remote_workers(),
max_remote_requests_in_flight_per_worker=1,
ray_wait_timeout_s=0.03,
)
self._curr_learner_info = {}
ip = ray.get(self.workers.remote_workers()[0].get_node_ip.remote())
port = ray.get(self.workers.remote_workers()[0].find_free_port.remote())
address = "tcp:https://{ip}:{port}".format(ip=ip, port=port)
logger.info("Creating torch process group with leader {}".format(address))

# Get setup tasks in order to throw errors on failure.
ray.get(
[
worker.setup_torch_data_parallel.remote(
url=address,
world_rank=i,
world_size=len(self.workers.remote_workers()),
backend=self.config["torch_distributed_backend"],
)
for i, worker in enumerate(self.workers.remote_workers())
]
)
logger.info("Torch process group init completed")
self._ddppo_worker_manager = AsyncRequestsManager(
self.workers.remote_workers(),
max_remote_requests_in_flight_per_worker=1,
ray_wait_timeout_s=0.03,
)

@override(PPO)
def training_step(self) -> ResultDict:
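Note: the `setup()` hunk above wires up a torch process group across the remote workers: the first worker's node IP and a free port become the TCP leader address, and every worker joins with its own rank. A hedged sketch of what each worker's `setup_torch_data_parallel()` (not shown in this diff) conceptually does with those arguments:

```python
import torch.distributed as dist


def join_process_group(url: str, world_rank: int, world_size: int, backend: str):
    # `url` is the leader address, e.g. "tcp://10.0.0.1:12345"; every worker
    # connects to it and registers itself under its rank.
    dist.init_process_group(
        backend=backend,  # e.g. "gloo" or "nccl"
        init_method=url,
        rank=world_rank,
        world_size=world_size,
    )
```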
53 changes: 8 additions & 45 deletions rllib/algorithms/dreamer/dreamer.py
@@ -11,7 +11,6 @@
from ray.rllib.evaluation.metrics import collect_metrics
from ray.rllib.algorithms.dreamer.dreamer_model import DreamerModel
from ray.rllib.execution.rollout_ops import (
ParallelRollouts,
synchronous_parallel_sample,
)
from ray.rllib.utils.annotations import override
@@ -332,55 +331,19 @@ def get_default_policy_class(self, config: AlgorithmConfigDict):
@override(Algorithm)
def setup(self, config: PartialAlgorithmConfigDict):
super().setup(config)
# `training_iteration` implementation: Setup buffer in `setup`, not
# in `execution_plan` (deprecated).
if self.config["_disable_execution_plan_api"] is True:
self.local_replay_buffer = EpisodeSequenceBuffer(
replay_sequence_length=config["batch_length"]
)

# Prefill episode buffer with initial exploration (uniform sampling)
while (
total_sampled_timesteps(self.workers.local_worker())
< self.config["prefill_timesteps"]
):
samples = self.workers.local_worker().sample()
self.local_replay_buffer.add(samples)

@staticmethod
@override(Algorithm)
def execution_plan(workers, config, **kwargs):
assert (
len(kwargs) == 0
), "Dreamer execution_plan does NOT take any additional parameters"

# Special replay buffer for Dreamer agent.
episode_buffer = EpisodeSequenceBuffer(
# Setup buffer.
self.local_replay_buffer = EpisodeSequenceBuffer(
replay_sequence_length=config["batch_length"]
)

local_worker = workers.local_worker()

# Prefill episode buffer with initial exploration (uniform sampling)
while total_sampled_timesteps(local_worker) < config["prefill_timesteps"]:
samples = local_worker.sample()
episode_buffer.add(samples)

batch_size = config["batch_size"]
dreamer_train_iters = config["dreamer_train_iters"]
act_repeat = config["action_repeat"]

rollouts = ParallelRollouts(workers)
rollouts = rollouts.for_each(
DreamerIteration(
local_worker,
episode_buffer,
dreamer_train_iters,
batch_size,
act_repeat,
)
)
return rollouts
while (
total_sampled_timesteps(self.workers.local_worker())
< self.config["prefill_timesteps"]
):
samples = self.workers.local_worker().sample()
self.local_replay_buffer.add(samples)

@override(Algorithm)
def training_step(self) -> ResultDict:
30 changes: 0 additions & 30 deletions rllib/algorithms/impala/impala.py
@@ -18,8 +18,6 @@
from ray.rllib.execution.learner_thread import LearnerThread
from ray.rllib.execution.multi_gpu_learner_thread import MultiGPULearnerThread
from ray.rllib.execution.parallel_requests import AsyncRequestsManager
from ray.rllib.execution.replay_ops import MixInReplay
from ray.rllib.execution.rollout_ops import ConcatBatches, ParallelRollouts
from ray.rllib.policy.policy import Policy
from ray.rllib.policy.sample_batch import concat_samples
from ray.rllib.utils.actors import create_colocated_actors
@@ -380,34 +378,6 @@ def make_learner_thread(local_worker, config):
return learner_thread


def gather_experiences_directly(workers, config):
rollouts = ParallelRollouts(
workers,
mode="async",
num_async=config["max_requests_in_flight_per_sampler_worker"],
)

# Augment with replay and concat to desired train batch size.
train_batches = (
rollouts.for_each(lambda batch: batch.decompress_if_needed())
.for_each(
MixInReplay(
num_slots=config["replay_buffer_num_slots"],
replay_proportion=config["replay_proportion"],
)
)
.flatten()
.combine(
ConcatBatches(
min_batch_size=config["train_batch_size"],
count_steps_by=config["multiagent"]["count_steps_by"],
)
)
)

return train_batches


# Update worker weights as they finish generating experiences.
class BroadcastUpdateLearnerWeights:
def __init__(self, learner_thread, workers, broadcast_interval):
4 changes: 2 additions & 2 deletions rllib/algorithms/maddpg/maddpg.py
@@ -292,8 +292,8 @@ def get_default_config(cls) -> AlgorithmConfig:
def validate_config(self, config: AlgorithmConfigDict) -> None:
"""Adds the `before_learn_on_batch` hook to the config.
This hook is called explicitly prior to TrainOneStep() in the execution
setups for DQN and APEX.
This hook is called explicitly prior to `train_one_step()` in the
`training_step` methods of DQN and APEX-DQN.
"""
# Call super's validation method.
super().validate_config(config)
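Note: a hedged sketch of the `before_learn_on_batch` hook shape the docstring refers to. The signature here is assumed from how MADDPG uses the hook; see maddpg.py and dqn.py for the authoritative version:

```python
def before_learn_on_batch(multi_agent_batch, policies, train_batch_size):
    # Post-process the multi-agent batch right before the train op runs on it,
    # e.g. augment each policy's batch with the other agents' (target) actions.
    return multi_agent_batch


# Registered in the algorithm config, roughly:
# config["before_learn_on_batch"] = before_learn_on_batch
```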
4 changes: 2 additions & 2 deletions rllib/algorithms/pg/pg.py
@@ -95,8 +95,8 @@ class PG(Algorithm):
https://docs.ray.io/en/master/rllib-algorithms.html#pg
Only overrides the default config- and policy selectors
(`get_default_policy` and `get_default_config`). Utilizes
the default `execution_plan()` of `Trainer`.
(`get_default_policy_class` and `get_default_config`). Utilizes
the default `training_step()` method of `Algorithm`.
"""

@classmethod