Skip to content

Commit

Permalink
[data] add debugging info for SplitCoordinator (#45226)
Browse files Browse the repository at this point in the history
Add debugging info to debug
#45225.
The bug is hard to reproduce manually. Add debugging info, so that when
it happens, we have enough info to investigate the issue.
---------

Signed-off-by: Hao Chen <[email protected]>
  • Loading branch information
raulchen authored May 9, 2024
1 parent 866e03e commit adc18b8
Showing 1 changed file with 25 additions and 1 deletion.
26 changes: 25 additions & 1 deletion python/ray/data/_internal/iterator/stream_split_iterator.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,8 @@ def add_split_op(dag):

self._next_epoch = gen_epochs()
self._output_iterator = None
# Used for debugging https://github.com/ray-project/ray/issues/45225
self._debug_info = {}

def stats(self) -> DatasetStats:
"""Returns stats from the base dataset."""
Expand Down Expand Up @@ -249,9 +251,11 @@ def get(
def _barrier(self, split_idx: int) -> int:
"""Arrive and block until the start of the given epoch."""

self._debug_info[split_idx] = {}
# Decrement and await all clients to arrive here.
with self._lock:
starting_epoch = self._cur_epoch
self._debug_info[split_idx]["starting_epoch"] = starting_epoch
self._unfinished_clients_in_epoch -= 1

start_time = time.time()
Expand All @@ -271,11 +275,31 @@ def _barrier(self, split_idx: int) -> int:
time.sleep(0.1)

# Advance to the next epoch.
self._debug_info[split_idx]["entering_lock"] = (
self._cur_epoch,
self._output_iterator is None,
time.time(),
)
with self._lock:
self._debug_info[split_idx]["entered_lock"] = (
self._cur_epoch,
self._output_iterator is None,
time.time(),
)
if self._cur_epoch == starting_epoch:
self._cur_epoch += 1
self._unfinished_clients_in_epoch = self._n
self._output_iterator = next(self._next_epoch)
self._debug_info[split_idx]["set_iter"] = (
self._cur_epoch,
self._output_iterator is None,
time.time(),
)
self._debug_info[split_idx]["leaving_lock"] = (
self._cur_epoch,
self._output_iterator is None,
time.time(),
)

assert self._output_iterator is not None
assert self._output_iterator is not None, self._debug_info
return starting_epoch + 1

0 comments on commit adc18b8

Please sign in to comment.