From a4ab4e0114b44d353283d7bf5067c77b67d47faa Mon Sep 17 00:00:00 2001 From: sven1977 Date: Wed, 16 Dec 2020 22:16:08 +0100 Subject: [PATCH 01/18] WIP. --- rllib/agents/dqn/dqn.py | 44 ++++++++++++++++++++++++++++-- rllib/agents/dqn/dqn_tf_policy.py | 8 +++--- rllib/agents/dqn/tests/test_dqn.py | 29 +++++++++++++++++++- rllib/policy/dynamic_tf_policy.py | 8 ++++-- 4 files changed, 80 insertions(+), 9 deletions(-) diff --git a/rllib/agents/dqn/dqn.py b/rllib/agents/dqn/dqn.py index 73d24e2bb5a19..3adcf3220fb05 100644 --- a/rllib/agents/dqn/dqn.py +++ b/rllib/agents/dqn/dqn.py @@ -22,7 +22,8 @@ from ray.rllib.execution.replay_buffer import LocalReplayBuffer from ray.rllib.execution.replay_ops import Replay, StoreToReplayBuffer from ray.rllib.execution.rollout_ops import ParallelRollouts -from ray.rllib.execution.train_ops import TrainOneStep, UpdateTargetNetwork +from ray.rllib.execution.train_ops import TrainOneStep, UpdateTargetNetwork, \ + TrainTFMultiGPU from ray.rllib.policy.policy import LEARNER_STATS_KEY, Policy from ray.rllib.utils.typing import TrainerConfigDict from ray.util.iter import LocalIterator @@ -132,6 +133,13 @@ "worker_side_prioritization": False, # Prevent iterations from going lower than this time span "min_iter_time_s": 1, + + # TODO: Experimental. + "simple_optimizer": False, + # Whether to fake GPUs (using CPUs). + # Set this to True for debugging on non-GPU machines (set `num_gpus` > 0). + "_fake_gpus": False, + }) # __sphinx_doc_end__ # yapf: enable @@ -166,6 +174,21 @@ def validate_config(config: TrainerConfigDict) -> None: raise ValueError("Prioritized replay is not supported when " "replay_sequence_length > 1.") + # Multi-gpu not supported for PyTorch and tf-eager. + if config["framework"] in ["tf2", "tfe", "torch"]: + config["simple_optimizer"] = True + # Performance warning, if "simple" optimizer used with (static-graph) tf. + elif config["simple_optimizer"]: + logger.warning( + "Using the simple minibatch optimizer. This will significantly " + "reduce performance, consider simple_optimizer=False.") + # Multi-agent mode and multi-GPU optimizer. + elif config["multiagent"]["policies"] and not config["simple_optimizer"]: + logger.info( + "In multi-agent mode, policies will be optimized sequentially " + "by the multi-GPU optimizer. Consider setting " + "simple_optimizer=True if this doesn't work for you.") + def execution_plan(workers: WorkerSet, config: TrainerConfigDict) -> LocalIterator[dict]: @@ -225,9 +248,26 @@ def update_prio(item): # returned from the LocalReplay() iterator is passed to TrainOneStep to # take a SGD step, and then we decide whether to update the target network. post_fn = config.get("before_learn_on_batch") or (lambda b, *a: b) + + if config["simple_optimizer"]: + train_op = TrainOneStep(workers) + else: + train_op = TrainTFMultiGPU( + workers, + sgd_minibatch_size=config["train_batch_size"], + num_sgd_iter=1, + num_gpus=config["num_gpus"], + rollout_fragment_length="doesnt_matter", # :) config["rollout_fragment_length"], + num_envs_per_worker="doesnt_matter", # :) config["num_envs_per_worker"], + train_batch_size="doesnt_matter", # :) + shuffle_sequences=True,#DQN: always shuffle, no sequences. 
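            # Note on the "doesnt_matter" placeholders above: at this point
            # TrainTFMultiGPU's constructor still requires
            # rollout_fragment_length, num_envs_per_worker and train_batch_size,
            # but DQN trains on replayed batches of exactly `train_batch_size`,
            # so these values are never used here. A later patch in this series
            # (06/18) makes the constructor keyword-only and drops the three
            # arguments from both the signature and this call.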
+ _fake_gpus=config.get("_fake_gpus", False), + framework=config.get("framework")) + + replay_op = Replay(local_buffer=local_replay_buffer) \ .for_each(lambda x: post_fn(x, workers, config)) \ - .for_each(TrainOneStep(workers)) \ + .for_each(train_op) \ .for_each(update_prio) \ .for_each(UpdateTargetNetwork( workers, config["target_network_update_freq"])) diff --git a/rllib/agents/dqn/dqn_tf_policy.py b/rllib/agents/dqn/dqn_tf_policy.py index d1d6d4570ea17..48cd432c98735 100644 --- a/rllib/agents/dqn/dqn_tf_policy.py +++ b/rllib/agents/dqn/dqn_tf_policy.py @@ -164,7 +164,7 @@ def build_q_model(policy: Policy, obs_space: gym.spaces.Space, else: num_outputs = action_space.n - policy.q_model = ModelCatalog.get_model_v2( + q_model = ModelCatalog.get_model_v2( obs_space=obs_space, action_space=action_space, num_outputs=num_outputs, @@ -206,7 +206,7 @@ def build_q_model(policy: Policy, obs_space: gym.spaces.Space, getattr(policy, "exploration", None), ParameterNoise) or config["exploration_config"]["type"] == "ParameterNoise") - return policy.q_model + return q_model def get_distribution_inputs_and_class(policy: Policy, @@ -239,7 +239,7 @@ def build_q_losses(policy: Policy, model, _, # q network evaluation q_t, q_logits_t, q_dist_t = compute_q_values( policy, - policy.q_model, + model, train_batch[SampleBatch.CUR_OBS], explore=False) @@ -263,7 +263,7 @@ def build_q_losses(policy: Policy, model, _, if config["double_q"]: q_tp1_using_online_net, q_logits_tp1_using_online_net, \ q_dist_tp1_using_online_net = compute_q_values( - policy, policy.q_model, + policy, model, train_batch[SampleBatch.NEXT_OBS], explore=False) q_tp1_best_using_online_net = tf.argmax(q_tp1_using_online_net, 1) diff --git a/rllib/agents/dqn/tests/test_dqn.py b/rllib/agents/dqn/tests/test_dqn.py index 5f8fe368aa4d5..61054a4a40477 100644 --- a/rllib/agents/dqn/tests/test_dqn.py +++ b/rllib/agents/dqn/tests/test_dqn.py @@ -1,3 +1,4 @@ +import copy import numpy as np import unittest @@ -10,7 +11,7 @@ class TestDQN(unittest.TestCase): @classmethod def setUpClass(cls) -> None: - ray.init() + ray.init(local_mode=True)#TODO @classmethod def tearDownClass(cls) -> None: @@ -50,6 +51,32 @@ def test_dqn_compilation(self): check_compute_single_action(trainer) trainer.stop() + def test_ppo_fake_multi_gpu_learning(self): + """Test whether PPOTrainer can learn CartPole w/ faked multi-GPU.""" + config = copy.deepcopy(dqn.DEFAULT_CONFIG) + # Fake GPU setup. + config["num_gpus"] = 2 + config["_fake_gpus"] = True + config["framework"] = "tf" + # Mimick tuned_example for PPO CartPole. + config["num_workers"] = 1 + config["lr"] = 0.0003 + #config["observation_filter"] = "MeanStdFilter" + config["model"]["fcnet_hiddens"] = [32] + config["model"]["fcnet_activation"] = "linear" + + trainer = dqn.DQNTrainer(config=config, env="CartPole-v0") + num_iterations = 200 + learnt = False + for i in range(num_iterations): + results = trainer.train() + print(results) + if results["episode_reward_mean"] > 150: + learnt = True + break + assert learnt, "DQN multi-GPU (with fake-GPUs) did not learn CartPole!" + trainer.stop() + def test_dqn_exploration_and_soft_q_config(self): """Tests, whether a DQN Agent outputs exploration/softmaxed actions.""" config = dqn.DEFAULT_CONFIG.copy() diff --git a/rllib/policy/dynamic_tf_policy.py b/rllib/policy/dynamic_tf_policy.py index 432e384f27fdc..cd04715765d4c 100644 --- a/rllib/policy/dynamic_tf_policy.py +++ b/rllib/policy/dynamic_tf_policy.py @@ -160,7 +160,11 @@ def __init__( # Setup self.model. 
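        # Context for the change below: when DynamicTFPolicy.copy() builds the
        # per-GPU tower copies, each tower has to reuse the already constructed
        # model(s) instead of creating fresh variables. `existing_model` may
        # therefore now also be a list, e.g.
        # [model, ("target_q_model", target_model)], where any extra
        # (attr_name, model) tuples get attached to the copy via setattr().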
if existing_model: - self.model = existing_model + if isinstance(existing_model, list): + self.model = existing_model[0] + #TODO: (sven) total hack, but works for `target_q_model` + for i in range(1, len(existing_model)): + setattr(self, existing_model[i][0], existing_model[i][1]) elif make_model: self.model = make_model(self, obs_space, action_space, config) else: @@ -366,7 +370,7 @@ def copy(self, self.action_space, self.config, existing_inputs=input_dict, - existing_model=self.model) + existing_model=[self.model, ("target_q_model", getattr(self, "target_q_model"))]) instance._loss_input_dict = input_dict loss = instance._do_loss_init(input_dict) From cc31641971db03172700f6bb93af76aaf722c239 Mon Sep 17 00:00:00 2001 From: sven1977 Date: Sat, 19 Dec 2020 14:54:12 +0100 Subject: [PATCH 02/18] WIP. --- rllib/agents/dqn/tests/test_dqn.py | 2 +- rllib/execution/train_ops.py | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/rllib/agents/dqn/tests/test_dqn.py b/rllib/agents/dqn/tests/test_dqn.py index 61054a4a40477..f23d78a82da33 100644 --- a/rllib/agents/dqn/tests/test_dqn.py +++ b/rllib/agents/dqn/tests/test_dqn.py @@ -51,7 +51,7 @@ def test_dqn_compilation(self): check_compute_single_action(trainer) trainer.stop() - def test_ppo_fake_multi_gpu_learning(self): + def test_dqn_fake_multi_gpu_learning(self): """Test whether PPOTrainer can learn CartPole w/ faked multi-GPU.""" config = copy.deepcopy(dqn.DEFAULT_CONFIG) # Fake GPU setup. diff --git a/rllib/execution/train_ops.py b/rllib/execution/train_ops.py index e2411ed3279a9..c8d71292533eb 100644 --- a/rllib/execution/train_ops.py +++ b/rllib/execution/train_ops.py @@ -207,6 +207,8 @@ def __call__(self, self.sess, permutation[batch_index] * self.per_device_batch_size) for k, v in batch_fetches[LEARNER_STATS_KEY].items(): + TODO: multi-GPU optimizer here does not collect td_error (which is stored outside LEARNER_STATS_KEY) + that's why it doesn't show up in the returned fetches. iter_extra_fetches[k].append(v) if logger.getEffectiveLevel() <= logging.DEBUG: avg = averaged(iter_extra_fetches) From 67cf4d38f17bd905026239840b9f2f43445b4eb2 Mon Sep 17 00:00:00 2001 From: sven1977 Date: Sun, 20 Dec 2020 04:32:05 -0500 Subject: [PATCH 03/18] WIP. --- rllib/agents/dqn/dqn.py | 9 +++------ rllib/agents/ppo/ppo.py | 12 +++++++----- rllib/execution/multi_gpu_impl.py | 5 +++-- rllib/execution/train_ops.py | 29 +++++++++++++++++++++++------ rllib/policy/dynamic_tf_policy.py | 2 +- 5 files changed, 37 insertions(+), 20 deletions(-) diff --git a/rllib/agents/dqn/dqn.py b/rllib/agents/dqn/dqn.py index 3adcf3220fb05..7a68f5cb394e7 100644 --- a/rllib/agents/dqn/dqn.py +++ b/rllib/agents/dqn/dqn.py @@ -233,12 +233,9 @@ def update_prio(item): if config.get("prioritized_replay"): prio_dict = {} for policy_id, info in info_dict.items(): - # TODO(sven): This is currently structured differently for - # torch/tf. Clean up these results/info dicts across - # policies (note: fixing this in torch_policy.py will - # break e.g. DDPPO!). 
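                # From this patch on, the multi-GPU train op passes each
                # tower's full fetch dict through, so `td_error` appears at the
                # top level of the per-policy info (not nested under
                # LEARNER_STATS_KEY) and can be read directly below.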
- td_error = info.get("td_error", - info[LEARNER_STATS_KEY].get("td_error")) + #TODO: check, whether correct now and resolve below disambiguation + td_error = info.get("td_error") + #info[LEARNER_STATS_KEY].get("td_error")) prio_dict[policy_id] = (samples.policy_batches[policy_id] .data.get("batch_indexes"), td_error) local_replay_buffer.update_priorities(prio_dict) diff --git a/rllib/agents/ppo/ppo.py b/rllib/agents/ppo/ppo.py index 0269882013f2a..8657c404ecc8e 100644 --- a/rllib/agents/ppo/ppo.py +++ b/rllib/agents/ppo/ppo.py @@ -20,7 +20,7 @@ StandardizeFields, SelectExperiences from ray.rllib.execution.train_ops import TrainOneStep, TrainTFMultiGPU from ray.rllib.execution.metric_ops import StandardMetricsReporting -from ray.rllib.policy.policy import Policy +from ray.rllib.policy.policy import LEARNER_STATS_KEY, Policy from ray.rllib.utils.typing import TrainerConfigDict from ray.util.iter import LocalIterator @@ -174,12 +174,14 @@ def __init__(self, workers): def __call__(self, fetches): def update(pi, pi_id): - assert "kl" not in fetches, ( - "kl should be nested under policy id key", fetches) + assert LEARNER_STATS_KEY not in fetches, \ + ("{} should be nested under policy id key".format( + LEARNER_STATS_KEY), fetches) if pi_id in fetches: - assert "kl" in fetches[pi_id], (fetches, pi_id) + kl = fetches[pi_id].get(LEARNER_STATS_KEY, {}).get("kl") + assert kl is not None, (fetches, pi_id) # Make the actual `Policy.update_kl()` call. - pi.update_kl(fetches[pi_id]["kl"]) + pi.update_kl(kl) else: logger.warning("No data for {}, not updating kl".format(pi_id)) diff --git a/rllib/execution/multi_gpu_impl.py b/rllib/execution/multi_gpu_impl.py index 340fa8a84dd61..c9de265b9e552 100644 --- a/rllib/execution/multi_gpu_impl.py +++ b/rllib/execution/multi_gpu_impl.py @@ -251,8 +251,9 @@ def optimize(self, sess, batch_index): feed_dict.update(tower.loss_graph.extra_compute_grad_feed_dict()) fetches = {"train": self._train_op} - for tower in self._towers: - fetches.update(tower.loss_graph._get_grad_and_stats_fetches()) + for tower_num, tower in enumerate(self._towers): + tower_fetch = tower.loss_graph._get_grad_and_stats_fetches() + fetches["tower_{}".format(tower_num)] = tower_fetch return sess.run(fetches, feed_dict=feed_dict) diff --git a/rllib/execution/train_ops.py b/rllib/execution/train_ops.py index c8d71292533eb..79096e1711f53 100644 --- a/rllib/execution/train_ops.py +++ b/rllib/execution/train_ops.py @@ -2,6 +2,7 @@ import logging import numpy as np import math +import tree from typing import List, Tuple, Any import ray @@ -206,14 +207,30 @@ def __call__(self, batch_fetches = optimizer.optimize( self.sess, permutation[batch_index] * self.per_device_batch_size) - for k, v in batch_fetches[LEARNER_STATS_KEY].items(): - TODO: multi-GPU optimizer here does not collect td_error (which is stored outside LEARNER_STATS_KEY) - that's why it doesn't show up in the returned fetches. - iter_extra_fetches[k].append(v) + + #def mapping_fn(*s): + # s + + iter_extra_fetches = tree.map_structure( + lambda *s: np.array(s), + *(batch_fetches["tower_{}".format(i)] for i in range(len(self.devices)))) + + #for k, v in batch_fetches.items(): + # if k == LEARNER_STATS_KEY: + # for k, v in batch_fetches[LEARNER_STATS_KEY].items(): + # #TODO: multi-GPU optimizer here does not collect td_error (which is stored outside LEARNER_STATS_KEY) + # #that's why it doesn't show up in the returned fetches. 
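            # What the tree.map_structure() call above produces, sketched on
            # plain dicts (dm-tree applies the lambda to corresponding leaves
            # of all structures passed in):
            #   tree.map_structure(lambda *s: np.array(s),
            #                      {"a": 1.0, "b": 2.0}, {"a": 3.0, "b": 4.0})
            #   -> {"a": array([1., 3.]), "b": array([2., 4.])}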
+ # iter_extra_fetches[k].append(v) + # elif k == "train_op": + # continue + # else: + # iter_extra_fetches[k].append(v) if logger.getEffectiveLevel() <= logging.DEBUG: - avg = averaged(iter_extra_fetches) + avg = tree.map_structure_with_path(lambda p, s: np.nanmean(s, axis=0) if p[0] != "td_error" else np.concatenate(s, axis=0), iter_extra_fetches) logger.debug("{} {}".format(i, avg)) - fetches[policy_id] = averaged(iter_extra_fetches, axis=0) + fetches[policy_id] = tree.map_structure_with_path(lambda p, s: np.nanmean(s, axis=0) if p[0] != "td_error" else np.concatenate(s, axis=0), iter_extra_fetches) + #fetches[policy_id] = tree.map_structure(lambda s: np.nanmean(s, axis=0), iter_extra_fetches)#averaged(iter_extra_fetches, axis=0) + #fetches[policy_id] = tree.unflatten_as() load_timer.push_units_processed(samples.count) learn_timer.push_units_processed(samples.count) diff --git a/rllib/policy/dynamic_tf_policy.py b/rllib/policy/dynamic_tf_policy.py index cd04715765d4c..f9db05e012a20 100644 --- a/rllib/policy/dynamic_tf_policy.py +++ b/rllib/policy/dynamic_tf_policy.py @@ -370,7 +370,7 @@ def copy(self, self.action_space, self.config, existing_inputs=input_dict, - existing_model=[self.model, ("target_q_model", getattr(self, "target_q_model"))]) + existing_model=[self.model, ("target_q_model", getattr(self, "target_q_model", None))]) instance._loss_input_dict = input_dict loss = instance._do_loss_init(input_dict) From 270cc6b547ee3b3e57f5f65fe4286c987dcc1223 Mon Sep 17 00:00:00 2001 From: sven1977 Date: Wed, 13 Jan 2021 08:18:53 +0100 Subject: [PATCH 04/18] WIP. --- rllib/policy/sample_batch.py | 646 ----------------------------------- 1 file changed, 646 deletions(-) delete mode 100644 rllib/policy/sample_batch.py diff --git a/rllib/policy/sample_batch.py b/rllib/policy/sample_batch.py deleted file mode 100644 index 162c62499ad03..0000000000000 --- a/rllib/policy/sample_batch.py +++ /dev/null @@ -1,646 +0,0 @@ -import collections -import numpy as np -import sys -import itertools -from typing import Dict, Iterable, List, Optional, Set, Union - -from ray.rllib.utils.annotations import PublicAPI, DeveloperAPI -from ray.rllib.utils.compression import pack, unpack, is_compressed -from ray.rllib.utils.memory import concat_aligned -from ray.rllib.utils.typing import PolicyID, TensorType - -# Default policy id for single agent environments -DEFAULT_POLICY_ID = "default_policy" - - -@PublicAPI -class SampleBatch: - """Wrapper around a dictionary with string keys and array-like values. - - For example, {"obs": [1, 2, 3], "reward": [0, -1, 1]} is a batch of three - samples, each with an "obs" and "reward" attribute. - """ - - # Outputs from interacting with the environment - OBS = "obs" - CUR_OBS = "obs" - NEXT_OBS = "new_obs" - ACTIONS = "actions" - REWARDS = "rewards" - PREV_ACTIONS = "prev_actions" - PREV_REWARDS = "prev_rewards" - DONES = "dones" - INFOS = "infos" - - # Extra action fetches keys. - ACTION_DIST_INPUTS = "action_dist_inputs" - ACTION_PROB = "action_prob" - ACTION_LOGP = "action_logp" - - # Uniquely identifies an episode. - EPS_ID = "eps_id" - - # Uniquely identifies a sample batch. This is important to distinguish RNN - # sequences from the same episode when multiple sample batches are - # concatenated (fusing sequences across batches can be unsafe). - UNROLL_ID = "unroll_id" - - # Uniquely identifies an agent within an episode. - AGENT_INDEX = "agent_index" - - # Value function predictions emitted by the behaviour policy. 
- VF_PREDS = "vf_preds" - - @PublicAPI - def __init__(self, *args, **kwargs): - """Constructs a sample batch (same params as dict constructor).""" - - # Possible seq_lens (TxB or BxT) setup. - self.time_major = kwargs.pop("_time_major", None) - self.seq_lens = kwargs.pop("_seq_lens", None) - self.dont_check_lens = kwargs.pop("_dont_check_lens", False) - self.max_seq_len = None - if self.seq_lens is not None and len(self.seq_lens) > 0: - self.max_seq_len = max(self.seq_lens) - - # The actual data, accessible by column name (str). - self.data = dict(*args, **kwargs) - - lengths = [] - for k, v in self.data.copy().items(): - assert isinstance(k, str), self - lengths.append(len(v)) - if isinstance(v, list): - self.data[k] = np.array(v) - if not lengths: - raise ValueError("Empty sample batch") - if not self.dont_check_lens: - assert len(set(lengths)) == 1, \ - "Data columns must be same length, but lens are " \ - "{}".format(lengths) - if self.seq_lens is not None and len(self.seq_lens) > 0: - self.count = sum(self.seq_lens) - else: - self.count = len(next(iter(self.data.values()))) - - # Keeps track of new columns added after initial ones. - self.new_columns = [] - - @PublicAPI - def __len__(self): - """Returns the amount of samples in the sample batch.""" - return self.count - - @staticmethod - @PublicAPI - def concat_samples(samples: List["SampleBatch"]) -> \ - Union["SampleBatch", "MultiAgentBatch"]: - """Concatenates n data dicts or MultiAgentBatches. - - Args: - samples (List[Dict[TensorType]]]): List of dicts of data (numpy). - - Returns: - Union[SampleBatch, MultiAgentBatch]: A new (compressed) - SampleBatch or MultiAgentBatch. - """ - if isinstance(samples[0], MultiAgentBatch): - return MultiAgentBatch.concat_samples(samples) - seq_lens = [] - concat_samples = [] - for s in samples: - if s.count > 0: - concat_samples.append(s) - if s.seq_lens is not None: - seq_lens.extend(s.seq_lens) - - out = {} - for k in concat_samples[0].keys(): - out[k] = concat_aligned( - [s[k] for s in concat_samples], - time_major=concat_samples[0].time_major) - return SampleBatch( - out, - _seq_lens=np.array(seq_lens, dtype=np.int32), - _time_major=concat_samples[0].time_major, - _dont_check_lens=True) - - @PublicAPI - def concat(self, other: "SampleBatch") -> "SampleBatch": - """Returns a new SampleBatch with each data column concatenated. - - Args: - other (SampleBatch): The other SampleBatch object to concat to this - one. - - Returns: - SampleBatch: The new SampleBatch, resulting from concating `other` - to `self`. - - Examples: - >>> b1 = SampleBatch({"a": [1, 2]}) - >>> b2 = SampleBatch({"a": [3, 4, 5]}) - >>> print(b1.concat(b2)) - {"a": [1, 2, 3, 4, 5]} - """ - - if self.keys() != other.keys(): - raise ValueError( - "SampleBatches to concat must have same columns! {} vs {}". - format(list(self.keys()), list(other.keys()))) - out = {} - for k in self.keys(): - out[k] = concat_aligned([self[k], other[k]]) - return SampleBatch(out) - - @PublicAPI - def copy(self) -> "SampleBatch": - """Creates a (deep) copy of this SampleBatch and returns it. - - Returns: - SampleBatch: A (deep) copy of this SampleBatch object. - """ - return SampleBatch( - {k: np.array(v, copy=True) - for (k, v) in self.data.items()}, - _seq_lens=self.seq_lens) - - @PublicAPI - def rows(self) -> Dict[str, TensorType]: - """Returns an iterator over data rows, i.e. dicts with column values. - - Yields: - Dict[str, TensorType]: The column values of the row in this - iteration. 
- - Examples: - >>> batch = SampleBatch({"a": [1, 2, 3], "b": [4, 5, 6]}) - >>> for row in batch.rows(): - print(row) - {"a": 1, "b": 4} - {"a": 2, "b": 5} - {"a": 3, "b": 6} - """ - - for i in range(self.count): - row = {} - for k in self.keys(): - row[k] = self[k][i] - yield row - - @PublicAPI - def columns(self, keys: List[str]) -> List[any]: - """Returns a list of the batch-data in the specified columns. - - Args: - keys (List[str]): List of column names fo which to return the data. - - Returns: - List[any]: The list of data items ordered by the order of column - names in `keys`. - - Examples: - >>> batch = SampleBatch({"a": [1], "b": [2], "c": [3]}) - >>> print(batch.columns(["a", "b"])) - [[1], [2]] - """ - - out = [] - for k in keys: - out.append(self[k]) - return out - - @PublicAPI - def shuffle(self) -> None: - """Shuffles the rows of this batch in-place.""" - - permutation = np.random.permutation(self.count) - for key, val in self.items(): - self[key] = val[permutation] - - @PublicAPI - def split_by_episode(self) -> List["SampleBatch"]: - """Splits this batch's data by `eps_id`. - - Returns: - List[SampleBatch]: List of batches, one per distinct episode. - """ - - slices = [] - cur_eps_id = self.data["eps_id"][0] - offset = 0 - for i in range(self.count): - next_eps_id = self.data["eps_id"][i] - if next_eps_id != cur_eps_id: - slices.append(self.slice(offset, i)) - offset = i - cur_eps_id = next_eps_id - slices.append(self.slice(offset, self.count)) - for s in slices: - slen = len(set(s["eps_id"])) - assert slen == 1, (s, slen) - assert sum(s.count for s in slices) == self.count, (slices, self.count) - return slices - - @PublicAPI - def slice(self, start: int, end: int) -> "SampleBatch": - """Returns a slice of the row data of this batch (w/o copying). - - Args: - start (int): Starting index. - end (int): Ending index. - - Returns: - SampleBatch: A new SampleBatch, which has a slice of this batch's - data. - """ - if self.seq_lens is not None and len(self.seq_lens) > 0: - data = {k: v[start:end] for k, v in self.data.items()} - # Fix state_in_x data. - count = 0 - state_start = None - seq_lens = None - for i, seq_len in enumerate(self.seq_lens): - count += seq_len - if count >= end: - state_idx = 0 - state_key = "state_in_{}".format(state_idx) - while state_key in self.data: - data[state_key] = self.data[state_key][state_start:i + - 1] - state_idx += 1 - state_key = "state_in_{}".format(state_idx) - seq_lens = list(self.seq_lens[state_start:i]) + [ - seq_len - (count - end) - ] - assert sum(seq_lens) == (end - start) - break - elif state_start is None and count > start: - state_start = i - - return SampleBatch( - data, - _seq_lens=np.array(seq_lens, dtype=np.int32), - _time_major=self.time_major, - _dont_check_lens=True) - else: - return SampleBatch( - {k: v[start:end] - for k, v in self.data.items()}, - _seq_lens=None, - _time_major=self.time_major) - - @PublicAPI - def timeslices(self, k: int) -> List["SampleBatch"]: - """Returns SampleBatches, each one representing a k-slice of this one. - - Will start from timestep 0 and produce slices of size=k. - - Args: - k (int): The size (in timesteps) of each returned SampleBatch. - - Returns: - List[SampleBatch]: The list of (new) SampleBatches (each one of - size k). - """ - out = [] - i = 0 - while i < self.count: - out.append(self.slice(i, i + k)) - i += k - return out - - @PublicAPI - def keys(self) -> Iterable[str]: - """ - Returns: - Iterable[str]: The keys() iterable over `self.data`. 
- """ - return self.data.keys() - - @PublicAPI - def items(self) -> Iterable[TensorType]: - """ - Returns: - Iterable[TensorType]: The values() iterable over `self.data`. - """ - return self.data.items() - - @PublicAPI - def get(self, key: str) -> Optional[TensorType]: - """Returns one column (by key) from the data or None if key not found. - - Args: - key (str): The key (column name) to return. - - Returns: - Optional[TensorType]: The data under the given key. None if key - not found in data. - """ - return self.data.get(key) - - @PublicAPI - def size_bytes(self) -> int: - """ - Returns: - int: The overall size in bytes of the data buffer (all columns). - """ - return sum(sys.getsizeof(d) for d in self.data.values()) - - @PublicAPI - def __getitem__(self, key: str) -> TensorType: - """Returns one column (by key) from the data. - - Args: - key (str): The key (column name) to return. - - Returns: - TensorType: The data under the given key. - """ - return self.data[key] - - @PublicAPI - def __setitem__(self, key, item) -> None: - """Inserts (overrides) an entire column (by key) in the data buffer. - - Args: - key (str): The column name to set a value for. - item (TensorType): The data to insert. - """ - if key not in self.data: - self.new_columns.append(key) - self.data[key] = item - - @DeveloperAPI - def compress(self, - bulk: bool = False, - columns: Set[str] = frozenset(["obs", "new_obs"])) -> None: - """Compresses the data buffers (by column) in place. - - Args: - bulk (bool): Whether to compress across the batch dimension (0) - as well. If False will compress n separate list items, where n - is the batch size. - columns (Set[str]): The columns to compress. Default: Only - compress the obs and new_obs columns. - """ - for key in columns: - if key in self.data: - if bulk: - self.data[key] = pack(self.data[key]) - else: - self.data[key] = np.array( - [pack(o) for o in self.data[key]]) - - @DeveloperAPI - def decompress_if_needed(self, - columns: Set[str] = frozenset( - ["obs", "new_obs"])) -> "SampleBatch": - """Decompresses data buffers (per column if not compressed) in place. - - Args: - columns (Set[str]): The columns to decompress. Default: Only - decompress the obs and new_obs columns. - - Returns: - SampleBatch: This very SampleBatch. - """ - for key in columns: - if key in self.data: - arr = self.data[key] - if is_compressed(arr): - self.data[key] = unpack(arr) - elif len(arr) > 0 and is_compressed(arr[0]): - self.data[key] = np.array( - [unpack(o) for o in self.data[key]]) - return self - - def __str__(self): - return "SampleBatch({})".format(str(self.data)) - - def __repr__(self): - return "SampleBatch({})".format(str(self.data)) - - def __iter__(self): - return self.data.__iter__() - - def __contains__(self, x): - return x in self.data - - -@PublicAPI -class MultiAgentBatch: - """A batch of experiences from multiple agents in the environment. - - Attributes: - policy_batches (Dict[PolicyID, SampleBatch]): Mapping from policy - ids to SampleBatches of experiences. - count (int): The number of env steps in this batch. - """ - - @PublicAPI - def __init__(self, policy_batches: Dict[PolicyID, SampleBatch], - env_steps: int): - """Initialize a MultiAgentBatch object. - - Args: - policy_batches (Dict[PolicyID, SampleBatch]): Mapping from policy - ids to SampleBatches of experiences. - env_steps (int): The number of environment steps in the environment - this batch contains. This will be less than the number of - transitions this batch contains across all policies in total. 
- """ - - for v in policy_batches.values(): - assert isinstance(v, SampleBatch) - self.policy_batches = policy_batches - # Called "count" for uniformity with SampleBatch. - # Prefer to access this via the `env_steps()` method when possible - # for clarity. - self.count = env_steps - - @PublicAPI - def env_steps(self) -> int: - """The number of env steps (there are >= 1 agent steps per env step). - - Returns: - int: The number of environment steps contained in this batch. - """ - return self.count - - @PublicAPI - def agent_steps(self) -> int: - """The number of agent steps (there are >= 1 agent steps per env step). - - Returns: - int: The number of agent steps total in this batch. - """ - ct = 0 - for batch in self.policy_batches.values(): - ct += batch.count - return ct - - @PublicAPI - def timeslices(self, k: int) -> List["MultiAgentBatch"]: - """Returns k-step batches holding data for each agent at those steps. - - For examples, suppose we have agent1 observations [a1t1, a1t2, a1t3], - for agent2, [a2t1, a2t3], and for agent3, [a3t3] only. - - Calling timeslices(1) would return three MultiAgentBatches containing - [a1t1, a2t1], [a1t2], and [a1t3, a2t3, a3t3]. - - Calling timeslices(2) would return two MultiAgentBatches containing - [a1t1, a1t2, a2t1], and [a1t3, a2t3, a3t3]. - - This method is used to implement "lockstep" replay mode. Note that this - method does not guarantee each batch contains only data from a single - unroll. Batches might contain data from multiple different envs. - """ - from ray.rllib.evaluation.sample_batch_builder import \ - SampleBatchBuilder - - # Build a sorted set of (eps_id, t, policy_id, data...) - steps = [] - for policy_id, batch in self.policy_batches.items(): - for row in batch.rows(): - steps.append((row[SampleBatch.EPS_ID], row["t"], - row["agent_index"], policy_id, row)) - steps.sort() - - finished_slices = [] - cur_slice = collections.defaultdict(SampleBatchBuilder) - cur_slice_size = 0 - - def finish_slice(): - nonlocal cur_slice_size - assert cur_slice_size > 0 - batch = MultiAgentBatch( - {k: v.build_and_reset() - for k, v in cur_slice.items()}, cur_slice_size) - cur_slice_size = 0 - finished_slices.append(batch) - - # For each unique env timestep. - for _, group in itertools.groupby(steps, lambda x: x[:2]): - # Accumulate into the current slice. - for _, _, _, policy_id, row in group: - cur_slice[policy_id].add_values(**row) - cur_slice_size += 1 - # Slice has reached target number of env steps. - if cur_slice_size >= k: - finish_slice() - assert cur_slice_size == 0 - - if cur_slice_size > 0: - finish_slice() - - assert len(finished_slices) > 0, finished_slices - return finished_slices - - @staticmethod - @PublicAPI - def wrap_as_needed( - policy_batches: Dict[PolicyID, SampleBatch], - env_steps: int) -> Union[SampleBatch, "MultiAgentBatch"]: - """Returns SampleBatch or MultiAgentBatch, depending on given policies. - - Args: - policy_batches (Dict[PolicyID, SampleBatch]): Mapping from policy - ids to SampleBatch. - env_steps (int): Number of env steps in the batch. - - Returns: - Union[SampleBatch, MultiAgentBatch]: The single default policy's - SampleBatch or a MultiAgentBatch (more than one policy). 
- """ - if len(policy_batches) == 1 and DEFAULT_POLICY_ID in policy_batches: - return policy_batches[DEFAULT_POLICY_ID] - return MultiAgentBatch( - policy_batches=policy_batches, env_steps=env_steps) - - @staticmethod - @PublicAPI - def concat_samples(samples: List["MultiAgentBatch"]) -> "MultiAgentBatch": - """Concatenates a list of MultiAgentBatches into a new MultiAgentBatch. - - Args: - samples (List[MultiAgentBatch]): List of MultiagentBatch objects - to concatenate. - - Returns: - MultiAgentBatch: A new MultiAgentBatch consisting of the - concatenated inputs. - """ - policy_batches = collections.defaultdict(list) - env_steps = 0 - for s in samples: - if not isinstance(s, MultiAgentBatch): - raise ValueError( - "`MultiAgentBatch.concat_samples()` can only concat " - "MultiAgentBatch types, not {}!".format(type(s).__name__)) - for key, batch in s.policy_batches.items(): - policy_batches[key].append(batch) - env_steps += s.env_steps() - out = {} - for key, batches in policy_batches.items(): - out[key] = SampleBatch.concat_samples(batches) - return MultiAgentBatch(out, env_steps) - - @PublicAPI - def copy(self) -> "MultiAgentBatch": - """Deep-copies self into a new MultiAgentBatch. - - Returns: - MultiAgentBatch: The copy of self with deep-copied data. - """ - return MultiAgentBatch( - {k: v.copy() - for (k, v) in self.policy_batches.items()}, self.count) - - @PublicAPI - def size_bytes(self) -> int: - """ - Returns: - int: The overall size in bytes of all policy batches (all columns). - """ - return sum(b.size_bytes() for b in self.policy_batches.values()) - - @DeveloperAPI - def compress(self, - bulk: bool = False, - columns: Set[str] = frozenset(["obs", "new_obs"])) -> None: - """Compresses each policy batch (per column) in place. - - Args: - bulk (bool): Whether to compress across the batch dimension (0) - as well. If False will compress n separate list items, where n - is the batch size. - columns (Set[str]): Set of column names to compress. - """ - for batch in self.policy_batches.values(): - batch.compress(bulk=bulk, columns=columns) - - @DeveloperAPI - def decompress_if_needed(self, - columns: Set[str] = frozenset( - ["obs", "new_obs"])) -> "MultiAgentBatch": - """Decompresses each policy batch (per column), if already compressed. - - Args: - columns (Set[str]): Set of column names to decompress. - - Returns: - MultiAgentBatch: This very MultiAgentBatch. - """ - for batch in self.policy_batches.values(): - batch.decompress_if_needed(columns) - return self - - def __str__(self): - return "MultiAgentBatch({}, env_steps={})".format( - str(self.policy_batches), self.count) - - def __repr__(self): - return "MultiAgentBatch({}, env_steps={})".format( - str(self.policy_batches), self.count) From a2d71f6d41611d82f1b6d9b4d218b2f6f97d1bd0 Mon Sep 17 00:00:00 2001 From: sven1977 Date: Wed, 13 Jan 2021 08:50:39 +0100 Subject: [PATCH 05/18] WIP. 
--- rllib/policy/sample_batch.py | 646 +++++++++++++++++++++++++++++++++++ 1 file changed, 646 insertions(+) create mode 100644 rllib/policy/sample_batch.py diff --git a/rllib/policy/sample_batch.py b/rllib/policy/sample_batch.py new file mode 100644 index 0000000000000..162c62499ad03 --- /dev/null +++ b/rllib/policy/sample_batch.py @@ -0,0 +1,646 @@ +import collections +import numpy as np +import sys +import itertools +from typing import Dict, Iterable, List, Optional, Set, Union + +from ray.rllib.utils.annotations import PublicAPI, DeveloperAPI +from ray.rllib.utils.compression import pack, unpack, is_compressed +from ray.rllib.utils.memory import concat_aligned +from ray.rllib.utils.typing import PolicyID, TensorType + +# Default policy id for single agent environments +DEFAULT_POLICY_ID = "default_policy" + + +@PublicAPI +class SampleBatch: + """Wrapper around a dictionary with string keys and array-like values. + + For example, {"obs": [1, 2, 3], "reward": [0, -1, 1]} is a batch of three + samples, each with an "obs" and "reward" attribute. + """ + + # Outputs from interacting with the environment + OBS = "obs" + CUR_OBS = "obs" + NEXT_OBS = "new_obs" + ACTIONS = "actions" + REWARDS = "rewards" + PREV_ACTIONS = "prev_actions" + PREV_REWARDS = "prev_rewards" + DONES = "dones" + INFOS = "infos" + + # Extra action fetches keys. + ACTION_DIST_INPUTS = "action_dist_inputs" + ACTION_PROB = "action_prob" + ACTION_LOGP = "action_logp" + + # Uniquely identifies an episode. + EPS_ID = "eps_id" + + # Uniquely identifies a sample batch. This is important to distinguish RNN + # sequences from the same episode when multiple sample batches are + # concatenated (fusing sequences across batches can be unsafe). + UNROLL_ID = "unroll_id" + + # Uniquely identifies an agent within an episode. + AGENT_INDEX = "agent_index" + + # Value function predictions emitted by the behaviour policy. + VF_PREDS = "vf_preds" + + @PublicAPI + def __init__(self, *args, **kwargs): + """Constructs a sample batch (same params as dict constructor).""" + + # Possible seq_lens (TxB or BxT) setup. + self.time_major = kwargs.pop("_time_major", None) + self.seq_lens = kwargs.pop("_seq_lens", None) + self.dont_check_lens = kwargs.pop("_dont_check_lens", False) + self.max_seq_len = None + if self.seq_lens is not None and len(self.seq_lens) > 0: + self.max_seq_len = max(self.seq_lens) + + # The actual data, accessible by column name (str). + self.data = dict(*args, **kwargs) + + lengths = [] + for k, v in self.data.copy().items(): + assert isinstance(k, str), self + lengths.append(len(v)) + if isinstance(v, list): + self.data[k] = np.array(v) + if not lengths: + raise ValueError("Empty sample batch") + if not self.dont_check_lens: + assert len(set(lengths)) == 1, \ + "Data columns must be same length, but lens are " \ + "{}".format(lengths) + if self.seq_lens is not None and len(self.seq_lens) > 0: + self.count = sum(self.seq_lens) + else: + self.count = len(next(iter(self.data.values()))) + + # Keeps track of new columns added after initial ones. + self.new_columns = [] + + @PublicAPI + def __len__(self): + """Returns the amount of samples in the sample batch.""" + return self.count + + @staticmethod + @PublicAPI + def concat_samples(samples: List["SampleBatch"]) -> \ + Union["SampleBatch", "MultiAgentBatch"]: + """Concatenates n data dicts or MultiAgentBatches. + + Args: + samples (List[Dict[TensorType]]]): List of dicts of data (numpy). 
+ + Returns: + Union[SampleBatch, MultiAgentBatch]: A new (compressed) + SampleBatch or MultiAgentBatch. + """ + if isinstance(samples[0], MultiAgentBatch): + return MultiAgentBatch.concat_samples(samples) + seq_lens = [] + concat_samples = [] + for s in samples: + if s.count > 0: + concat_samples.append(s) + if s.seq_lens is not None: + seq_lens.extend(s.seq_lens) + + out = {} + for k in concat_samples[0].keys(): + out[k] = concat_aligned( + [s[k] for s in concat_samples], + time_major=concat_samples[0].time_major) + return SampleBatch( + out, + _seq_lens=np.array(seq_lens, dtype=np.int32), + _time_major=concat_samples[0].time_major, + _dont_check_lens=True) + + @PublicAPI + def concat(self, other: "SampleBatch") -> "SampleBatch": + """Returns a new SampleBatch with each data column concatenated. + + Args: + other (SampleBatch): The other SampleBatch object to concat to this + one. + + Returns: + SampleBatch: The new SampleBatch, resulting from concating `other` + to `self`. + + Examples: + >>> b1 = SampleBatch({"a": [1, 2]}) + >>> b2 = SampleBatch({"a": [3, 4, 5]}) + >>> print(b1.concat(b2)) + {"a": [1, 2, 3, 4, 5]} + """ + + if self.keys() != other.keys(): + raise ValueError( + "SampleBatches to concat must have same columns! {} vs {}". + format(list(self.keys()), list(other.keys()))) + out = {} + for k in self.keys(): + out[k] = concat_aligned([self[k], other[k]]) + return SampleBatch(out) + + @PublicAPI + def copy(self) -> "SampleBatch": + """Creates a (deep) copy of this SampleBatch and returns it. + + Returns: + SampleBatch: A (deep) copy of this SampleBatch object. + """ + return SampleBatch( + {k: np.array(v, copy=True) + for (k, v) in self.data.items()}, + _seq_lens=self.seq_lens) + + @PublicAPI + def rows(self) -> Dict[str, TensorType]: + """Returns an iterator over data rows, i.e. dicts with column values. + + Yields: + Dict[str, TensorType]: The column values of the row in this + iteration. + + Examples: + >>> batch = SampleBatch({"a": [1, 2, 3], "b": [4, 5, 6]}) + >>> for row in batch.rows(): + print(row) + {"a": 1, "b": 4} + {"a": 2, "b": 5} + {"a": 3, "b": 6} + """ + + for i in range(self.count): + row = {} + for k in self.keys(): + row[k] = self[k][i] + yield row + + @PublicAPI + def columns(self, keys: List[str]) -> List[any]: + """Returns a list of the batch-data in the specified columns. + + Args: + keys (List[str]): List of column names fo which to return the data. + + Returns: + List[any]: The list of data items ordered by the order of column + names in `keys`. + + Examples: + >>> batch = SampleBatch({"a": [1], "b": [2], "c": [3]}) + >>> print(batch.columns(["a", "b"])) + [[1], [2]] + """ + + out = [] + for k in keys: + out.append(self[k]) + return out + + @PublicAPI + def shuffle(self) -> None: + """Shuffles the rows of this batch in-place.""" + + permutation = np.random.permutation(self.count) + for key, val in self.items(): + self[key] = val[permutation] + + @PublicAPI + def split_by_episode(self) -> List["SampleBatch"]: + """Splits this batch's data by `eps_id`. + + Returns: + List[SampleBatch]: List of batches, one per distinct episode. 
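        Examples (illustrative sketch; output simplified in the same style as
        the other examples in this file, actual columns are numpy arrays):
            >>> batch = SampleBatch({"a": [1, 2, 3], "eps_id": [0, 0, 1]})
            >>> print([list(s["a"]) for s in batch.split_by_episode()])
            [[1, 2], [3]]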
+ """ + + slices = [] + cur_eps_id = self.data["eps_id"][0] + offset = 0 + for i in range(self.count): + next_eps_id = self.data["eps_id"][i] + if next_eps_id != cur_eps_id: + slices.append(self.slice(offset, i)) + offset = i + cur_eps_id = next_eps_id + slices.append(self.slice(offset, self.count)) + for s in slices: + slen = len(set(s["eps_id"])) + assert slen == 1, (s, slen) + assert sum(s.count for s in slices) == self.count, (slices, self.count) + return slices + + @PublicAPI + def slice(self, start: int, end: int) -> "SampleBatch": + """Returns a slice of the row data of this batch (w/o copying). + + Args: + start (int): Starting index. + end (int): Ending index. + + Returns: + SampleBatch: A new SampleBatch, which has a slice of this batch's + data. + """ + if self.seq_lens is not None and len(self.seq_lens) > 0: + data = {k: v[start:end] for k, v in self.data.items()} + # Fix state_in_x data. + count = 0 + state_start = None + seq_lens = None + for i, seq_len in enumerate(self.seq_lens): + count += seq_len + if count >= end: + state_idx = 0 + state_key = "state_in_{}".format(state_idx) + while state_key in self.data: + data[state_key] = self.data[state_key][state_start:i + + 1] + state_idx += 1 + state_key = "state_in_{}".format(state_idx) + seq_lens = list(self.seq_lens[state_start:i]) + [ + seq_len - (count - end) + ] + assert sum(seq_lens) == (end - start) + break + elif state_start is None and count > start: + state_start = i + + return SampleBatch( + data, + _seq_lens=np.array(seq_lens, dtype=np.int32), + _time_major=self.time_major, + _dont_check_lens=True) + else: + return SampleBatch( + {k: v[start:end] + for k, v in self.data.items()}, + _seq_lens=None, + _time_major=self.time_major) + + @PublicAPI + def timeslices(self, k: int) -> List["SampleBatch"]: + """Returns SampleBatches, each one representing a k-slice of this one. + + Will start from timestep 0 and produce slices of size=k. + + Args: + k (int): The size (in timesteps) of each returned SampleBatch. + + Returns: + List[SampleBatch]: The list of (new) SampleBatches (each one of + size k). + """ + out = [] + i = 0 + while i < self.count: + out.append(self.slice(i, i + k)) + i += k + return out + + @PublicAPI + def keys(self) -> Iterable[str]: + """ + Returns: + Iterable[str]: The keys() iterable over `self.data`. + """ + return self.data.keys() + + @PublicAPI + def items(self) -> Iterable[TensorType]: + """ + Returns: + Iterable[TensorType]: The values() iterable over `self.data`. + """ + return self.data.items() + + @PublicAPI + def get(self, key: str) -> Optional[TensorType]: + """Returns one column (by key) from the data or None if key not found. + + Args: + key (str): The key (column name) to return. + + Returns: + Optional[TensorType]: The data under the given key. None if key + not found in data. + """ + return self.data.get(key) + + @PublicAPI + def size_bytes(self) -> int: + """ + Returns: + int: The overall size in bytes of the data buffer (all columns). + """ + return sum(sys.getsizeof(d) for d in self.data.values()) + + @PublicAPI + def __getitem__(self, key: str) -> TensorType: + """Returns one column (by key) from the data. + + Args: + key (str): The key (column name) to return. + + Returns: + TensorType: The data under the given key. + """ + return self.data[key] + + @PublicAPI + def __setitem__(self, key, item) -> None: + """Inserts (overrides) an entire column (by key) in the data buffer. + + Args: + key (str): The column name to set a value for. + item (TensorType): The data to insert. 
+ """ + if key not in self.data: + self.new_columns.append(key) + self.data[key] = item + + @DeveloperAPI + def compress(self, + bulk: bool = False, + columns: Set[str] = frozenset(["obs", "new_obs"])) -> None: + """Compresses the data buffers (by column) in place. + + Args: + bulk (bool): Whether to compress across the batch dimension (0) + as well. If False will compress n separate list items, where n + is the batch size. + columns (Set[str]): The columns to compress. Default: Only + compress the obs and new_obs columns. + """ + for key in columns: + if key in self.data: + if bulk: + self.data[key] = pack(self.data[key]) + else: + self.data[key] = np.array( + [pack(o) for o in self.data[key]]) + + @DeveloperAPI + def decompress_if_needed(self, + columns: Set[str] = frozenset( + ["obs", "new_obs"])) -> "SampleBatch": + """Decompresses data buffers (per column if not compressed) in place. + + Args: + columns (Set[str]): The columns to decompress. Default: Only + decompress the obs and new_obs columns. + + Returns: + SampleBatch: This very SampleBatch. + """ + for key in columns: + if key in self.data: + arr = self.data[key] + if is_compressed(arr): + self.data[key] = unpack(arr) + elif len(arr) > 0 and is_compressed(arr[0]): + self.data[key] = np.array( + [unpack(o) for o in self.data[key]]) + return self + + def __str__(self): + return "SampleBatch({})".format(str(self.data)) + + def __repr__(self): + return "SampleBatch({})".format(str(self.data)) + + def __iter__(self): + return self.data.__iter__() + + def __contains__(self, x): + return x in self.data + + +@PublicAPI +class MultiAgentBatch: + """A batch of experiences from multiple agents in the environment. + + Attributes: + policy_batches (Dict[PolicyID, SampleBatch]): Mapping from policy + ids to SampleBatches of experiences. + count (int): The number of env steps in this batch. + """ + + @PublicAPI + def __init__(self, policy_batches: Dict[PolicyID, SampleBatch], + env_steps: int): + """Initialize a MultiAgentBatch object. + + Args: + policy_batches (Dict[PolicyID, SampleBatch]): Mapping from policy + ids to SampleBatches of experiences. + env_steps (int): The number of environment steps in the environment + this batch contains. This will be less than the number of + transitions this batch contains across all policies in total. + """ + + for v in policy_batches.values(): + assert isinstance(v, SampleBatch) + self.policy_batches = policy_batches + # Called "count" for uniformity with SampleBatch. + # Prefer to access this via the `env_steps()` method when possible + # for clarity. + self.count = env_steps + + @PublicAPI + def env_steps(self) -> int: + """The number of env steps (there are >= 1 agent steps per env step). + + Returns: + int: The number of environment steps contained in this batch. + """ + return self.count + + @PublicAPI + def agent_steps(self) -> int: + """The number of agent steps (there are >= 1 agent steps per env step). + + Returns: + int: The number of agent steps total in this batch. + """ + ct = 0 + for batch in self.policy_batches.values(): + ct += batch.count + return ct + + @PublicAPI + def timeslices(self, k: int) -> List["MultiAgentBatch"]: + """Returns k-step batches holding data for each agent at those steps. + + For examples, suppose we have agent1 observations [a1t1, a1t2, a1t3], + for agent2, [a2t1, a2t3], and for agent3, [a3t3] only. + + Calling timeslices(1) would return three MultiAgentBatches containing + [a1t1, a2t1], [a1t2], and [a1t3, a2t3, a3t3]. 
+ + Calling timeslices(2) would return two MultiAgentBatches containing + [a1t1, a1t2, a2t1], and [a1t3, a2t3, a3t3]. + + This method is used to implement "lockstep" replay mode. Note that this + method does not guarantee each batch contains only data from a single + unroll. Batches might contain data from multiple different envs. + """ + from ray.rllib.evaluation.sample_batch_builder import \ + SampleBatchBuilder + + # Build a sorted set of (eps_id, t, policy_id, data...) + steps = [] + for policy_id, batch in self.policy_batches.items(): + for row in batch.rows(): + steps.append((row[SampleBatch.EPS_ID], row["t"], + row["agent_index"], policy_id, row)) + steps.sort() + + finished_slices = [] + cur_slice = collections.defaultdict(SampleBatchBuilder) + cur_slice_size = 0 + + def finish_slice(): + nonlocal cur_slice_size + assert cur_slice_size > 0 + batch = MultiAgentBatch( + {k: v.build_and_reset() + for k, v in cur_slice.items()}, cur_slice_size) + cur_slice_size = 0 + finished_slices.append(batch) + + # For each unique env timestep. + for _, group in itertools.groupby(steps, lambda x: x[:2]): + # Accumulate into the current slice. + for _, _, _, policy_id, row in group: + cur_slice[policy_id].add_values(**row) + cur_slice_size += 1 + # Slice has reached target number of env steps. + if cur_slice_size >= k: + finish_slice() + assert cur_slice_size == 0 + + if cur_slice_size > 0: + finish_slice() + + assert len(finished_slices) > 0, finished_slices + return finished_slices + + @staticmethod + @PublicAPI + def wrap_as_needed( + policy_batches: Dict[PolicyID, SampleBatch], + env_steps: int) -> Union[SampleBatch, "MultiAgentBatch"]: + """Returns SampleBatch or MultiAgentBatch, depending on given policies. + + Args: + policy_batches (Dict[PolicyID, SampleBatch]): Mapping from policy + ids to SampleBatch. + env_steps (int): Number of env steps in the batch. + + Returns: + Union[SampleBatch, MultiAgentBatch]: The single default policy's + SampleBatch or a MultiAgentBatch (more than one policy). + """ + if len(policy_batches) == 1 and DEFAULT_POLICY_ID in policy_batches: + return policy_batches[DEFAULT_POLICY_ID] + return MultiAgentBatch( + policy_batches=policy_batches, env_steps=env_steps) + + @staticmethod + @PublicAPI + def concat_samples(samples: List["MultiAgentBatch"]) -> "MultiAgentBatch": + """Concatenates a list of MultiAgentBatches into a new MultiAgentBatch. + + Args: + samples (List[MultiAgentBatch]): List of MultiagentBatch objects + to concatenate. + + Returns: + MultiAgentBatch: A new MultiAgentBatch consisting of the + concatenated inputs. + """ + policy_batches = collections.defaultdict(list) + env_steps = 0 + for s in samples: + if not isinstance(s, MultiAgentBatch): + raise ValueError( + "`MultiAgentBatch.concat_samples()` can only concat " + "MultiAgentBatch types, not {}!".format(type(s).__name__)) + for key, batch in s.policy_batches.items(): + policy_batches[key].append(batch) + env_steps += s.env_steps() + out = {} + for key, batches in policy_batches.items(): + out[key] = SampleBatch.concat_samples(batches) + return MultiAgentBatch(out, env_steps) + + @PublicAPI + def copy(self) -> "MultiAgentBatch": + """Deep-copies self into a new MultiAgentBatch. + + Returns: + MultiAgentBatch: The copy of self with deep-copied data. + """ + return MultiAgentBatch( + {k: v.copy() + for (k, v) in self.policy_batches.items()}, self.count) + + @PublicAPI + def size_bytes(self) -> int: + """ + Returns: + int: The overall size in bytes of all policy batches (all columns). 
+ """ + return sum(b.size_bytes() for b in self.policy_batches.values()) + + @DeveloperAPI + def compress(self, + bulk: bool = False, + columns: Set[str] = frozenset(["obs", "new_obs"])) -> None: + """Compresses each policy batch (per column) in place. + + Args: + bulk (bool): Whether to compress across the batch dimension (0) + as well. If False will compress n separate list items, where n + is the batch size. + columns (Set[str]): Set of column names to compress. + """ + for batch in self.policy_batches.values(): + batch.compress(bulk=bulk, columns=columns) + + @DeveloperAPI + def decompress_if_needed(self, + columns: Set[str] = frozenset( + ["obs", "new_obs"])) -> "MultiAgentBatch": + """Decompresses each policy batch (per column), if already compressed. + + Args: + columns (Set[str]): Set of column names to decompress. + + Returns: + MultiAgentBatch: This very MultiAgentBatch. + """ + for batch in self.policy_batches.values(): + batch.decompress_if_needed(columns) + return self + + def __str__(self): + return "MultiAgentBatch({}, env_steps={})".format( + str(self.policy_batches), self.count) + + def __repr__(self): + return "MultiAgentBatch({}, env_steps={})".format( + str(self.policy_batches), self.count) From 670b8f3a3b31b8400a32a6141ec5dd8ec97f3c53 Mon Sep 17 00:00:00 2001 From: sven1977 Date: Wed, 10 Feb 2021 22:36:33 +0100 Subject: [PATCH 06/18] wip. --- rllib/agents/dqn/dqn.py | 21 +++++++---------- rllib/agents/dqn/dqn_tf_policy.py | 5 +--- rllib/agents/dqn/tests/test_dqn.py | 10 ++++---- rllib/agents/ppo/ppo.py | 13 ++++------- rllib/execution/train_ops.py | 37 ++++++++---------------------- rllib/policy/dynamic_tf_policy.py | 5 +++- 6 files changed, 31 insertions(+), 60 deletions(-) diff --git a/rllib/agents/dqn/dqn.py b/rllib/agents/dqn/dqn.py index 7a68f5cb394e7..d416182acfd21 100644 --- a/rllib/agents/dqn/dqn.py +++ b/rllib/agents/dqn/dqn.py @@ -24,7 +24,7 @@ from ray.rllib.execution.rollout_ops import ParallelRollouts from ray.rllib.execution.train_ops import TrainOneStep, UpdateTargetNetwork, \ TrainTFMultiGPU -from ray.rllib.policy.policy import LEARNER_STATS_KEY, Policy +from ray.rllib.policy.policy import Policy from ray.rllib.utils.typing import TrainerConfigDict from ray.util.iter import LocalIterator @@ -233,9 +233,7 @@ def update_prio(item): if config.get("prioritized_replay"): prio_dict = {} for policy_id, info in info_dict.items(): - #TODO: check, whether correct now and resolve below disambiguation td_error = info.get("td_error") - #info[LEARNER_STATS_KEY].get("td_error")) prio_dict[policy_id] = (samples.policy_batches[policy_id] .data.get("batch_indexes"), td_error) local_replay_buffer.update_priorities(prio_dict) @@ -250,16 +248,13 @@ def update_prio(item): train_op = TrainOneStep(workers) else: train_op = TrainTFMultiGPU( - workers, - sgd_minibatch_size=config["train_batch_size"], - num_sgd_iter=1, - num_gpus=config["num_gpus"], - rollout_fragment_length="doesnt_matter", # :) config["rollout_fragment_length"], - num_envs_per_worker="doesnt_matter", # :) config["num_envs_per_worker"], - train_batch_size="doesnt_matter", # :) - shuffle_sequences=True,#DQN: always shuffle, no sequences. - _fake_gpus=config.get("_fake_gpus", False), - framework=config.get("framework")) + workers=workers, + sgd_minibatch_size=config["train_batch_size"], + num_sgd_iter=1, + num_gpus=config["num_gpus"], + shuffle_sequences=True, #DQN: always shuffle, no sequences. 
+ _fake_gpus=config.get("_fake_gpus", False), + framework=config.get("framework")) replay_op = Replay(local_buffer=local_replay_buffer) \ diff --git a/rllib/agents/dqn/dqn_tf_policy.py b/rllib/agents/dqn/dqn_tf_policy.py index 5d6bd0d316d7e..6bfcdd2da0b82 100644 --- a/rllib/agents/dqn/dqn_tf_policy.py +++ b/rllib/agents/dqn/dqn_tf_policy.py @@ -238,10 +238,7 @@ def build_q_losses(policy: Policy, model, _, config = policy.config # q network evaluation q_t, q_logits_t, q_dist_t = compute_q_values( - policy, - model, - train_batch[SampleBatch.CUR_OBS], - explore=False) + policy, model, train_batch[SampleBatch.CUR_OBS], explore=False) # target q network evalution q_tp1, q_logits_tp1, q_dist_tp1 = compute_q_values( diff --git a/rllib/agents/dqn/tests/test_dqn.py b/rllib/agents/dqn/tests/test_dqn.py index f23d78a82da33..229b96a16170e 100644 --- a/rllib/agents/dqn/tests/test_dqn.py +++ b/rllib/agents/dqn/tests/test_dqn.py @@ -11,7 +11,7 @@ class TestDQN(unittest.TestCase): @classmethod def setUpClass(cls) -> None: - ray.init(local_mode=True)#TODO + ray.init(local_mode=True) #TODO @classmethod def tearDownClass(cls) -> None: @@ -58,11 +58,9 @@ def test_dqn_fake_multi_gpu_learning(self): config["num_gpus"] = 2 config["_fake_gpus"] = True config["framework"] = "tf" - # Mimick tuned_example for PPO CartPole. - config["num_workers"] = 1 - config["lr"] = 0.0003 - #config["observation_filter"] = "MeanStdFilter" - config["model"]["fcnet_hiddens"] = [32] + # Mimick tuned_example for DQN CartPole. + config["n_step"] = 3 + config["model"]["fcnet_hiddens"] = [64] config["model"]["fcnet_activation"] = "linear" trainer = dqn.DQNTrainer(config=config, env="CartPole-v0") diff --git a/rllib/agents/ppo/ppo.py b/rllib/agents/ppo/ppo.py index e7f55ea8f7a6d..aa18f05434923 100644 --- a/rllib/agents/ppo/ppo.py +++ b/rllib/agents/ppo/ppo.py @@ -188,7 +188,7 @@ def update(pi, pi_id): ("{} should be nested under policy id key".format( LEARNER_STATS_KEY), fetches) if pi_id in fetches: - kl = fetches[pi_id].get(LEARNER_STATS_KEY, {}).get("kl") + kl = fetches[pi_id].get("kl") assert kl is not None, (fetches, pi_id) # Make the actual `Policy.update_kl()` call. pi.update_kl(kl) @@ -207,9 +207,9 @@ def warn_about_bad_reward_scales(config, result): # Warn about excessively high VF loss. 
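    # (The extra [LEARNER_STATS_KEY] lookup below matches the multi-GPU train
    # op's fetch layout: each policy's learner info now keeps its stats nested
    # under LEARNER_STATS_KEY, with extra fetches such as `td_error` beside
    # it.)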
learner_stats = result["info"]["learner"] if DEFAULT_POLICY_ID in learner_stats: - scaled_vf_loss = (config["vf_loss_coeff"] * - learner_stats[DEFAULT_POLICY_ID]["vf_loss"]) - policy_loss = learner_stats[DEFAULT_POLICY_ID]["policy_loss"] + scaled_vf_loss = config["vf_loss_coeff"] * \ + learner_stats[DEFAULT_POLICY_ID][LEARNER_STATS_KEY]["vf_loss"] + policy_loss = learner_stats[DEFAULT_POLICY_ID][LEARNER_STATS_KEY]["policy_loss"] if config.get("model", {}).get("vf_share_layers") and \ scaled_vf_loss > 100: logger.warning( @@ -274,13 +274,10 @@ def execution_plan(workers: WorkerSet, else: train_op = rollouts.for_each( TrainTFMultiGPU( - workers, + workers=workers, sgd_minibatch_size=config["sgd_minibatch_size"], num_sgd_iter=config["num_sgd_iter"], num_gpus=config["num_gpus"], - rollout_fragment_length=config["rollout_fragment_length"], - num_envs_per_worker=config["num_envs_per_worker"], - train_batch_size=config["train_batch_size"], shuffle_sequences=config["shuffle_sequences"], _fake_gpus=config["_fake_gpus"], framework=config.get("framework"))) diff --git a/rllib/execution/train_ops.py b/rllib/execution/train_ops.py index bf4c460c879f2..934003e39d810 100644 --- a/rllib/execution/train_ops.py +++ b/rllib/execution/train_ops.py @@ -107,13 +107,11 @@ class TrainTFMultiGPU: """ def __init__(self, + *, workers: WorkerSet, sgd_minibatch_size: int, num_sgd_iter: int, num_gpus: int, - rollout_fragment_length: int, - num_envs_per_worker: int, - train_batch_size: int, shuffle_sequences: bool, policies: List[PolicyID] = frozenset([]), _fake_gpus: bool = False, @@ -208,37 +206,20 @@ def __call__(self, 1, int(tuples_per_device) // int(self.per_device_batch_size)) logger.debug("== sgd epochs for {} ==".format(policy_id)) - for i in range(self.num_sgd_iter): - iter_extra_fetches = defaultdict(list) + for _ in range(self.num_sgd_iter): permutation = np.random.permutation(num_batches) + batch_fetches_all_towers = [] for batch_index in range(num_batches): batch_fetches = optimizer.optimize( self.sess, permutation[batch_index] * self.per_device_batch_size) - #def mapping_fn(*s): - # s - - iter_extra_fetches = tree.map_structure( - lambda *s: np.array(s), - *(batch_fetches["tower_{}".format(i)] for i in range(len(self.devices)))) - - #for k, v in batch_fetches.items(): - # if k == LEARNER_STATS_KEY: - # for k, v in batch_fetches[LEARNER_STATS_KEY].items(): - # #TODO: multi-GPU optimizer here does not collect td_error (which is stored outside LEARNER_STATS_KEY) - # #that's why it doesn't show up in the returned fetches. 
- # iter_extra_fetches[k].append(v) - # elif k == "train_op": - # continue - # else: - # iter_extra_fetches[k].append(v) - if logger.getEffectiveLevel() <= logging.DEBUG: - avg = tree.map_structure_with_path(lambda p, s: np.nanmean(s, axis=0) if p[0] != "td_error" else np.concatenate(s, axis=0), iter_extra_fetches) - logger.debug("{} {}".format(i, avg)) - fetches[policy_id] = tree.map_structure_with_path(lambda p, s: np.nanmean(s, axis=0) if p[0] != "td_error" else np.concatenate(s, axis=0), iter_extra_fetches) - #fetches[policy_id] = tree.map_structure(lambda s: np.nanmean(s, axis=0), iter_extra_fetches)#averaged(iter_extra_fetches, axis=0) - #fetches[policy_id] = tree.unflatten_as() + batch_fetches_all_towers.append(tree.map_structure( + lambda *s: np.nanmean(s), + *(batch_fetches["tower_{}".format(tower_num)] + for tower_num in range(len(self.devices))))) + + fetches[policy_id] = tree.map_structure(lambda *s: np.nanmean(s), *batch_fetches_all_towers) load_timer.push_units_processed(samples.count) learn_timer.push_units_processed(samples.count) diff --git a/rllib/policy/dynamic_tf_policy.py b/rllib/policy/dynamic_tf_policy.py index 9614d8fda3d52..ab623319fd9b3 100644 --- a/rllib/policy/dynamic_tf_policy.py +++ b/rllib/policy/dynamic_tf_policy.py @@ -370,7 +370,10 @@ def copy(self, self.action_space, self.config, existing_inputs=input_dict, - existing_model=[self.model, ("target_q_model", getattr(self, "target_q_model", None))]) + existing_model=[ + self.model, ("target_q_model", + getattr(self, "target_q_model", None)) + ]) instance._loss_input_dict = input_dict loss = instance._do_loss_init(input_dict) From 81013b18a68b44530be1e0e3e435e292f9d898ba Mon Sep 17 00:00:00 2001 From: sven1977 Date: Tue, 23 Feb 2021 14:50:00 +0100 Subject: [PATCH 07/18] wip. --- rllib/agents/ddpg/ddpg_tf_policy.py | 2 ++ rllib/agents/dqn/dqn.py | 24 ++++++----------- rllib/agents/dqn/tests/test_dqn.py | 14 ++++++---- rllib/agents/pg/tests/test_pg.py | 32 ++++++++++++++++++++++- rllib/agents/ppo/ppo.py | 25 ++++++++---------- rllib/agents/ppo/tests/test_ppo.py | 2 +- rllib/agents/sac/tests/test_sac.py | 2 +- rllib/agents/trainer.py | 36 ++++++++++++++++++++++++-- rllib/agents/trainer_template.py | 18 +++++++++++-- rllib/examples/multi_agent_cartpole.py | 4 +-- rllib/examples/sumo_env_local.py | 2 +- rllib/execution/train_ops.py | 26 ++++++++++++++----- rllib/policy/dynamic_tf_policy.py | 7 ++--- rllib/tests/test_lstm.py | 4 +-- 14 files changed, 142 insertions(+), 56 deletions(-) diff --git a/rllib/agents/ddpg/ddpg_tf_policy.py b/rllib/agents/ddpg/ddpg_tf_policy.py index 203add618ce62..d6b90e5bf3700 100644 --- a/rllib/agents/ddpg/ddpg_tf_policy.py +++ b/rllib/agents/ddpg/ddpg_tf_policy.py @@ -123,6 +123,8 @@ def ddpg_actor_critic_loss(policy, model, _, train_batch): model_out_tp1, _ = model(input_dict_next, [], None) target_model_out_tp1, _ = policy.target_model(input_dict_next, [], None) + policy.target_q_func_vars = policy.target_model.variables() + # Policy network evaluation. policy_t = model.get_policy_output(model_out_t) policy_tp1 = \ diff --git a/rllib/agents/dqn/dqn.py b/rllib/agents/dqn/dqn.py index d416182acfd21..00cb19e038ce2 100644 --- a/rllib/agents/dqn/dqn.py +++ b/rllib/agents/dqn/dqn.py @@ -133,13 +133,6 @@ "worker_side_prioritization": False, # Prevent iterations from going lower than this time span "min_iter_time_s": 1, - - # TODO: Experimental. - "simple_optimizer": False, - # Whether to fake GPUs (using CPUs). - # Set this to True for debugging on non-GPU machines (set `num_gpus` > 0). 
- "_fake_gpus": False, - }) # __sphinx_doc_end__ # yapf: enable @@ -175,15 +168,15 @@ def validate_config(config: TrainerConfigDict) -> None: "replay_sequence_length > 1.") # Multi-gpu not supported for PyTorch and tf-eager. - if config["framework"] in ["tf2", "tfe", "torch"]: - config["simple_optimizer"] = True + #if config["framework"] in ["tf2", "tfe", "torch"]: + # config["simple_optimizer"] = True # Performance warning, if "simple" optimizer used with (static-graph) tf. - elif config["simple_optimizer"]: - logger.warning( - "Using the simple minibatch optimizer. This will significantly " - "reduce performance, consider simple_optimizer=False.") + #if config["simple_optimizer"]: + # logger.warning( + # "Using the simple minibatch optimizer. This will significantly " + # "reduce performance, consider simple_optimizer=False.") # Multi-agent mode and multi-GPU optimizer. - elif config["multiagent"]["policies"] and not config["simple_optimizer"]: + if config["multiagent"]["policies"] and not config["simple_optimizer"]: logger.info( "In multi-agent mode, policies will be optimized sequentially " "by the multi-GPU optimizer. Consider setting " @@ -253,10 +246,9 @@ def update_prio(item): num_sgd_iter=1, num_gpus=config["num_gpus"], shuffle_sequences=True, #DQN: always shuffle, no sequences. - _fake_gpus=config.get("_fake_gpus", False), + _fake_gpus=config["_fake_gpus"], framework=config.get("framework")) - replay_op = Replay(local_buffer=local_replay_buffer) \ .for_each(lambda x: post_fn(x, workers, config)) \ .for_each(train_op) \ diff --git a/rllib/agents/dqn/tests/test_dqn.py b/rllib/agents/dqn/tests/test_dqn.py index 229b96a16170e..012c9b3f9c5ab 100644 --- a/rllib/agents/dqn/tests/test_dqn.py +++ b/rllib/agents/dqn/tests/test_dqn.py @@ -11,7 +11,7 @@ class TestDQN(unittest.TestCase): @classmethod def setUpClass(cls) -> None: - ray.init(local_mode=True) #TODO + ray.init(local_mode=True)#TODO @classmethod def tearDownClass(cls) -> None: @@ -52,13 +52,17 @@ def test_dqn_compilation(self): trainer.stop() def test_dqn_fake_multi_gpu_learning(self): - """Test whether PPOTrainer can learn CartPole w/ faked multi-GPU.""" + """Test whether DQNTrainer can learn CartPole w/ faked multi-GPU.""" config = copy.deepcopy(dqn.DEFAULT_CONFIG) + # Fake GPU setup. config["num_gpus"] = 2 config["_fake_gpus"] = True + config["framework"] = "tf" - # Mimick tuned_example for DQN CartPole. + # Double batch size (2 GPUs). + config["train_batch_size"] = 64 + # Mimic tuned_example for DQN CartPole. config["n_step"] = 3 config["model"]["fcnet_hiddens"] = [64] config["model"]["fcnet_activation"] = "linear" @@ -68,8 +72,8 @@ def test_dqn_fake_multi_gpu_learning(self): learnt = False for i in range(num_iterations): results = trainer.train() - print(results) - if results["episode_reward_mean"] > 150: + print("reward={}".format(results["episode_reward_mean"])) + if results["episode_reward_mean"] > 100.0: learnt = True break assert learnt, "DQN multi-GPU (with fake-GPUs) did not learn CartPole!" diff --git a/rllib/agents/pg/tests/test_pg.py b/rllib/agents/pg/tests/test_pg.py index ec77ea79a69b7..538818ebd5bf6 100644 --- a/rllib/agents/pg/tests/test_pg.py +++ b/rllib/agents/pg/tests/test_pg.py @@ -1,3 +1,4 @@ +import copy import numpy as np import unittest @@ -24,13 +25,42 @@ def test_pg_compilation(self): config["num_workers"] = 0 num_iterations = 2 - for _ in framework_iterator(config): + for fw in framework_iterator(config): + # For tf, build with fake-GPUs. 
+ config["_fake_gpus"] = fw == "tf" + config["num_gpus"] = 2 if fw == "tf" else 0 trainer = pg.PGTrainer(config=config, env="CartPole-v0") for i in range(num_iterations): print(trainer.train()) check_compute_single_action( trainer, include_prev_action_reward=True) + def test_pg_fake_multi_gpu_learning(self): + """Test whether PGTrainer can learn CartPole w/ faked multi-GPU.""" + config = copy.deepcopy(pg.DEFAULT_CONFIG) + + # Fake GPU setup. + config["num_gpus"] = 2 + config["_fake_gpus"] = True + + config["framework"] = "tf" + # Mimic tuned_example for PG CartPole. + config["model"]["fcnet_hiddens"] = [64] + config["model"]["fcnet_activation"] = "linear" + + trainer = pg.PGTrainer(config=config, env="CartPole-v0") + num_iterations = 200 + learnt = False + for i in range(num_iterations): + results = trainer.train() + print("reward={}".format(results["episode_reward_mean"])) + # Make this test quite short (75.0). + if results["episode_reward_mean"] > 75.0: + learnt = True + break + assert learnt, "PG multi-GPU (with fake-GPUs) did not learn CartPole!" + trainer.stop() + def test_pg_loss_functions(self): """Tests the PG loss function math.""" config = pg.DEFAULT_CONFIG.copy() diff --git a/rllib/agents/ppo/ppo.py b/rllib/agents/ppo/ppo.py index aa18f05434923..aba48c8a54e3f 100644 --- a/rllib/agents/ppo/ppo.py +++ b/rllib/agents/ppo/ppo.py @@ -86,13 +86,6 @@ "batch_mode": "truncate_episodes", # Which observation filter to apply to the observation. "observation_filter": "NoFilter", - # Uses the sync samples optimizer instead of the multi-gpu one. This is - # usually slower, but you might want to try it if you run into issues with - # the default optimizer. - "simple_optimizer": False, - # Whether to fake GPUs (using CPUs). - # Set this to True for debugging on non-GPU machines (set `num_gpus` > 0). - "_fake_gpus": False, # Deprecated keys: # Share layers for value function. If you set this to True, it's important @@ -140,15 +133,19 @@ def validate_config(config: TrainerConfigDict) -> None: "trajectory). Consider setting batch_mode=complete_episodes.") # Multi-gpu not supported for PyTorch and tf-eager. - if config["framework"] in ["tf2", "tfe", "torch"]: - config["simple_optimizer"] = True + #if config["framework"] in ["tf2", "tfe", "torch"]: + # if config["num_gpus"] > 1: + # raise ValueError( + # "`num_gpus` > 1 not supported by PPO-{} yet!".format( + # config["framework"])) + # config["simple_optimizer"] = True # Performance warning, if "simple" optimizer used with (static-graph) tf. - elif config["simple_optimizer"]: - logger.warning( - "Using the simple minibatch optimizer. This will significantly " - "reduce performance, consider simple_optimizer=False.") + #if config["simple_optimizer"]: + # logger.warning( + # "Using the simple minibatch optimizer. This will significantly " + # "reduce performance! Consider `simple_optimizer=False`.") # Multi-agent mode and multi-GPU optimizer. - elif config["multiagent"]["policies"] and not config["simple_optimizer"]: + if config["multiagent"]["policies"] and not config["simple_optimizer"]: logger.info( "In multi-agent mode, policies will be optimized sequentially " "by the multi-GPU optimizer. 
Consider setting " diff --git a/rllib/agents/ppo/tests/test_ppo.py b/rllib/agents/ppo/tests/test_ppo.py index a9addefd192ca..732c149cca743 100644 --- a/rllib/agents/ppo/tests/test_ppo.py +++ b/rllib/agents/ppo/tests/test_ppo.py @@ -111,7 +111,7 @@ def test_ppo_fake_multi_gpu_learning(self): config["num_gpus"] = 2 config["_fake_gpus"] = True config["framework"] = "tf" - # Mimick tuned_example for PPO CartPole. + # Mimic tuned_example for PPO CartPole. config["num_workers"] = 1 config["lr"] = 0.0003 config["observation_filter"] = "MeanStdFilter" diff --git a/rllib/agents/sac/tests/test_sac.py b/rllib/agents/sac/tests/test_sac.py index b32beaac13fd6..a6086634b980c 100644 --- a/rllib/agents/sac/tests/test_sac.py +++ b/rllib/agents/sac/tests/test_sac.py @@ -56,7 +56,7 @@ def step(self, action): class TestSAC(unittest.TestCase): @classmethod def setUpClass(cls) -> None: - ray.init(local_mode=True) + ray.init() @classmethod def tearDownClass(cls) -> None: diff --git a/rllib/agents/trainer.py b/rllib/agents/trainer.py index b2c57d0b13116..7e9145a955fce 100644 --- a/rllib/agents/trainer.py +++ b/rllib/agents/trainer.py @@ -93,9 +93,15 @@ # === Settings for the Trainer process === # Number of GPUs to allocate to the trainer process. Note that not all - # algorithms can take advantage of trainer GPUs. This can be fractional - # (e.g., 0.3 GPUs). + # algorithms can take advantage of trainer GPUs. Support for multi-GPU + # is currently only available for tf-[PPO/IMPALA/DQN/PG]. + # This can be fractional (e.g., 0.3 GPUs). "num_gpus": 0, + # Set to True for debugging (multi-)?GPU funcitonality on a CPU machine. + # GPU towers will be simulated by graphs located on CPUs in this case. + # Use `num_gpus` to test for different numbers of fake GPUs. + "_fake_gpus": False, + # Training batch size, if applicable. Should be >= rollout_fragment_length. # Samples batches will be concatenated together to a batch of this size, # which is then passed to SGD. @@ -398,6 +404,13 @@ # The number of contiguous environment steps to replay at once. This may # be set to greater than 1 to support recurrent models. "replay_sequence_length": 1, + + # Deprecated values. + # Uses the sync samples optimizer instead of the multi-gpu one. This is + # usually slower, but you might want to try it if you run into issues with + # the default optimizer. + # This will be set automatically from now on. + "simple_optimizer": DEPRECATED_VALUE, } # __sphinx_doc_end__ # yapf: enable @@ -1097,6 +1110,23 @@ def _validate_config(config: PartialTrainerConfigDict): if model_config is None: config["model"] = model_config = {} + # Multi-GPU settings. + simple_optim_setting = config.get("simple_optimizer", DEPRECATED_VALUE) + if simple_optim_setting != DEPRECATED_VALUE: + deprecation_warning("simple_optimizer", error=False) + + if config.get("num_gpus", 0) > 1: + if config.get("framework") in ["tfe", "tf2", "torch"]: + raise ValueError("`num_gpus` > 1 not supported yet for " + "framework={}!".format(config.get("framework"))) + elif simple_optim_setting is True: + raise ValueError("Cannot use `simple_optimizer` if `num_gpus` > 1! " + "Consider `simple_optimizer=False`.") + config["simple_optimizer"] = False + elif simple_optim_setting == DEPRECATED_VALUE: + config["simple_optimizer"] = True + + # Trajectory View API settings. 
if not config.get("_use_trajectory_view_api"): traj_view_framestacks = model_config.get("num_framestacks", "auto") if model_config.get("_time_major"): @@ -1107,6 +1137,7 @@ def _validate_config(config: PartialTrainerConfigDict): "iff `_use_trajectory_view_api` is True!") model_config["num_framestacks"] = 0 + # Offline RL settings. if isinstance(config["input_evaluation"], tuple): config["input_evaluation"] = list(config["input_evaluation"]) elif not isinstance(config["input_evaluation"], list): @@ -1133,6 +1164,7 @@ def _validate_config(config: PartialTrainerConfigDict): "complete_episodes]! Got {}".format( config["batch_mode"])) + # Check multi-agent batch count mode. if config["multiagent"].get("count_steps_by", "env_steps") not in \ ["env_steps", "agent_steps"]: raise ValueError( diff --git a/rllib/agents/trainer_template.py b/rllib/agents/trainer_template.py index 600cbef12bd9d..111ed02e5c67a 100644 --- a/rllib/agents/trainer_template.py +++ b/rllib/agents/trainer_template.py @@ -5,7 +5,7 @@ from ray.rllib.env.env_context import EnvContext from ray.rllib.evaluation.worker_set import WorkerSet from ray.rllib.execution.rollout_ops import ParallelRollouts, ConcatBatches -from ray.rllib.execution.train_ops import TrainOneStep +from ray.rllib.execution.train_ops import TrainOneStep, TrainTFMultiGPU from ray.rllib.execution.metric_ops import StandardMetricsReporting from ray.rllib.policy import Policy from ray.rllib.utils import add_mixins @@ -26,7 +26,21 @@ def default_execution_plan(workers: WorkerSet, config: TrainerConfigDict): ConcatBatches( min_batch_size=config["train_batch_size"], count_steps_by=config["multiagent"]["count_steps_by"], - )).for_each(TrainOneStep(workers)) + )) + + if config.get("simple_optimizer") is True: + train_op = train_op.for_each(TrainOneStep(workers)) + else: + train_op = train_op.for_each( + TrainTFMultiGPU( + workers=workers, + sgd_minibatch_size=config.get( + "sgd_minibatch_size", config["train_batch_size"]), + num_sgd_iter=config.get("num_sgd_iter", 1), + num_gpus=config["num_gpus"], + shuffle_sequences=config.get("shuffle_sequences", False), + _fake_gpus=config["_fake_gpus"], + framework=config["framework"])) # Add on the standard episode reward, etc. metrics reporting. This returns # a LocalIterator[metrics_dict] representing metrics for each train step. diff --git a/rllib/examples/multi_agent_cartpole.py b/rllib/examples/multi_agent_cartpole.py index d878a6515b2be..8e34eb0ca8c1c 100644 --- a/rllib/examples/multi_agent_cartpole.py +++ b/rllib/examples/multi_agent_cartpole.py @@ -33,7 +33,7 @@ parser.add_argument("--stop-iters", type=int, default=200) parser.add_argument("--stop-reward", type=float, default=150) parser.add_argument("--stop-timesteps", type=int, default=100000) -parser.add_argument("--simple", action="store_true") +#parser.add_argument("--simple", action="store_true") parser.add_argument("--num-cpus", type=int, default=0) parser.add_argument("--as-test", action="store_true") parser.add_argument( @@ -82,7 +82,7 @@ def gen_policy(i): "env_config": { "num_agents": args.num_agents, }, - "simple_optimizer": args.simple, + #"simple_optimizer": args.simple, # Use GPUs iff `RLLIB_NUM_GPUS` env var set to > 0. 
"num_gpus": int(os.environ.get("RLLIB_NUM_GPUS", "0")), "num_sgd_iter": 10, diff --git a/rllib/examples/sumo_env_local.py b/rllib/examples/sumo_env_local.py index baa96e4525e34..9ac02a9523102 100644 --- a/rllib/examples/sumo_env_local.py +++ b/rllib/examples/sumo_env_local.py @@ -73,7 +73,7 @@ config["num_workers"] = args.num_workers config["rollout_fragment_length"] = 200 config["sgd_minibatch_size"] = 256 - config["simple_optimizer"] = True + #config["simple_optimizer"] = True config["train_batch_size"] = 4000 config["batch_mode"] = "complete_episodes" diff --git a/rllib/execution/train_ops.py b/rllib/execution/train_ops.py index d5f0f2ba2e3b3..564d3448a9559 100644 --- a/rllib/execution/train_ops.py +++ b/rllib/execution/train_ops.py @@ -123,7 +123,7 @@ def __init__(self, self.shuffle_sequences = shuffle_sequences self.framework = framework - # Collect actual devices to use. + # Collect actual GPU devices to use. if not num_gpus: _fake_gpus = True num_gpus = 1 @@ -132,10 +132,12 @@ def __init__(self, "/{}:{}".format(type_, i) for i in range(int(math.ceil(num_gpus))) ] + # Total batch size (all towers). Make sure it is dividable by num towers. self.batch_size = int(sgd_minibatch_size / len(self.devices)) * len( self.devices) assert self.batch_size % len(self.devices) == 0 assert self.batch_size >= len(self.devices), "batch size too small" + # Batch size per tower. self.per_device_batch_size = int(self.batch_size / len(self.devices)) # per-GPU graph copies created below must share vars with the policy @@ -176,8 +178,8 @@ def __call__(self, metrics = _get_shared_metrics() load_timer = metrics.timers[LOAD_BATCH_TIMER] learn_timer = metrics.timers[LEARN_ON_BATCH_TIMER] + # Load data into GPUs. with load_timer: - # (1) Load data into GPUs. num_loaded_tuples = {} for policy_id, batch in samples.policy_batches.items(): # Not a policy-to-train. @@ -201,8 +203,8 @@ def __call__(self, self.sess, [tuples[k] for k in data_keys], [tuples[k] for k in state_keys])) + # Execute minibatch SGD on loaded data. with learn_timer: - # (2) Execute minibatch SGD on loaded data. fetches = {} for policy_id, tuples_per_device in num_loaded_tuples.items(): optimizer = self.optimizers[policy_id] @@ -218,12 +220,14 @@ def __call__(self, self.sess, permutation[batch_index] * self.per_device_batch_size) - batch_fetches_all_towers.append(tree.map_structure( - lambda *s: np.nanmean(s), + batch_fetches_all_towers.append(tree.map_structure_with_path( + lambda path, *s: self._all_tower_reduce(path, *s), *(batch_fetches["tower_{}".format(tower_num)] for tower_num in range(len(self.devices))))) - fetches[policy_id] = tree.map_structure(lambda *s: np.nanmean(s), *batch_fetches_all_towers) + # Reduce mean across all minibatch SGD steps (axis=0 to keep + # all shapes as-is). 
+ fetches[policy_id] = tree.map_structure(lambda *s: np.nanmean(s, axis=0), *batch_fetches_all_towers) load_timer.push_units_processed(samples.count) learn_timer.push_units_processed(samples.count) @@ -240,6 +244,16 @@ def __call__(self, self.workers.local_worker().set_global_vars(_get_global_vars()) return samples, fetches + def _all_tower_reduce(self, path, *tower_data): + """Reduces stats across towers based on their stats-dict paths.""" + if len(path) == 1 and path[0] == "td_error": + return np.concatenate(tower_data, axis=0) + elif path[-1].startswith("min_"): + return np.nanmin(tower_data) + elif path[-1].startswith("max_"): + return np.nanmax(tower_data) + return np.nanmean(tower_data) + class ComputeGradients: """Callable that computes gradients with respect to the policy loss. diff --git a/rllib/policy/dynamic_tf_policy.py b/rllib/policy/dynamic_tf_policy.py index 34e2557b33e6a..9dac98932e0d2 100644 --- a/rllib/policy/dynamic_tf_policy.py +++ b/rllib/policy/dynamic_tf_policy.py @@ -162,7 +162,7 @@ def __init__( if existing_model: if isinstance(existing_model, list): self.model = existing_model[0] - #TODO: (sven) total hack, but works for `target_q_model` + #TODO: (sven) total hack, but works for `target_q_model/target_model` for i in range(1, len(existing_model)): setattr(self, existing_model[i][0], existing_model[i][1]) elif make_model: @@ -371,8 +371,9 @@ def copy(self, self.config, existing_inputs=input_dict, existing_model=[ - self.model, ("target_q_model", - getattr(self, "target_q_model", None)) + self.model, + ("target_q_model", getattr(self, "target_q_model", None)), + ("target_model", getattr(self, "target_model", None)), ]) instance._loss_input_dict = input_dict diff --git a/rllib/tests/test_lstm.py b/rllib/tests/test_lstm.py index cd13157d17230..5463b6f7c878d 100644 --- a/rllib/tests/test_lstm.py +++ b/rllib/tests/test_lstm.py @@ -120,7 +120,7 @@ def test_simple_optimizer_sequencing(self): "rollout_fragment_length": 10, "train_batch_size": 10, "sgd_minibatch_size": 10, - "simple_optimizer": True, + #"simple_optimizer": True, "num_sgd_iter": 1, "model": { "custom_model": "rnn", @@ -178,7 +178,7 @@ def test_minibatch_sequencing(self): "rollout_fragment_length": 20, "train_batch_size": 20, "sgd_minibatch_size": 10, - "simple_optimizer": False, + #"simple_optimizer": False, "num_sgd_iter": 1, "model": { "custom_model": "rnn", From b84ebd938179a572acd90f796c413e769f1b631b Mon Sep 17 00:00:00 2001 From: sven1977 Date: Tue, 23 Feb 2021 15:03:52 +0100 Subject: [PATCH 08/18] wip and LINT. 
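Wires the tf multi-GPU train op into the A2C and SimpleQ execution plans as well and adds fake-multi-GPU learning tests for them. A rough usage sketch of the debug path these tests exercise (it mirrors the new test cases; not a tuned example, and only the static-graph tf path supports multi-GPU so far):

    import ray
    import ray.rllib.agents.dqn as dqn

    config = dqn.DEFAULT_CONFIG.copy()
    config["framework"] = "tf"    # Multi-GPU optimizer is static-graph tf only.
    config["num_gpus"] = 2        # Two optimizer towers ...
    config["_fake_gpus"] = True   # ... simulated on CPU for debugging.

    ray.init()
    trainer = dqn.DQNTrainer(config=config, env="CartPole-v0")
    print(trainer.train()["episode_reward_mean"])
    trainer.stop()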
--- rllib/agents/a3c/a2c.py | 16 ++++++++++++++-- rllib/agents/a3c/tests/test_a2c.py | 25 +++++++++++++++++++++++++ rllib/agents/dqn/dqn.py | 16 ++++------------ rllib/agents/dqn/simple_q.py | 17 +++++++++++++++-- rllib/agents/dqn/tests/test_dqn.py | 2 +- rllib/agents/dqn/tests/test_simple_q.py | 24 ++++++++++++++++++++++++ rllib/agents/ppo/ppo.py | 15 ++------------- rllib/agents/trainer.py | 8 +++++--- rllib/agents/trainer_template.py | 4 ++-- rllib/examples/multi_agent_cartpole.py | 2 -- rllib/examples/sumo_env_local.py | 1 - rllib/execution/train_ops.py | 19 +++++++++++-------- rllib/policy/dynamic_tf_policy.py | 2 +- rllib/tests/test_lstm.py | 2 -- 14 files changed, 104 insertions(+), 49 deletions(-) diff --git a/rllib/agents/a3c/a2c.py b/rllib/agents/a3c/a2c.py index e6ea0c356f9da..bbb40e337a2be 100644 --- a/rllib/agents/a3c/a2c.py +++ b/rllib/agents/a3c/a2c.py @@ -7,7 +7,7 @@ from ray.rllib.execution.metric_ops import StandardMetricsReporting from ray.rllib.execution.rollout_ops import ParallelRollouts, ConcatBatches from ray.rllib.execution.train_ops import ComputeGradients, AverageGradients, \ - ApplyGradients, TrainOneStep + ApplyGradients, TrainTFMultiGPU, TrainOneStep from ray.rllib.utils import merge_dicts A2C_DEFAULT_CONFIG = merge_dicts( @@ -47,11 +47,23 @@ def execution_plan(workers, config): .for_each(ApplyGradients(workers))) else: # In normal mode, we execute one SGD step per each train batch. + if config["simple_optimizer"]: + train_step_op = TrainOneStep(workers) + else: + train_step_op = TrainTFMultiGPU( + workers=workers, + sgd_minibatch_size=config["train_batch_size"], + num_sgd_iter=1, + num_gpus=config["num_gpus"], + shuffle_sequences=True, + _fake_gpus=config["_fake_gpus"], + framework=config.get("framework")) + train_op = rollouts.combine( ConcatBatches( min_batch_size=config["train_batch_size"], count_steps_by=config["multiagent"][ - "count_steps_by"])).for_each(TrainOneStep(workers)) + "count_steps_by"])).for_each(train_step_op) return StandardMetricsReporting(train_op, workers, config) diff --git a/rllib/agents/a3c/tests/test_a2c.py b/rllib/agents/a3c/tests/test_a2c.py index e1198de041b1b..602403dd808e1 100644 --- a/rllib/agents/a3c/tests/test_a2c.py +++ b/rllib/agents/a3c/tests/test_a2c.py @@ -1,3 +1,4 @@ +import copy import unittest import ray @@ -33,6 +34,30 @@ def test_a2c_compilation(self): check_compute_single_action(trainer) trainer.stop() + def test_a2c_fake_multi_gpu_learning(self): + """Test whether A2CTrainer can learn CartPole w/ faked multi-GPU.""" + config = copy.deepcopy(a3c.a2c.A2C_DEFAULT_CONFIG) + + # Fake GPU setup. + config["num_gpus"] = 2 + config["_fake_gpus"] = True + + config["framework"] = "tf" + # Mimic tuned_example for A2C CartPole. + config["lr"] = 0.001 + + trainer = a3c.A2CTrainer(config=config, env="CartPole-v0") + num_iterations = 100 + learnt = False + for i in range(num_iterations): + results = trainer.train() + print("reward={}".format(results["episode_reward_mean"])) + if results["episode_reward_mean"] > 100.0: + learnt = True + break + assert learnt, "A2C multi-GPU (with fake-GPUs) did not learn CartPole!" 
+ trainer.stop() + def test_a2c_exec_impl(ray_start_regular): config = {"min_iter_time_s": 0} for _ in framework_iterator(config): diff --git a/rllib/agents/dqn/dqn.py b/rllib/agents/dqn/dqn.py index 00cb19e038ce2..cb9c4b5ce5ca2 100644 --- a/rllib/agents/dqn/dqn.py +++ b/rllib/agents/dqn/dqn.py @@ -167,14 +167,6 @@ def validate_config(config: TrainerConfigDict) -> None: raise ValueError("Prioritized replay is not supported when " "replay_sequence_length > 1.") - # Multi-gpu not supported for PyTorch and tf-eager. - #if config["framework"] in ["tf2", "tfe", "torch"]: - # config["simple_optimizer"] = True - # Performance warning, if "simple" optimizer used with (static-graph) tf. - #if config["simple_optimizer"]: - # logger.warning( - # "Using the simple minibatch optimizer. This will significantly " - # "reduce performance, consider simple_optimizer=False.") # Multi-agent mode and multi-GPU optimizer. if config["multiagent"]["policies"] and not config["simple_optimizer"]: logger.info( @@ -238,20 +230,20 @@ def update_prio(item): post_fn = config.get("before_learn_on_batch") or (lambda b, *a: b) if config["simple_optimizer"]: - train_op = TrainOneStep(workers) + train_step_op = TrainOneStep(workers) else: - train_op = TrainTFMultiGPU( + train_step_op = TrainTFMultiGPU( workers=workers, sgd_minibatch_size=config["train_batch_size"], num_sgd_iter=1, num_gpus=config["num_gpus"], - shuffle_sequences=True, #DQN: always shuffle, no sequences. + shuffle_sequences=True, _fake_gpus=config["_fake_gpus"], framework=config.get("framework")) replay_op = Replay(local_buffer=local_replay_buffer) \ .for_each(lambda x: post_fn(x, workers, config)) \ - .for_each(train_op) \ + .for_each(train_step_op) \ .for_each(update_prio) \ .for_each(UpdateTargetNetwork( workers, config["target_network_update_freq"])) diff --git a/rllib/agents/dqn/simple_q.py b/rllib/agents/dqn/simple_q.py index 9580e126192a4..cfa2631246586 100644 --- a/rllib/agents/dqn/simple_q.py +++ b/rllib/agents/dqn/simple_q.py @@ -22,7 +22,8 @@ from ray.rllib.execution.replay_buffer import LocalReplayBuffer from ray.rllib.execution.replay_ops import Replay, StoreToReplayBuffer from ray.rllib.execution.rollout_ops import ParallelRollouts -from ray.rllib.execution.train_ops import TrainOneStep, UpdateTargetNetwork +from ray.rllib.execution.train_ops import TrainTFMultiGPU, TrainOneStep, \ + UpdateTargetNetwork from ray.rllib.policy.policy import Policy from ray.rllib.utils.typing import TrainerConfigDict from ray.util.iter import LocalIterator @@ -135,9 +136,21 @@ def execution_plan(workers: WorkerSet, store_op = rollouts.for_each( StoreToReplayBuffer(local_buffer=local_replay_buffer)) + if config["simple_optimizer"]: + train_step_op = TrainOneStep(workers) + else: + train_step_op = TrainTFMultiGPU( + workers=workers, + sgd_minibatch_size=config["train_batch_size"], + num_sgd_iter=1, + num_gpus=config["num_gpus"], + shuffle_sequences=True, + _fake_gpus=config["_fake_gpus"], + framework=config.get("framework")) + # (2) Read and train on experiences from the replay buffer. 
replay_op = Replay(local_buffer=local_replay_buffer) \ - .for_each(TrainOneStep(workers)) \ + .for_each(train_step_op) \ .for_each(UpdateTargetNetwork( workers, config["target_network_update_freq"])) diff --git a/rllib/agents/dqn/tests/test_dqn.py b/rllib/agents/dqn/tests/test_dqn.py index 012c9b3f9c5ab..811a3f0a7f802 100644 --- a/rllib/agents/dqn/tests/test_dqn.py +++ b/rllib/agents/dqn/tests/test_dqn.py @@ -11,7 +11,7 @@ class TestDQN(unittest.TestCase): @classmethod def setUpClass(cls) -> None: - ray.init(local_mode=True)#TODO + ray.init() @classmethod def tearDownClass(cls) -> None: diff --git a/rllib/agents/dqn/tests/test_simple_q.py b/rllib/agents/dqn/tests/test_simple_q.py index 94e1c7a0918e4..8a8815a66cbe7 100644 --- a/rllib/agents/dqn/tests/test_simple_q.py +++ b/rllib/agents/dqn/tests/test_simple_q.py @@ -1,3 +1,4 @@ +import copy import numpy as np import unittest @@ -35,6 +36,29 @@ def test_simple_q_compilation(self): check_compute_single_action(trainer) + def test_simple_q_fake_multi_gpu_learning(self): + """Test whether SimpleQTrainer learns CartPole w/ fake GPUs.""" + config = copy.deepcopy(dqn.SIMPLE_Q_DEFAULT_CONFIG) + + # Fake GPU setup. + config["num_gpus"] = 2 + config["_fake_gpus"] = True + + config["framework"] = "tf" + + trainer = dqn.SimpleQTrainer(config=config, env="CartPole-v0") + num_iterations = 200 + learnt = False + for i in range(num_iterations): + results = trainer.train() + print("reward={}".format(results["episode_reward_mean"])) + if results["episode_reward_mean"] > 75.0: + learnt = True + break + assert learnt, "SimpleQ multi-GPU (with fake-GPUs) did not " \ + "learn CartPole!" + trainer.stop() + def test_simple_q_loss_function(self): """Tests the Simple-Q loss function results on all frameworks.""" config = dqn.SIMPLE_Q_DEFAULT_CONFIG.copy() diff --git a/rllib/agents/ppo/ppo.py b/rllib/agents/ppo/ppo.py index aba48c8a54e3f..0ee9889a36134 100644 --- a/rllib/agents/ppo/ppo.py +++ b/rllib/agents/ppo/ppo.py @@ -132,18 +132,6 @@ def validate_config(config: TrainerConfigDict) -> None: "function (to estimate the return at the end of the truncated " "trajectory). Consider setting batch_mode=complete_episodes.") - # Multi-gpu not supported for PyTorch and tf-eager. - #if config["framework"] in ["tf2", "tfe", "torch"]: - # if config["num_gpus"] > 1: - # raise ValueError( - # "`num_gpus` > 1 not supported by PPO-{} yet!".format( - # config["framework"])) - # config["simple_optimizer"] = True - # Performance warning, if "simple" optimizer used with (static-graph) tf. - #if config["simple_optimizer"]: - # logger.warning( - # "Using the simple minibatch optimizer. This will significantly " - # "reduce performance! Consider `simple_optimizer=False`.") # Multi-agent mode and multi-GPU optimizer. 
if config["multiagent"]["policies"] and not config["simple_optimizer"]: logger.info( @@ -206,7 +194,8 @@ def warn_about_bad_reward_scales(config, result): if DEFAULT_POLICY_ID in learner_stats: scaled_vf_loss = config["vf_loss_coeff"] * \ learner_stats[DEFAULT_POLICY_ID][LEARNER_STATS_KEY]["vf_loss"] - policy_loss = learner_stats[DEFAULT_POLICY_ID][LEARNER_STATS_KEY]["policy_loss"] + policy_loss = learner_stats[DEFAULT_POLICY_ID][LEARNER_STATS_KEY][ + "policy_loss"] if config.get("model", {}).get("vf_share_layers") and \ scaled_vf_loss > 100: logger.warning( diff --git a/rllib/agents/trainer.py b/rllib/agents/trainer.py index 7e9145a955fce..9a00d2141db41 100644 --- a/rllib/agents/trainer.py +++ b/rllib/agents/trainer.py @@ -1118,10 +1118,12 @@ def _validate_config(config: PartialTrainerConfigDict): if config.get("num_gpus", 0) > 1: if config.get("framework") in ["tfe", "tf2", "torch"]: raise ValueError("`num_gpus` > 1 not supported yet for " - "framework={}!".format(config.get("framework"))) + "framework={}!".format( + config.get("framework"))) elif simple_optim_setting is True: - raise ValueError("Cannot use `simple_optimizer` if `num_gpus` > 1! " - "Consider `simple_optimizer=False`.") + raise ValueError( + "Cannot use `simple_optimizer` if `num_gpus` > 1! " + "Consider `simple_optimizer=False`.") config["simple_optimizer"] = False elif simple_optim_setting == DEPRECATED_VALUE: config["simple_optimizer"] = True diff --git a/rllib/agents/trainer_template.py b/rllib/agents/trainer_template.py index 111ed02e5c67a..08918f8dfd453 100644 --- a/rllib/agents/trainer_template.py +++ b/rllib/agents/trainer_template.py @@ -34,8 +34,8 @@ def default_execution_plan(workers: WorkerSet, config: TrainerConfigDict): train_op = train_op.for_each( TrainTFMultiGPU( workers=workers, - sgd_minibatch_size=config.get( - "sgd_minibatch_size", config["train_batch_size"]), + sgd_minibatch_size=config.get("sgd_minibatch_size", + config["train_batch_size"]), num_sgd_iter=config.get("num_sgd_iter", 1), num_gpus=config["num_gpus"], shuffle_sequences=config.get("shuffle_sequences", False), diff --git a/rllib/examples/multi_agent_cartpole.py b/rllib/examples/multi_agent_cartpole.py index 8e34eb0ca8c1c..e0964c3b42e2c 100644 --- a/rllib/examples/multi_agent_cartpole.py +++ b/rllib/examples/multi_agent_cartpole.py @@ -33,7 +33,6 @@ parser.add_argument("--stop-iters", type=int, default=200) parser.add_argument("--stop-reward", type=float, default=150) parser.add_argument("--stop-timesteps", type=int, default=100000) -#parser.add_argument("--simple", action="store_true") parser.add_argument("--num-cpus", type=int, default=0) parser.add_argument("--as-test", action="store_true") parser.add_argument( @@ -82,7 +81,6 @@ def gen_policy(i): "env_config": { "num_agents": args.num_agents, }, - #"simple_optimizer": args.simple, # Use GPUs iff `RLLIB_NUM_GPUS` env var set to > 0. 
"num_gpus": int(os.environ.get("RLLIB_NUM_GPUS", "0")), "num_sgd_iter": 10, diff --git a/rllib/examples/sumo_env_local.py b/rllib/examples/sumo_env_local.py index 9ac02a9523102..782844a9d6f96 100644 --- a/rllib/examples/sumo_env_local.py +++ b/rllib/examples/sumo_env_local.py @@ -73,7 +73,6 @@ config["num_workers"] = args.num_workers config["rollout_fragment_length"] = 200 config["sgd_minibatch_size"] = 256 - #config["simple_optimizer"] = True config["train_batch_size"] = 4000 config["batch_mode"] = "complete_episodes" diff --git a/rllib/execution/train_ops.py b/rllib/execution/train_ops.py index 564d3448a9559..7c5d005dec94f 100644 --- a/rllib/execution/train_ops.py +++ b/rllib/execution/train_ops.py @@ -1,4 +1,3 @@ -from collections import defaultdict import logging import numpy as np import math @@ -19,7 +18,7 @@ from ray.rllib.policy.sample_batch import SampleBatch, DEFAULT_POLICY_ID, \ MultiAgentBatch from ray.rllib.utils.framework import try_import_tf -from ray.rllib.utils.sgd import do_minibatch_sgd, averaged +from ray.rllib.utils.sgd import do_minibatch_sgd from ray.rllib.utils.typing import PolicyID, SampleBatchType, ModelGradients tf1, tf, tfv = try_import_tf() @@ -132,7 +131,8 @@ def __init__(self, "/{}:{}".format(type_, i) for i in range(int(math.ceil(num_gpus))) ] - # Total batch size (all towers). Make sure it is dividable by num towers. + # Total batch size (all towers). Make sure it is dividable by + # num towers. self.batch_size = int(sgd_minibatch_size / len(self.devices)) * len( self.devices) assert self.batch_size % len(self.devices) == 0 @@ -220,14 +220,17 @@ def __call__(self, self.sess, permutation[batch_index] * self.per_device_batch_size) - batch_fetches_all_towers.append(tree.map_structure_with_path( - lambda path, *s: self._all_tower_reduce(path, *s), - *(batch_fetches["tower_{}".format(tower_num)] - for tower_num in range(len(self.devices))))) + batch_fetches_all_towers.append( + tree.map_structure_with_path( + lambda p, *s: self._all_tower_reduce(p, *s), + *(batch_fetches["tower_{}".format(tower_num)] + for tower_num in range(len(self.devices))))) # Reduce mean across all minibatch SGD steps (axis=0 to keep # all shapes as-is). - fetches[policy_id] = tree.map_structure(lambda *s: np.nanmean(s, axis=0), *batch_fetches_all_towers) + fetches[policy_id] = tree.map_structure( + lambda *s: np.nanmean(s, axis=0), + *batch_fetches_all_towers) load_timer.push_units_processed(samples.count) learn_timer.push_units_processed(samples.count) diff --git a/rllib/policy/dynamic_tf_policy.py b/rllib/policy/dynamic_tf_policy.py index 9dac98932e0d2..4ae2d3e606a2d 100644 --- a/rllib/policy/dynamic_tf_policy.py +++ b/rllib/policy/dynamic_tf_policy.py @@ -162,7 +162,7 @@ def __init__( if existing_model: if isinstance(existing_model, list): self.model = existing_model[0] - #TODO: (sven) total hack, but works for `target_q_model/target_model` + # TODO: (sven) hack, but works for `target_[q_]?model`. 
for i in range(1, len(existing_model)): setattr(self, existing_model[i][0], existing_model[i][1]) elif make_model: diff --git a/rllib/tests/test_lstm.py b/rllib/tests/test_lstm.py index 5463b6f7c878d..fef305fe86358 100644 --- a/rllib/tests/test_lstm.py +++ b/rllib/tests/test_lstm.py @@ -120,7 +120,6 @@ def test_simple_optimizer_sequencing(self): "rollout_fragment_length": 10, "train_batch_size": 10, "sgd_minibatch_size": 10, - #"simple_optimizer": True, "num_sgd_iter": 1, "model": { "custom_model": "rnn", @@ -178,7 +177,6 @@ def test_minibatch_sequencing(self): "rollout_fragment_length": 20, "train_batch_size": 20, "sgd_minibatch_size": 10, - #"simple_optimizer": False, "num_sgd_iter": 1, "model": { "custom_model": "rnn", From 8e309a3dfb018ef301e2d7cc5ae2e70e5768a500 Mon Sep 17 00:00:00 2001 From: sven1977 Date: Tue, 23 Feb 2021 16:29:39 +0100 Subject: [PATCH 09/18] wip and LINT. --- rllib/agents/cql/cql.py | 2 ++ rllib/agents/ddpg/ddpg.py | 2 ++ rllib/agents/dqn/apex.py | 11 +++++++++-- rllib/agents/dqn/dqn_torch_policy.py | 10 +++++----- rllib/agents/dqn/simple_q_tf_policy.py | 12 +++++------- rllib/agents/dqn/simple_q_torch_policy.py | 4 ++-- rllib/agents/dqn/tests/test_simple_q.py | 10 +++++++++- rllib/agents/dreamer/dreamer.py | 2 ++ rllib/agents/es/es.py | 2 ++ rllib/agents/maml/maml.py | 2 ++ rllib/agents/marwil/marwil.py | 6 ++++++ rllib/agents/mbmpo/mbmpo.py | 2 ++ rllib/agents/sac/sac.py | 3 +++ rllib/agents/slateq/slateq.py | 3 +++ 14 files changed, 54 insertions(+), 17 deletions(-) diff --git a/rllib/agents/cql/cql.py b/rllib/agents/cql/cql.py index 83bbc1183403e..3245cf6f93ec6 100644 --- a/rllib/agents/cql/cql.py +++ b/rllib/agents/cql/cql.py @@ -46,6 +46,8 @@ def validate_config(config: TrainerConfigDict): + if config["num_gpus"] > 1: + raise ValueError("`num_gpus` > 1 not yet supported for CQL!") if config["framework"] == "tf": raise ValueError("Tensorflow CQL not implemented yet!") diff --git a/rllib/agents/ddpg/ddpg.py b/rllib/agents/ddpg/ddpg.py index 9e580c0f8f868..65f7e357da453 100644 --- a/rllib/agents/ddpg/ddpg.py +++ b/rllib/agents/ddpg/ddpg.py @@ -151,6 +151,8 @@ def validate_config(config): + if config["num_gpus"] > 1: + raise ValueError("`num_gpus` > 1 not yet supported for DDPG!") if config["model"]["custom_model"]: logger.warning( "Setting use_state_preprocessor=True since a custom model " diff --git a/rllib/agents/dqn/apex.py b/rllib/agents/dqn/apex.py index cc48c6820a203..1236eb8a80676 100644 --- a/rllib/agents/dqn/apex.py +++ b/rllib/agents/dqn/apex.py @@ -17,8 +17,8 @@ from typing import Tuple import ray -from ray.rllib.agents.dqn.dqn import DEFAULT_CONFIG as DQN_CONFIG -from ray.rllib.agents.dqn.dqn import DQNTrainer, calculate_rr_weights +from ray.rllib.agents.dqn.dqn import calculate_rr_weights, \ + DEFAULT_CONFIG as DQN_CONFIG, DQNTrainer, validate_config from ray.rllib.agents.dqn.learner_thread import LearnerThread from ray.rllib.evaluation.worker_set import WorkerSet from ray.rllib.execution.common import (STEPS_TRAINED_COUNTER, @@ -195,7 +195,14 @@ def add_apex_metrics(result: dict) -> dict: selected_workers=selected_workers).for_each(add_apex_metrics) +def apex_validate_config(config): + if config["num_gpus"] > 1: + raise ValueError("`num_gpus` > 1 not yet supported for APEX-DQN!") + validate_config(config) + + ApexTrainer = DQNTrainer.with_updates( name="APEX", default_config=APEX_DEFAULT_CONFIG, + validate_config=apex_validate_config, execution_plan=apex_execution_plan) diff --git a/rllib/agents/dqn/dqn_torch_policy.py 
b/rllib/agents/dqn/dqn_torch_policy.py index b6800dafa7e0e..9d0760d13ba80 100644 --- a/rllib/agents/dqn/dqn_torch_policy.py +++ b/rllib/agents/dqn/dqn_torch_policy.py @@ -161,7 +161,7 @@ def build_q_model_and_distribution( isinstance(getattr(policy, "exploration", None), ParameterNoise) or config["exploration_config"]["type"] == "ParameterNoise") - policy.q_model = ModelCatalog.get_model_v2( + model = ModelCatalog.get_model_v2( obs_space=obs_space, action_space=action_space, num_outputs=num_outputs, @@ -180,7 +180,7 @@ def build_q_model_and_distribution( # generically into ModelCatalog. add_layer_norm=add_layer_norm) - policy.q_func_vars = policy.q_model.variables() + policy.q_func_vars = model.variables() policy.target_q_model = ModelCatalog.get_model_v2( obs_space=obs_space, @@ -203,7 +203,7 @@ def build_q_model_and_distribution( policy.target_q_func_vars = policy.target_q_model.variables() - return policy.q_model, TorchCategorical + return model, TorchCategorical def get_distribution_inputs_and_class( @@ -237,7 +237,7 @@ def build_q_losses(policy: Policy, model, _, # Q-network evaluation. q_t, q_logits_t, q_probs_t = compute_q_values( policy, - policy.q_model, + policy.model, train_batch[SampleBatch.CUR_OBS], explore=False, is_training=True) @@ -265,7 +265,7 @@ def build_q_losses(policy: Policy, model, _, q_tp1_using_online_net, q_logits_tp1_using_online_net, \ q_dist_tp1_using_online_net = compute_q_values( policy, - policy.q_model, + policy.model, train_batch[SampleBatch.NEXT_OBS], explore=False, is_training=True) diff --git a/rllib/agents/dqn/simple_q_tf_policy.py b/rllib/agents/dqn/simple_q_tf_policy.py index 515e64eefb012..91e03ca34b940 100644 --- a/rllib/agents/dqn/simple_q_tf_policy.py +++ b/rllib/agents/dqn/simple_q_tf_policy.py @@ -79,7 +79,7 @@ def build_q_models(policy: Policy, obs_space: gym.spaces.Space, raise UnsupportedSpaceException( "Action space {} is not supported for DQN.".format(action_space)) - policy.q_model = ModelCatalog.get_model_v2( + model = ModelCatalog.get_model_v2( obs_space=obs_space, action_space=action_space, num_outputs=action_space.n, @@ -95,10 +95,10 @@ def build_q_models(policy: Policy, obs_space: gym.spaces.Space, framework=config["framework"], name=Q_TARGET_SCOPE) - policy.q_func_vars = policy.q_model.variables() + policy.q_func_vars = model.variables() policy.target_q_func_vars = policy.target_q_model.variables() - return policy.q_model + return model def get_distribution_inputs_and_class( @@ -114,6 +114,7 @@ def get_distribution_inputs_and_class( q_vals = q_vals[0] if isinstance(q_vals, tuple) else q_vals policy.q_values = q_vals + policy.q_func_vars = q_model.variables() return policy.q_values, (TorchCategorical if policy.config["framework"] == "torch" else Categorical), [] # state-outs @@ -135,10 +136,7 @@ def build_q_losses(policy: Policy, model: ModelV2, """ # q network evaluation q_t = compute_q_values( - policy, - policy.q_model, - train_batch[SampleBatch.CUR_OBS], - explore=False) + policy, policy.model, train_batch[SampleBatch.CUR_OBS], explore=False) # target q network evalution q_tp1 = compute_q_values( diff --git a/rllib/agents/dqn/simple_q_torch_policy.py b/rllib/agents/dqn/simple_q_torch_policy.py index 9862f82b79749..9e39596e9ad59 100644 --- a/rllib/agents/dqn/simple_q_torch_policy.py +++ b/rllib/agents/dqn/simple_q_torch_policy.py @@ -38,7 +38,7 @@ def do_update(): # target Q network. 
assert len(self.q_func_vars) == len(self.target_q_func_vars), \ (self.q_func_vars, self.target_q_func_vars) - self.target_q_model.load_state_dict(self.q_model.state_dict()) + self.target_q_model.load_state_dict(self.model.state_dict()) self.update_target = do_update @@ -67,7 +67,7 @@ def build_q_losses(policy: Policy, model, dist_class, # q network evaluation q_t = compute_q_values( policy, - policy.q_model, + policy.model, train_batch[SampleBatch.CUR_OBS], explore=False, is_training=True) diff --git a/rllib/agents/dqn/tests/test_simple_q.py b/rllib/agents/dqn/tests/test_simple_q.py index 8a8815a66cbe7..09524fa21618f 100644 --- a/rllib/agents/dqn/tests/test_simple_q.py +++ b/rllib/agents/dqn/tests/test_simple_q.py @@ -2,6 +2,7 @@ import numpy as np import unittest +import ray import ray.rllib.agents.dqn as dqn from ray.rllib.agents.dqn.simple_q_tf_policy import build_q_losses as loss_tf from ray.rllib.agents.dqn.simple_q_torch_policy import build_q_losses as \ @@ -16,6 +17,14 @@ class TestSimpleQ(unittest.TestCase): + @classmethod + def setUpClass(cls) -> None: + ray.init() + + @classmethod + def tearDownClass(cls) -> None: + ray.shutdown() + def test_simple_q_compilation(self): """Test whether a SimpleQTrainer can be built on all frameworks.""" config = dqn.SIMPLE_Q_DEFAULT_CONFIG.copy() @@ -43,7 +52,6 @@ def test_simple_q_fake_multi_gpu_learning(self): # Fake GPU setup. config["num_gpus"] = 2 config["_fake_gpus"] = True - config["framework"] = "tf" trainer = dqn.SimpleQTrainer(config=config, env="CartPole-v0") diff --git a/rllib/agents/dreamer/dreamer.py b/rllib/agents/dreamer/dreamer.py index 21646d61871d2..8d783514f5100 100644 --- a/rllib/agents/dreamer/dreamer.py +++ b/rllib/agents/dreamer/dreamer.py @@ -247,6 +247,8 @@ def get_policy_class(config): def validate_config(config): config["action_repeat"] = config["env_config"]["frame_skip"] + if config["num_gpus"] > 1: + raise ValueError("`num_gpus` > 1 not yet supported for Dreamer!") if config["framework"] != "torch": raise ValueError("Dreamer not supported in Tensorflow yet!") if config["batch_mode"] != "complete_episodes": diff --git a/rllib/agents/es/es.py b/rllib/agents/es/es.py index 28202e3bdec98..09ff9f5f011c6 100644 --- a/rllib/agents/es/es.py +++ b/rllib/agents/es/es.py @@ -178,6 +178,8 @@ def get_policy_class(config): def validate_config(config): + if config["num_gpus"] > 1: + raise ValueError("`num_gpus` > 1 not yet supported for ES/ARS!") if config["num_workers"] <= 0: raise ValueError("`num_workers` must be > 0 for ES!") if config["evaluation_config"]["num_envs_per_worker"] != 1: diff --git a/rllib/agents/maml/maml.py b/rllib/agents/maml/maml.py index 52477f43b018e..9d82a0e192cc5 100644 --- a/rllib/agents/maml/maml.py +++ b/rllib/agents/maml/maml.py @@ -219,6 +219,8 @@ def get_policy_class(config): def validate_config(config): + if config["num_gpus"] > 1: + raise ValueError("`num_gpus` > 1 not yet supported for MAML!") if config["inner_adaptation_steps"] <= 0: raise ValueError("Inner Adaptation Steps must be >=1!") if config["maml_optimizer_steps"] <= 0: diff --git a/rllib/agents/marwil/marwil.py b/rllib/agents/marwil/marwil.py index d123b3ef5f5f8..68b917ed9678e 100644 --- a/rllib/agents/marwil/marwil.py +++ b/rllib/agents/marwil/marwil.py @@ -70,9 +70,15 @@ def execution_plan(workers, config): return StandardMetricsReporting(train_op, workers, config) +def validate_config(config): + if config["num_gpus"] > 1: + raise ValueError("`num_gpus` > 1 not yet supported for MARWIL!") + + MARWILTrainer = build_trainer( 
name="MARWIL", default_config=DEFAULT_CONFIG, default_policy=MARWILTFPolicy, get_policy_class=get_policy_class, + validate_config=validate_config, execution_plan=execution_plan) diff --git a/rllib/agents/mbmpo/mbmpo.py b/rllib/agents/mbmpo/mbmpo.py index a3731ea9d35ab..0a537213ac193 100644 --- a/rllib/agents/mbmpo/mbmpo.py +++ b/rllib/agents/mbmpo/mbmpo.py @@ -416,6 +416,8 @@ def validate_config(config): Raises: ValueError: In case something is wrong with the config. """ + if config["num_gpus"] > 1: + raise ValueError("`num_gpus` > 1 not yet supported for MB-MPO!") if config["framework"] != "torch": logger.warning("MB-MPO only supported in PyTorch so far! Switching to " "`framework=torch`.") diff --git a/rllib/agents/sac/sac.py b/rllib/agents/sac/sac.py index 97d0f7d771477..d080cc2488756 100644 --- a/rllib/agents/sac/sac.py +++ b/rllib/agents/sac/sac.py @@ -167,6 +167,9 @@ def validate_config(config: TrainerConfigDict) -> None: Raises: ValueError: In case something is wrong with the config. """ + if config["num_gpus"] > 1: + raise ValueError("`num_gpus` > 1 not yet supported for SAC!") + if config["use_state_preprocessor"] != DEPRECATED_VALUE: deprecation_warning( old="config['use_state_preprocessor']", error=False) diff --git a/rllib/agents/slateq/slateq.py b/rllib/agents/slateq/slateq.py index f392a16b2ecde..80cedb69609a9 100644 --- a/rllib/agents/slateq/slateq.py +++ b/rllib/agents/slateq/slateq.py @@ -132,6 +132,9 @@ def validate_config(config: TrainerConfigDict) -> None: """Checks the config based on settings""" + if config["num_gpus"] > 1: + raise ValueError("`num_gpus` > 1 not yet supported for SlateQ!") + if config["framework"] != "torch": raise ValueError("SlateQ only runs on PyTorch") From f73c286e9d4835ac941720dd2a549fc7a769f360 Mon Sep 17 00:00:00 2001 From: sven1977 Date: Wed, 24 Feb 2021 13:00:40 +0100 Subject: [PATCH 10/18] wip and LINT. --- rllib/agents/ppo/ppo.py | 3 ++- rllib/agents/ppo/tests/test_ppo.py | 4 ++-- rllib/utils/sgd.py | 9 ++++----- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/rllib/agents/ppo/ppo.py b/rllib/agents/ppo/ppo.py index 0ee9889a36134..5f30adb582245 100644 --- a/rllib/agents/ppo/ppo.py +++ b/rllib/agents/ppo/ppo.py @@ -173,7 +173,7 @@ def update(pi, pi_id): ("{} should be nested under policy id key".format( LEARNER_STATS_KEY), fetches) if pi_id in fetches: - kl = fetches[pi_id].get("kl") + kl = fetches[pi_id][LEARNER_STATS_KEY].get("kl") assert kl is not None, (fetches, pi_id) # Make the actual `Policy.update_kl()` call. 
pi.update_kl(kl) @@ -194,6 +194,7 @@ def warn_about_bad_reward_scales(config, result): if DEFAULT_POLICY_ID in learner_stats: scaled_vf_loss = config["vf_loss_coeff"] * \ learner_stats[DEFAULT_POLICY_ID][LEARNER_STATS_KEY]["vf_loss"] + policy_loss = learner_stats[DEFAULT_POLICY_ID][LEARNER_STATS_KEY][ "policy_loss"] if config.get("model", {}).get("vf_share_layers") and \ diff --git a/rllib/agents/ppo/tests/test_ppo.py b/rllib/agents/ppo/tests/test_ppo.py index 732c149cca743..963cbd7e3c60f 100644 --- a/rllib/agents/ppo/tests/test_ppo.py +++ b/rllib/agents/ppo/tests/test_ppo.py @@ -90,7 +90,7 @@ def test_ppo_compilation_and_lr_schedule(self): for _ in framework_iterator(config): for env in ["CartPole-v0", "MsPacmanNoFrameskip-v4"]: print("Env={}".format(env)) - for lstm in [True, False]: + for lstm in [False, True]: print("LSTM={}".format(lstm)) config["model"]["use_lstm"] = lstm config["model"]["lstm_use_prev_action"] = lstm @@ -127,7 +127,7 @@ def test_ppo_fake_multi_gpu_learning(self): for i in range(num_iterations): results = trainer.train() print(results) - if results["episode_reward_mean"] > 150: + if results["episode_reward_mean"] > 75.0: learnt = True break assert learnt, "PPO multi-GPU (with fake-GPUs) did not learn CartPole!" diff --git a/rllib/utils/sgd.py b/rllib/utils/sgd.py index 787b885cd7d69..528c162d586a8 100644 --- a/rllib/utils/sgd.py +++ b/rllib/utils/sgd.py @@ -118,7 +118,7 @@ def do_minibatch_sgd(samples, policies, local_worker, num_sgd_iter, if isinstance(samples, SampleBatch): samples = MultiAgentBatch({DEFAULT_POLICY_ID: samples}, samples.count) - fetches = {} + fetches = defaultdict(dict) for policy_id in policies.keys(): if policy_id not in samples.policy_batches: continue @@ -128,14 +128,13 @@ def do_minibatch_sgd(samples, policies, local_worker, num_sgd_iter, batch[field] = standardized(batch[field]) for i in range(num_sgd_iter): - iter_extra_fetches = defaultdict(list) + learner_stats = defaultdict(list) for minibatch in minibatches(batch, sgd_minibatch_size): batch_fetches = (local_worker.learn_on_batch( MultiAgentBatch({ policy_id: minibatch }, minibatch.count)))[policy_id] for k, v in batch_fetches.get(LEARNER_STATS_KEY, {}).items(): - iter_extra_fetches[k].append(v) - logger.debug("{} {}".format(i, averaged(iter_extra_fetches))) - fetches[policy_id] = averaged(iter_extra_fetches) + learner_stats[k].append(v) + fetches[policy_id][LEARNER_STATS_KEY] = averaged(learner_stats) return fetches From ae2dcb60359d2414b4e2729d8fedcf6634d72a71 Mon Sep 17 00:00:00 2001 From: sven1977 Date: Wed, 24 Feb 2021 13:06:16 +0100 Subject: [PATCH 11/18] docs --- doc/source/rllib-algorithms.rst | 46 ++++++++++++++++----------------- 1 file changed, 23 insertions(+), 23 deletions(-) diff --git a/doc/source/rllib-algorithms.rst b/doc/source/rllib-algorithms.rst index b4f42c7ceab83..0e7bbe05ace36 100644 --- a/doc/source/rllib-algorithms.rst +++ b/doc/source/rllib-algorithms.rst @@ -8,29 +8,29 @@ RLlib Algorithms Available Algorithms - Overview ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -=================== ========== ======================= ================== =========== ============================================================= -Algorithm Frameworks Discrete Actions Continuous Actions Multi-Agent Model Support -=================== ========== ======================= ================== =========== ============================================================= -`A2C, A3C`_ tf + torch **Yes** `+parametric`_ **Yes** **Yes** `+RNN`_, `+LSTM auto-wrapping`_, `+Attention`_, `+autoreg`_ -`ARS`_ tf + 
torch **Yes** **Yes** No -`BC`_ tf + torch **Yes** `+parametric`_ **Yes** **Yes** `+RNN`_ -`ES`_ tf + torch **Yes** **Yes** No -`DDPG`_, `TD3`_ tf + torch No **Yes** **Yes** -`APEX-DDPG`_ tf + torch No **Yes** **Yes** -`Dreamer`_ torch No **Yes** No `+RNN`_ -`DQN`_, `Rainbow`_ tf + torch **Yes** `+parametric`_ No **Yes** -`APEX-DQN`_ tf + torch **Yes** `+parametric`_ No **Yes** -`IMPALA`_ tf + torch **Yes** `+parametric`_ **Yes** **Yes** `+RNN`_, `+LSTM auto-wrapping`_, `+Attention`_, `+autoreg`_ -`MAML`_ tf + torch No **Yes** No -`MARWIL`_ tf + torch **Yes** `+parametric`_ **Yes** **Yes** `+RNN`_ -`MBMPO`_ torch No **Yes** No -`PG`_ tf + torch **Yes** `+parametric`_ **Yes** **Yes** `+RNN`_, `+LSTM auto-wrapping`_, `+Attention`_, `+autoreg`_ -`PPO`_, `APPO`_ tf + torch **Yes** `+parametric`_ **Yes** **Yes** `+RNN`_, `+LSTM auto-wrapping`_, `+Attention`_, `+autoreg`_ -`SAC`_ tf + torch **Yes** **Yes** **Yes** -`SlateQ`_ torch **Yes** No No -`LinUCB`_, `LinTS`_ torch **Yes** `+parametric`_ No **Yes** -`AlphaZero`_ torch **Yes** `+parametric`_ No No -=================== ========== ======================= ================== =========== ============================================================= +=================== ========== ======================= ================== =========== ============================================================= ========= +Algorithm Frameworks Discrete Actions Continuous Actions Multi-Agent Model Support Multi-GPU +=================== ========== ======================= ================== =========== ============================================================= ========= +`A2C, A3C`_ tf + torch **Yes** `+parametric`_ **Yes** **Yes** `+RNN`_, `+LSTM auto-wrapping`_, `+Attention`_, `+autoreg`_ tf (A2C) +`ARS`_ tf + torch **Yes** **Yes** No No +`BC`_ tf + torch **Yes** `+parametric`_ **Yes** **Yes** `+RNN`_ No +`ES`_ tf + torch **Yes** **Yes** No No +`DDPG`_, `TD3`_ tf + torch No **Yes** **Yes** No +`APEX-DDPG`_ tf + torch No **Yes** **Yes** No +`Dreamer`_ torch No **Yes** No `+RNN`_ No +`DQN`_, `Rainbow`_ tf + torch **Yes** `+parametric`_ No **Yes** tf +`APEX-DQN`_ tf + torch **Yes** `+parametric`_ No **Yes** No +`IMPALA`_ tf + torch **Yes** `+parametric`_ **Yes** **Yes** `+RNN`_, `+LSTM auto-wrapping`_, `+Attention`_, `+autoreg`_ tf +`MAML`_ tf + torch No **Yes** No No +`MARWIL`_ tf + torch **Yes** `+parametric`_ **Yes** **Yes** `+RNN`_ No +`MBMPO`_ torch No **Yes** No No +`PG`_ tf + torch **Yes** `+parametric`_ **Yes** **Yes** `+RNN`_, `+LSTM auto-wrapping`_, `+Attention`_, `+autoreg`_ tf +`PPO`_, `APPO`_ tf + torch **Yes** `+parametric`_ **Yes** **Yes** `+RNN`_, `+LSTM auto-wrapping`_, `+Attention`_, `+autoreg`_ tf +`SAC`_ tf + torch **Yes** **Yes** **Yes** No +`SlateQ`_ torch **Yes** No No No +`LinUCB`_, `LinTS`_ torch **Yes** `+parametric`_ No **Yes** No +`AlphaZero`_ torch **Yes** `+parametric`_ No No No +=================== ========== ======================= ================== =========== ============================================================= ========= Multi-Agent only Methods From 816b37dc2ffb3bb6de08dbde9bef60bc98fa5043 Mon Sep 17 00:00:00 2001 From: sven1977 Date: Thu, 25 Feb 2021 10:13:55 +0100 Subject: [PATCH 12/18] fix. 
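Depending on the framework and optimizer path, the per-policy learner info dict may carry `td_error` either at its top level or nested under LEARNER_STATS_KEY, so DQN's `update_prio` now checks both locations. A minimal sketch of the two shapes being handled (illustrative keys and values only, not the exact code from this patch):

    from ray.rllib.policy.policy import LEARNER_STATS_KEY

    info_top_level = {"td_error": [0.5, -0.1], LEARNER_STATS_KEY: {"mean_q": 1.2}}
    info_nested = {LEARNER_STATS_KEY: {"td_error": [0.5, -0.1], "mean_q": 1.2}}

    def fetch_td_error(info):
        # Prefer the top-level key, otherwise fall back to the nested stats dict.
        return info.get("td_error",
                        info.get(LEARNER_STATS_KEY, {}).get("td_error"))

    assert fetch_td_error(info_top_level) == fetch_td_error(info_nested)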
--- rllib/agents/dqn/dqn.py | 9 +++++++-- rllib/policy/tf_policy_template.py | 6 ++++++ 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/rllib/agents/dqn/dqn.py b/rllib/agents/dqn/dqn.py index cb9c4b5ce5ca2..aeb1852148ac3 100644 --- a/rllib/agents/dqn/dqn.py +++ b/rllib/agents/dqn/dqn.py @@ -24,7 +24,7 @@ from ray.rllib.execution.rollout_ops import ParallelRollouts from ray.rllib.execution.train_ops import TrainOneStep, UpdateTargetNetwork, \ TrainTFMultiGPU -from ray.rllib.policy.policy import Policy +from ray.rllib.policy.policy import LEARNER_STATS_KEY, Policy from ray.rllib.utils.typing import TrainerConfigDict from ray.util.iter import LocalIterator @@ -218,7 +218,12 @@ def update_prio(item): if config.get("prioritized_replay"): prio_dict = {} for policy_id, info in info_dict.items(): - td_error = info.get("td_error") + # TODO(sven): This is currently structured differently for + # torch/tf. Clean up these results/info dicts across + # policies (note: fixing this in torch_policy.py will + # break e.g. DDPPO!). + td_error = info.get("td_error", + info[LEARNER_STATS_KEY].get("td_error")) prio_dict[policy_id] = (samples.policy_batches[policy_id] .data.get("batch_indexes"), td_error) local_replay_buffer.update_priorities(prio_dict) diff --git a/rllib/policy/tf_policy_template.py b/rllib/policy/tf_policy_template.py index 34e7da360592b..b2749940e477e 100644 --- a/rllib/policy/tf_policy_template.py +++ b/rllib/policy/tf_policy_template.py @@ -283,6 +283,12 @@ def extra_compute_action_fetches(self): @override(TFPolicy) def extra_compute_grad_fetches(self): if extra_learn_fetches_fn: + # TODO: (sven) in torch, extra_learn_fetches do not exist. + # Hence, things like td_error are returned by the stats_fn + # and end up under the LEARNER_STATS_KEY. We should + # change tf to do this as well. However, this will confilct + # the handling of LEARNER_STATS_KEY inside the multi-GPU + # train op. # Auto-add empty learner stats dict if needed. return dict({ LEARNER_STATS_KEY: {} From 380d1e8e10ef1e290fa23ca1212b5e6ca94d7053 Mon Sep 17 00:00:00 2001 From: sven1977 Date: Tue, 2 Mar 2021 15:58:33 +0100 Subject: [PATCH 13/18] fixes --- rllib/agents/dqn/dqn_tf_policy.py | 3 +-- rllib/agents/dqn/dqn_torch_policy.py | 3 +-- rllib/agents/ppo/tests/test_ddppo.py | 4 +++- rllib/tests/test_lstm.py | 18 +++++++++++------- 4 files changed, 16 insertions(+), 12 deletions(-) diff --git a/rllib/agents/dqn/dqn_tf_policy.py b/rllib/agents/dqn/dqn_tf_policy.py index ca942fbacac84..3dc8ab00a055d 100644 --- a/rllib/agents/dqn/dqn_tf_policy.py +++ b/rllib/agents/dqn/dqn_tf_policy.py @@ -240,8 +240,7 @@ def build_q_losses(policy: Policy, model, _, # q network evaluation q_t, q_logits_t, q_dist_t, _ = compute_q_values( policy, - model, - {"obs": train_batch[SampleBatch.CUR_OBS]}, + model, {"obs": train_batch[SampleBatch.CUR_OBS]}, state_batches=None, explore=False) diff --git a/rllib/agents/dqn/dqn_torch_policy.py b/rllib/agents/dqn/dqn_torch_policy.py index f5a6697eba7dc..70b90a05d6a3f 100644 --- a/rllib/agents/dqn/dqn_torch_policy.py +++ b/rllib/agents/dqn/dqn_torch_policy.py @@ -241,8 +241,7 @@ def build_q_losses(policy: Policy, model, _, # Q-network evaluation. 
q_t, q_logits_t, q_probs_t, _ = compute_q_values( policy, - model, - {"obs": train_batch[SampleBatch.CUR_OBS]}, + model, {"obs": train_batch[SampleBatch.CUR_OBS]}, explore=False, is_training=True) diff --git a/rllib/agents/ppo/tests/test_ddppo.py b/rllib/agents/ppo/tests/test_ddppo.py index 94096700847d2..273df0de5af10 100644 --- a/rllib/agents/ppo/tests/test_ddppo.py +++ b/rllib/agents/ppo/tests/test_ddppo.py @@ -2,6 +2,7 @@ import ray import ray.rllib.agents.ppo as ppo +from ray.rllib.policy.policy import LEARNER_STATS_KEY from ray.rllib.policy.sample_batch import DEFAULT_POLICY_ID from ray.rllib.utils.test_utils import check_compute_single_action, \ framework_iterator @@ -40,7 +41,8 @@ def test_ddppo_schedule(self): trainer = ppo.ddppo.DDPPOTrainer(config=config, env="CartPole-v0") for _ in range(num_iterations): result = trainer.train() - lr = result["info"]["learner"][DEFAULT_POLICY_ID]["cur_lr"] + lr = result["info"]["learner"][DEFAULT_POLICY_ID][ + LEARNER_STATS_KEY]["cur_lr"] trainer.stop() assert lr == 0.0, "lr should anneal to 0.0" diff --git a/rllib/tests/test_lstm.py b/rllib/tests/test_lstm.py index fef305fe86358..da0a4daa29cb7 100644 --- a/rllib/tests/test_lstm.py +++ b/rllib/tests/test_lstm.py @@ -196,15 +196,17 @@ def test_minibatch_sequencing(self): ray.experimental.internal_kv._internal_kv_get("rnn_spy_in_1")) if batch0["sequences"][0][0][0] > batch1["sequences"][0][0][0]: batch0, batch1 = batch1, batch0 # sort minibatches - self.assertEqual(batch0["seq_lens"].tolist(), [4, 4]) - self.assertEqual(batch1["seq_lens"].tolist(), [4, 3]) + self.assertEqual(batch0["seq_lens"].tolist(), [4, 4, 2]) + self.assertEqual(batch1["seq_lens"].tolist(), [4, 3, 3]) self.assertEqual(batch0["sequences"].tolist(), [ [[0], [1], [2], [3]], [[4], [5], [6], [7]], + [[8], [9], [0], [0]], ]) self.assertEqual(batch1["sequences"].tolist(), [ [[8], [9], [10], [11]], [[12], [13], [14], [0]], + [[0], [1], [2], [0]], ]) # second epoch: 20 observations get split into 2 minibatches of 8 @@ -215,15 +217,17 @@ def test_minibatch_sequencing(self): ray.experimental.internal_kv._internal_kv_get("rnn_spy_in_3")) if batch2["sequences"][0][0][0] > batch3["sequences"][0][0][0]: batch2, batch3 = batch3, batch2 - self.assertEqual(batch2["seq_lens"].tolist(), [4, 4]) - self.assertEqual(batch3["seq_lens"].tolist(), [2, 4]) + self.assertEqual(batch2["seq_lens"].tolist(), [4, 4, 2]) + self.assertEqual(batch3["seq_lens"].tolist(), [4, 4, 2]) self.assertEqual(batch2["sequences"].tolist(), [ - [[5], [6], [7], [8]], - [[9], [10], [11], [12]], + [[0], [1], [2], [3]], + [[4], [5], [6], [7]], + [[8], [9], [0], [0]], ]) self.assertEqual(batch3["sequences"].tolist(), [ + [[5], [6], [7], [8]], + [[9], [10], [11], [12]], [[13], [14], [0], [0]], - [[0], [1], [2], [3]], ]) From 8c074b5cad30a4292971d6ab677df21b65e1c43b Mon Sep 17 00:00:00 2001 From: sven1977 Date: Tue, 2 Mar 2021 18:52:31 +0100 Subject: [PATCH 14/18] LINT. 
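
Besides pure formatting, this also points the custom-metrics example at the new
nesting of the learner info: per-policy stats (including custom model metrics)
now sit under the learner-stats sub-dict of the train() result. A mocked-up
illustration of where such a stat ends up (the result dict below is fabricated
and heavily trimmed; real results carry many more keys):

    result = {
        "info": {
            "learner": {
                "default_policy": {
                    "learner_stats": {
                        "cur_lr": 0.0005,
                        "model": {"foo": 42},
                    },
                },
            },
        },
    }
    stats = result["info"]["learner"]["default_policy"]["learner_stats"]
    assert stats["model"]["foo"] == 42
    # Flattened the way the Tune metric strings address it:
    # "info/learner/default_policy/learner_stats/model/foo"
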
--- rllib/examples/custom_keras_model.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/rllib/examples/custom_keras_model.py b/rllib/examples/custom_keras_model.py index 03ccb0859105b..eb3454c27458a 100644 --- a/rllib/examples/custom_keras_model.py +++ b/rllib/examples/custom_keras_model.py @@ -11,6 +11,7 @@ from ray.rllib.models.tf.misc import normc_initializer from ray.rllib.models.tf.tf_modelv2 import TFModelV2 from ray.rllib.models.tf.visionnet import VisionNetwork as MyVisionNetwork +from ray.rllib.policy.policy import LEARNER_STATS_KEY from ray.rllib.policy.sample_batch import DEFAULT_POLICY_ID from ray.rllib.utils.framework import try_import_tf @@ -84,7 +85,7 @@ def __init__(self, obs_space, action_space, num_outputs, model_config, kernel_initializer=normc_initializer(1.0))(layer_1) self.base_model = tf.keras.Model(self.inputs, layer_out) - # Implement the core forward method + # Implement the core forward method. def forward(self, input_dict, state, seq_lens): model_out = self.base_model(input_dict["obs"]) return model_out, state @@ -107,7 +108,7 @@ def metrics(self): def check_has_custom_metric(result): r = result["result"]["info"]["learner"] if DEFAULT_POLICY_ID in r: - r = r[DEFAULT_POLICY_ID] + r = r[DEFAULT_POLICY_ID][LEARNER_STATS_KEY] assert r["model"]["foo"] == 42, result if args.run == "DQN": From 810e8baf1f1dc534c8b17c39b87f77b0dc3032d7 Mon Sep 17 00:00:00 2001 From: sven1977 Date: Mon, 8 Mar 2021 10:32:20 +0100 Subject: [PATCH 15/18] wip. --- rllib/BUILD | 2 -- rllib/examples/checkpoint_by_custom_criteria.py | 6 ++++-- rllib/examples/custom_keras_model.py | 5 ++--- 3 files changed, 6 insertions(+), 7 deletions(-) diff --git a/rllib/BUILD b/rllib/BUILD index 5cb1033f67592..f44d89629da3a 100644 --- a/rllib/BUILD +++ b/rllib/BUILD @@ -173,7 +173,6 @@ py_test( "tuned_examples/dqn/cartpole-dqn.yaml", "tuned_examples/dqn/cartpole-dqn-softq.yaml", "tuned_examples/dqn/cartpole-dqn-param-noise.yaml", - "tuned_examples/dqn/cartpole-r2d2.yaml", ], args = ["--yaml-dir=tuned_examples/dqn"] ) @@ -188,7 +187,6 @@ py_test( "tuned_examples/dqn/cartpole-dqn.yaml", "tuned_examples/dqn/cartpole-dqn-softq.yaml", "tuned_examples/dqn/cartpole-dqn-param-noise.yaml", - "tuned_examples/dqn/cartpole-r2d2.yaml", ], args = ["--yaml-dir=tuned_examples/dqn", "--framework=torch"] ) diff --git a/rllib/examples/checkpoint_by_custom_criteria.py b/rllib/examples/checkpoint_by_custom_criteria.py index f426d3f001ebb..891d0dccfe28d 100644 --- a/rllib/examples/checkpoint_by_custom_criteria.py +++ b/rllib/examples/checkpoint_by_custom_criteria.py @@ -63,13 +63,15 @@ # Checkpoint with the lowest policy loss value: ckpt = results.get_best_checkpoint( best_trial, - metric="info/learner/default_policy/policy_loss", + metric="info/learner/default_policy/learner_stats/policy_loss", mode="min") print("Lowest pol-loss: {}".format(ckpt)) # Checkpoint with the highest value-function loss: ckpt = results.get_best_checkpoint( - best_trial, metric="info/learner/default_policy/vf_loss", mode="max") + best_trial, + metric="info/learner/default_policy/learner_stats/vf_loss", + mode="max") print("Highest vf-loss: {}".format(ckpt)) ray.shutdown() diff --git a/rllib/examples/custom_keras_model.py b/rllib/examples/custom_keras_model.py index eb3454c27458a..03ccb0859105b 100644 --- a/rllib/examples/custom_keras_model.py +++ b/rllib/examples/custom_keras_model.py @@ -11,7 +11,6 @@ from ray.rllib.models.tf.misc import normc_initializer from ray.rllib.models.tf.tf_modelv2 import TFModelV2 from 
ray.rllib.models.tf.visionnet import VisionNetwork as MyVisionNetwork -from ray.rllib.policy.policy import LEARNER_STATS_KEY from ray.rllib.policy.sample_batch import DEFAULT_POLICY_ID from ray.rllib.utils.framework import try_import_tf @@ -85,7 +84,7 @@ def __init__(self, obs_space, action_space, num_outputs, model_config, kernel_initializer=normc_initializer(1.0))(layer_1) self.base_model = tf.keras.Model(self.inputs, layer_out) - # Implement the core forward method. + # Implement the core forward method def forward(self, input_dict, state, seq_lens): model_out = self.base_model(input_dict["obs"]) return model_out, state @@ -108,7 +107,7 @@ def metrics(self): def check_has_custom_metric(result): r = result["result"]["info"]["learner"] if DEFAULT_POLICY_ID in r: - r = r[DEFAULT_POLICY_ID][LEARNER_STATS_KEY] + r = r[DEFAULT_POLICY_ID] assert r["model"]["foo"] == 42, result if args.run == "DQN": From bbca72ff8b1104a2b1062b1567f27f5c7927698e Mon Sep 17 00:00:00 2001 From: sven1977 Date: Mon, 8 Mar 2021 11:59:23 +0100 Subject: [PATCH 16/18] fixes. --- rllib/agents/dqn/r2d2_tf_policy.py | 2 +- rllib/agents/dqn/r2d2_torch_policy.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/rllib/agents/dqn/r2d2_tf_policy.py b/rllib/agents/dqn/r2d2_tf_policy.py index fb18000eb90b6..d310694399f41 100644 --- a/rllib/agents/dqn/r2d2_tf_policy.py +++ b/rllib/agents/dqn/r2d2_tf_policy.py @@ -78,7 +78,7 @@ def r2d2_loss(policy: Policy, model, _, # Q-network evaluation (at t). q, _, _, _ = compute_q_values( policy, - policy.q_model, + model, train_batch, state_batches=state_batches, seq_lens=train_batch.get("seq_lens"), diff --git a/rllib/agents/dqn/r2d2_torch_policy.py b/rllib/agents/dqn/r2d2_torch_policy.py index 5468a86f44141..dd6a38f411a10 100644 --- a/rllib/agents/dqn/r2d2_torch_policy.py +++ b/rllib/agents/dqn/r2d2_torch_policy.py @@ -86,7 +86,7 @@ def r2d2_loss(policy: Policy, model, _, # Q-network evaluation (at t). q, _, _, _ = compute_q_values( policy, - policy.q_model, + model, train_batch, state_batches=state_batches, seq_lens=train_batch.get("seq_lens"), From f23598c4d38d81153aca34552736b96aafaf1976 Mon Sep 17 00:00:00 2001 From: sven1977 Date: Mon, 8 Mar 2021 14:00:55 +0100 Subject: [PATCH 17/18] fix --- rllib/examples/custom_keras_model.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/rllib/examples/custom_keras_model.py b/rllib/examples/custom_keras_model.py index 03ccb0859105b..54cbac8ea2218 100644 --- a/rllib/examples/custom_keras_model.py +++ b/rllib/examples/custom_keras_model.py @@ -11,6 +11,7 @@ from ray.rllib.models.tf.misc import normc_initializer from ray.rllib.models.tf.tf_modelv2 import TFModelV2 from ray.rllib.models.tf.visionnet import VisionNetwork as MyVisionNetwork +from ray.rllib.policy.policy import LEARNER_STATS_KEY from ray.rllib.policy.sample_batch import DEFAULT_POLICY_ID from ray.rllib.utils.framework import try_import_tf @@ -84,7 +85,7 @@ def __init__(self, obs_space, action_space, num_outputs, model_config, kernel_initializer=normc_initializer(1.0))(layer_1) self.base_model = tf.keras.Model(self.inputs, layer_out) - # Implement the core forward method + # Implement the core forward method. 
def forward(self, input_dict, state, seq_lens): model_out = self.base_model(input_dict["obs"]) return model_out, state @@ -95,7 +96,7 @@ def metrics(self): if __name__ == "__main__": args = parser.parse_args() - ray.init(num_cpus=args.num_cpus or None) + ray.init(num_cpus=args.num_cpus or None, local_mode=True) ModelCatalog.register_custom_model( "keras_model", MyVisionNetwork if args.use_vision_network else MyKerasModel) @@ -107,7 +108,8 @@ def metrics(self): def check_has_custom_metric(result): r = result["result"]["info"]["learner"] if DEFAULT_POLICY_ID in r: - r = r[DEFAULT_POLICY_ID] + r = r[DEFAULT_POLICY_ID].get(LEARNER_STATS_KEY, + r[DEFAULT_POLICY_ID]) assert r["model"]["foo"] == 42, result if args.run == "DQN": From acd182debc008c7404b802a6927e6e632650cce0 Mon Sep 17 00:00:00 2001 From: sven1977 Date: Mon, 8 Mar 2021 15:39:53 +0100 Subject: [PATCH 18/18] fix --- rllib/BUILD | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rllib/BUILD b/rllib/BUILD index f44d89629da3a..a4a836fe34b19 100644 --- a/rllib/BUILD +++ b/rllib/BUILD @@ -601,7 +601,7 @@ py_test( py_test( name = "test_pg", tags = ["agents_dir"], - size = "small", + size = "medium", srcs = ["agents/pg/tests/test_pg.py"] )
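
A closing note on the metric check in custom_keras_model.py (patch 17/18):
reading the per-policy stats via .get(LEARNER_STATS_KEY, ...) keeps the
assertion working whether or not the learner info carries the extra
learner-stats nesting, which currently differs between the tf and torch code
paths (see the TODOs in patch 12/18). A small self-contained sketch of that
fallback (both dicts are fabricated for illustration; get_model_metrics is not
an RLlib helper):

    LEARNER_STATS_KEY = "learner_stats"

    def get_model_metrics(policy_info: dict) -> dict:
        # Illustrative helper, not part of RLlib.
        # New layout: custom model metrics nested under "learner_stats".
        # Old layout: stored flat in the per-policy info dict.
        stats = policy_info.get(LEARNER_STATS_KEY, policy_info)
        return stats["model"]

    nested = {"learner_stats": {"model": {"foo": 42}, "cur_lr": 0.0005}}
    flat = {"model": {"foo": 42}, "cur_lr": 0.0005}
    assert get_model_metrics(nested)["foo"] == 42
    assert get_model_metrics(flat)["foo"] == 42
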