diff --git a/python/ray/rllib/dqn/apex.py b/python/ray/rllib/dqn/apex.py
index dbc6d72298710..0ec1778fc4ffe 100644
--- a/python/ray/rllib/dqn/apex.py
+++ b/python/ray/rllib/dqn/apex.py
@@ -9,6 +9,7 @@
     optimizer_config=dict(DQN_CONFIG["optimizer_config"], **dict(
         max_weight_sync_delay=400,
         num_replay_buffer_shards=4,
+        debug=False,
     )),
     n_step=3,
     num_workers=32,
diff --git a/python/ray/rllib/dqn/dqn.py b/python/ray/rllib/dqn/dqn.py
index ac018a0e5dc70..a264754c4f129 100644
--- a/python/ray/rllib/dqn/dqn.py
+++ b/python/ray/rllib/dqn/dqn.py
@@ -11,7 +11,7 @@
 import ray
 from ray.rllib import optimizers
 from ray.rllib.dqn.dqn_evaluator import DQNEvaluator
-from ray.rllib.utils.actors import split_colocated
+from ray.rllib.utils.actors import drop_colocated
 from ray.rllib.agent import Agent
 from ray.tune.result import TrainingResult
 
@@ -134,8 +134,7 @@ def _init(self):
             for i in range(self.config["num_workers"])]
 
         if self.config["force_evaluators_remote"]:
-            _, self.remote_evaluators = split_colocated(
-                self.remote_evaluators)
+            self.remote_evaluators = drop_colocated(self.remote_evaluators)
 
         for k in OPTIMIZER_SHARED_CONFIGS:
             if k not in self.config["optimizer_config"]:
diff --git a/python/ray/rllib/optimizers/apex_optimizer.py b/python/ray/rllib/optimizers/apex_optimizer.py
index fef57ed968fcb..78a210b8a85ac 100644
--- a/python/ray/rllib/optimizers/apex_optimizer.py
+++ b/python/ray/rllib/optimizers/apex_optimizer.py
@@ -69,12 +69,11 @@ def replay(self):
                     "batch_indexes": batch_indexes})
         return batch
 
-    def update_priorities(self, batch, td_errors):
+    def update_priorities(self, batch_indexes, td_errors):
         with self.update_priorities_timer:
             new_priorities = (
                 np.abs(td_errors) + self.prioritized_replay_eps)
-            self.replay_buffer.update_priorities(
-                batch["batch_indexes"], new_priorities)
+            self.replay_buffer.update_priorities(batch_indexes, new_priorities)
 
     def stats(self):
         stat = {
@@ -121,8 +120,10 @@ def _init(
             prioritized_replay=True, prioritized_replay_alpha=0.6,
             prioritized_replay_beta=0.4, prioritized_replay_eps=1e-6,
             train_batch_size=512, sample_batch_size=50,
-            num_replay_buffer_shards=1, max_weight_sync_delay=400):
+            num_replay_buffer_shards=1, max_weight_sync_delay=400,
+            debug=False):
 
+        self.debug = debug
         self.replay_starts = learning_starts
         self.prioritized_replay_beta = prioritized_replay_beta
         self.prioritized_replay_eps = prioritized_replay_eps
@@ -145,9 +146,6 @@ def _init(
         self.timers = {k: TimerStat() for k in [
             "put_weights", "get_samples", "enqueue", "sample_processing",
             "replay_processing", "update_priorities", "train", "sample"]}
-        self.meters = {k: WindowStat(k, 10) for k in [
-            "samples_per_loop", "replays_per_loop", "reprios_per_loop",
-            "reweights_per_loop"]}
         self.num_weight_syncs = 0
         self.learning_started = False
 
@@ -212,8 +210,6 @@ def _step(self):
 
                 # Kick off another sample request
                 self.sample_tasks.add(ev, ev.sample.remote())
-            self.meters["samples_per_loop"].push(i)
-            self.meters["reweights_per_loop"].push(num_weight_syncs)
 
         with self.timers["replay_processing"]:
             i = 0
@@ -224,16 +220,14 @@ def _step(self):
                 samples = ray.get(replay)
                 with self.timers["enqueue"]:
                     self.learner.inqueue.put((ra, samples))
-            self.meters["replays_per_loop"].push(i)
 
         with self.timers["update_priorities"]:
             i = 0
             while not self.learner.outqueue.empty():
                 i += 1
                 ra, replay, td_error = self.learner.outqueue.get()
-                ra.update_priorities.remote(replay, td_error)
+                ra.update_priorities.remote(replay["batch_indexes"], td_error)
                 train_timesteps += self.train_batch_size
self.meters["reprios_per_loop"].push(i) return sample_timesteps, train_timesteps @@ -248,18 +242,18 @@ def stats(self): timing["learner_dequeue_time_ms"] = round( 1000 * self.learner.queue_timer.mean, 3) stats = { - "replay_shard_0": replay_stats, - "timing_breakdown": timing, "sample_throughput": round( self.timers["sample"].mean_throughput, 3), "train_throughput": round(self.timers["train"].mean_throughput, 3), "num_weight_syncs": self.num_weight_syncs, + } + debug_stats = { + "replay_shard_0": replay_stats, + "timing_breakdown": timing, "pending_sample_tasks": self.sample_tasks.count, "pending_replay_tasks": self.replay_tasks.count, "learner_queue": self.learner.learner_queue_size.stats(), - "samples": self.meters["samples_per_loop"].stats(), - "replays": self.meters["replays_per_loop"].stats(), - "reprios": self.meters["reprios_per_loop"].stats(), - "reweights": self.meters["reweights_per_loop"].stats(), } + if self.debug: + stats.update(debug_stats) return dict(Optimizer.stats(self), **stats) diff --git a/python/ray/rllib/utils/actors.py b/python/ray/rllib/utils/actors.py index 4762e58aae9d2..f463d4c4e27aa 100644 --- a/python/ray/rllib/utils/actors.py +++ b/python/ray/rllib/utils/actors.py @@ -25,6 +25,13 @@ def count(self): return len(self._tasks) +def drop_colocated(actors): + colocated, non_colocated = split_colocated(actors) + for a in colocated: + a.__ray_terminate__.remote(a._ray_actor_id.id()) + return non_colocated + + def split_colocated(actors): localhost = os.uname()[1] hosts = ray.get([a.get_host.remote() for a in actors])