Commit

[rllib] Move Ape-X metrics behind a debug flag and remove some of them
ericl authored and richardliaw committed Mar 8, 2018
1 parent b0510ee commit 75e8251
Showing 4 changed files with 22 additions and 21 deletions.
1 change: 1 addition & 0 deletions python/ray/rllib/dqn/apex.py
@@ -9,6 +9,7 @@
     optimizer_config=dict(DQN_CONFIG["optimizer_config"], **dict(
         max_weight_sync_delay=400,
         num_replay_buffer_shards=4,
+        debug=False,
     )),
     n_step=3,
     num_workers=32,
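
The hunk above only adds a default; to get the detailed optimizer metrics back, a user would flip the flag in their agent config. A minimal sketch, assuming the defaults object defined in apex.py is importable as APEX_DEFAULT_CONFIG (name assumed here) and is overridden in the same dict(...) style used above:

    # Sketch only: re-enable the detailed Ape-X optimizer metrics.
    # APEX_DEFAULT_CONFIG is an assumed name for the defaults dict in apex.py.
    from ray.rllib.dqn.apex import APEX_DEFAULT_CONFIG

    config = dict(APEX_DEFAULT_CONFIG, **dict(
        optimizer_config=dict(APEX_DEFAULT_CONFIG["optimizer_config"], **dict(
            debug=True,
        )),
    ))
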
5 changes: 2 additions & 3 deletions python/ray/rllib/dqn/dqn.py
@@ -11,7 +11,7 @@
 import ray
 from ray.rllib import optimizers
 from ray.rllib.dqn.dqn_evaluator import DQNEvaluator
-from ray.rllib.utils.actors import split_colocated
+from ray.rllib.utils.actors import drop_colocated
 from ray.rllib.agent import Agent
 from ray.tune.result import TrainingResult

@@ -134,8 +134,7 @@ def _init(self):
             for i in range(self.config["num_workers"])]

         if self.config["force_evaluators_remote"]:
-            _, self.remote_evaluators = split_colocated(
-                self.remote_evaluators)
+            self.remote_evaluators = drop_colocated(self.remote_evaluators)

         for k in OPTIMIZER_SHARED_CONFIGS:
             if k not in self.config["optimizer_config"]:
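
For clarity, the same call site before and after, with interpretive comments added here (they are not part of the commit); the new helper is shown in the actors.py hunk at the end of this diff:

    # Before: split evaluators by host and keep only the non-colocated ones,
    # leaving the colocated actors alive but unused.
    _, self.remote_evaluators = split_colocated(self.remote_evaluators)

    # After: terminate evaluators colocated with the driver and keep the rest.
    self.remote_evaluators = drop_colocated(self.remote_evaluators)
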
30 changes: 12 additions & 18 deletions python/ray/rllib/optimizers/apex_optimizer.py
@@ -69,12 +69,11 @@ def replay(self):
                 "batch_indexes": batch_indexes})
             return batch

-    def update_priorities(self, batch, td_errors):
+    def update_priorities(self, batch_indexes, td_errors):
         with self.update_priorities_timer:
             new_priorities = (
                 np.abs(td_errors) + self.prioritized_replay_eps)
-            self.replay_buffer.update_priorities(
-                batch["batch_indexes"], new_priorities)
+            self.replay_buffer.update_priorities(batch_indexes, new_priorities)

     def stats(self):
         stat = {
@@ -121,8 +120,10 @@ def _init(
             prioritized_replay=True, prioritized_replay_alpha=0.6,
             prioritized_replay_beta=0.4, prioritized_replay_eps=1e-6,
             train_batch_size=512, sample_batch_size=50,
-            num_replay_buffer_shards=1, max_weight_sync_delay=400):
+            num_replay_buffer_shards=1, max_weight_sync_delay=400,
+            debug=False):

+        self.debug = debug
         self.replay_starts = learning_starts
         self.prioritized_replay_beta = prioritized_replay_beta
         self.prioritized_replay_eps = prioritized_replay_eps
@@ -145,9 +146,6 @@
         self.timers = {k: TimerStat() for k in [
             "put_weights", "get_samples", "enqueue", "sample_processing",
             "replay_processing", "update_priorities", "train", "sample"]}
-        self.meters = {k: WindowStat(k, 10) for k in [
-            "samples_per_loop", "replays_per_loop", "reprios_per_loop",
-            "reweights_per_loop"]}
         self.num_weight_syncs = 0
         self.learning_started = False

@@ -212,8 +210,6 @@ def _step(self):

                 # Kick off another sample request
                 self.sample_tasks.add(ev, ev.sample.remote())
-            self.meters["samples_per_loop"].push(i)
-            self.meters["reweights_per_loop"].push(num_weight_syncs)

         with self.timers["replay_processing"]:
             i = 0
@@ -224,16 +220,14 @@
                     samples = ray.get(replay)
                 with self.timers["enqueue"]:
                     self.learner.inqueue.put((ra, samples))
-            self.meters["replays_per_loop"].push(i)

         with self.timers["update_priorities"]:
             i = 0
             while not self.learner.outqueue.empty():
                 i += 1
                 ra, replay, td_error = self.learner.outqueue.get()
-                ra.update_priorities.remote(replay, td_error)
+                ra.update_priorities.remote(replay["batch_indexes"], td_error)
                 train_timesteps += self.train_batch_size
-            self.meters["reprios_per_loop"].push(i)

         return sample_timesteps, train_timesteps

@@ -248,18 +242,18 @@ def stats(self):
         timing["learner_dequeue_time_ms"] = round(
             1000 * self.learner.queue_timer.mean, 3)
         stats = {
-            "replay_shard_0": replay_stats,
-            "timing_breakdown": timing,
             "sample_throughput": round(
                 self.timers["sample"].mean_throughput, 3),
             "train_throughput": round(self.timers["train"].mean_throughput, 3),
             "num_weight_syncs": self.num_weight_syncs,
+        }
+        debug_stats = {
+            "replay_shard_0": replay_stats,
+            "timing_breakdown": timing,
             "pending_sample_tasks": self.sample_tasks.count,
             "pending_replay_tasks": self.replay_tasks.count,
             "learner_queue": self.learner.learner_queue_size.stats(),
-            "samples": self.meters["samples_per_loop"].stats(),
-            "replays": self.meters["replays_per_loop"].stats(),
-            "reprios": self.meters["reprios_per_loop"].stats(),
-            "reweights": self.meters["reweights_per_loop"].stats(),
         }
+        if self.debug:
+            stats.update(debug_stats)
         return dict(Optimizer.stats(self), **stats)
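
Putting the stats() change together: the throughput counters stay in every result, while the heavier diagnostics are reported only when the optimizer is constructed with debug=True (i.e. optimizer_config["debug"] is set). An illustrative sketch of the two groups, with keys taken from the hunk above and placeholder values rather than real measurements:

    # Always present in ApexOptimizer.stats():
    base_stats = {
        "sample_throughput": 0.0,   # mean throughput of the "sample" timer
        "train_throughput": 0.0,    # mean throughput of the "train" timer
        "num_weight_syncs": 0,
    }

    # Merged in only when debug=True:
    debug_stats = {
        "replay_shard_0": {},        # stats() of the first replay actor
        "timing_breakdown": {},      # per-phase timer means
        "pending_sample_tasks": 0,
        "pending_replay_tasks": 0,
        "learner_queue": {},         # learner queue size window stats
    }
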
7 changes: 7 additions & 0 deletions python/ray/rllib/utils/actors.py
@@ -25,6 +25,13 @@ def count(self):
         return len(self._tasks)


+def drop_colocated(actors):
+    colocated, non_colocated = split_colocated(actors)
+    for a in colocated:
+        a.__ray_terminate__.remote(a._ray_actor_id.id())
+    return non_colocated
+
+
 def split_colocated(actors):
     localhost = os.uname()[1]
     hosts = ray.get([a.get_host.remote() for a in actors])
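
A minimal usage sketch for the new helper (not from the commit): drop_colocated relies on split_colocated, which asks each actor for its host via a get_host() method and compares it to the driver's hostname, so any actor passed in must expose get_host().

    import os

    import ray
    from ray.rllib.utils.actors import drop_colocated

    @ray.remote
    class Worker(object):
        def get_host(self):
            # Compared against os.uname()[1] on the driver by split_colocated.
            return os.uname()[1]

    ray.init()
    workers = [Worker.remote() for _ in range(8)]
    # Terminates workers placed on the driver's machine and returns the rest.
    workers = drop_colocated(workers)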
