[Datasets] Add sub progress bar support for AllToAllOperator (#33302)
This PR adds sub progress bar support for AllToAllOperator. Before this PR, AllToAllOperator progress was not reported correctly with the streaming executor: the sub progress bars were only used internally in `AllToAllOperator.bulk_fn`. This PR hooks the sub progress bars up with the overall progress bar reporting, so they show up properly in the console.

The change includes:

* `AllToAllStage.sub_stage_names`: the name of each sub-stage / sub progress bar (e.g. `ShuffleMap`).
* `AllToAllOperator.sub_progress_bar_names`: same as `AllToAllStage.sub_stage_names`.
* `AllToAllOperator.initialize_sub_progress_bars()` / `close_sub_progress_bars()`: called from `OpState` to initialize and close these sub progress bars.
* `TaskContext.sub_progress_bar_dict`: the dictionary of sub progress bars (keyed by name) used in each `AllToAllOperator.bulk_fn`; see the sketch after this list.
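
To make the wiring concrete, here is a minimal, self-contained sketch of the pattern. The `FakeProgressBar` and `FakeTaskContext` classes below are illustrative stand-ins, not Ray's internal `ProgressBar`/`TaskContext`: the driver pre-creates one bar per sub-stage name, threads the dict through the task context, and the bulk transform looks up its bar by name and advances it as blocks are processed.

```py
from dataclasses import dataclass
from typing import Dict, Iterable, List, Optional


class FakeProgressBar:
    """Illustrative stand-in for Ray Data's internal ProgressBar."""

    def __init__(self, name: str, total: int, position: int):
        self.name, self.total, self.position = name, total, position
        self.completed = 0

    def update(self, n: int = 1) -> None:
        self.completed += n
        print(f"  *- {self.name}: {self.completed}/{self.total}")

    def close(self) -> None:
        print(f"  *- {self.name}: closed")


@dataclass
class FakeTaskContext:
    """Mirrors the new field: sub progress bars keyed by sub-stage name."""

    task_idx: int
    sub_progress_bar_dict: Optional[Dict[str, FakeProgressBar]] = None


def bulk_fn(blocks: Iterable[str], ctx: FakeTaskContext) -> List[str]:
    # The transform fetches its bar by name (if the driver provided one)
    # and advances it per block, so progress shows up on the console.
    bar = (ctx.sub_progress_bar_dict or {}).get("ShuffleMap")
    out = []
    for block in blocks:
        out.append(block)
        if bar is not None:
            bar.update(1)
    return out


# Driver side: initialize the named sub bars, run the transform, close the bars.
bars = {"ShuffleMap": FakeProgressBar("ShuffleMap", total=3, position=1)}
bulk_fn(["b0", "b1", "b2"], FakeTaskContext(task_idx=0, sub_progress_bar_dict=bars))
for bar in bars.values():
    bar.close()
```

In the actual change, `OpState.initialize_progress_bars()` plays the driver role, and `AllToAllOperator.inputs_done()` threads `self._sub_progress_bar_dict` into the `TaskContext` passed to `bulk_fn`.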

Examples:

1. random_shuffle() and repartition()

```py
>>> import ray 
>>> import time
>>> def sleep(x):
...     time.sleep(0.1)
...     return x
... 
>>> 
>>> for _ in (
...     ray.data.range_tensor(5000, shape=(80, 80, 3), parallelism=200)
...     .map_batches(sleep, num_cpus=2)
...     .map_batches(sleep, compute=ray.data.ActorPoolStrategy(2, 4))
...     .random_shuffle()
...     .map_batches(sleep, num_cpus=2)
...     .repartition(400)
...     .map_batches(sleep, num_cpus=2)
...     .iter_batches()
... ):
...     pass


2023-03-14 14:41:19,062	INFO worker.py:1550 -- Started a local Ray instance. View the dashboard at 127.0.0.1:8265 
2023-03-14 14:41:21,218	INFO streaming_executor.py:74 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[ReadRange] -> TaskPoolMapOperator[MapBatches(sleep)] -> ActorPoolMapOperator[MapBatches(sleep)] -> AllToAllOperator[RandomShuffle] -> TaskPoolMapOperator[MapBatches(sleep)] -> AllToAllOperator[Repartition] -> TaskPoolMapOperator[MapBatches(sleep)]
Resource usage vs limits: 0.0/10.0 CPU, 0.0/0.0 GPU, 14.5 MiB/512.0 MiB object_store_memory 0:   0%| | 0/1 [00:24<?, ?it/s]
2023-03-14 14:41:45,936 WARNING plan.py:572 -- Warning: The Ray cluster currently does not have any available CPUs. The Dataset job will hang unless more CPUs are freed up. A common reason is that cluster resources are used by Actors or Tune trials; see the following link for more details: https://docs.ray.io/en/master/data/dataset-internals.html#datasets-and-tune
(raylet) Spilled 2922 MiB, 2208 objects, write throughput 1118 MiB/s. Set RAY_verbose_spill_logs=0 to disable this message.                                                                                                                         
Resource usage vs limits: 2.0/10.0 CPU, 0.0/0.0 GPU, 677.51 MiB/512.0 MiB object_store_memory 0:   0%| | 0/1 [00:31<?, ?it
ReadRange: 0 active, 0 queued 1: 100%|██████████████████████████████████████████████████| 200/200 [00:29<00:00, 16.23it/s]
MapBatches(sleep): 0 active, 0 queued 2: 100%|██████████████████████████████████████████| 200/200 [00:29<00:00, 16.59it/s]
MapBatches(sleep): 0 active, 0 queued, 0 actors [200 locality hits, 0 misses] 3: 100%|██| 200/200 [00:29<00:00, 17.25it/s]
RandomShuffle: 0 active, 0 queued 4: 100%|██████████████████████████████████████████████| 200/200 [00:26<00:00, 19.93s/it]
  *- ShuffleMap 5: 100%|████████████████████████████████████████████████████████████████| 200/200 [00:29<00:00, 80.30it/s]
  *- ShuffleReduce 6: 100%|█████████████████████████████████████████████████████████████| 200/200 [00:29<00:00, 10.66it/s]
MapBatches(sleep): 0 active, 0 queued 7: 100%|██████████████████████████████████████████| 200/200 [00:26<00:00, 42.88it/s]
Repartition: 0 active, 0 queued 8:   0%|                                                | 1/400 [00:28<3:12:04, 28.88s/it]
  *- Repartition 9:  94%|███████████████████████████████████████████████████████████▌   | 378/400 [00:27<00:00, 23.50it/s]
MapBatches(sleep): 5 active, 362 queued 10:   8%|███▎                                    | 33/400 [00:31<00:56,  6.55it/s]
output: 3 queued 11:   8%|█████                                                          | 32/400 [00:31<00:56,  6.57it/s]
```

2. repartition(shuffle=True)

```py
Resource usage vs limits: 0.0/10.0 CPU, 0.0/0.0 GPU, 0.0 MiB/512.0 MiB object_store_memory 0:   0%|                                   | 0/1 [00:00<?, ?it/s]
ReadCSV->Repartition: 0 active, 0 queued, 0 output 1:   0%|                                                                          | 0/10 [00:00<?, ?it/s]
  *- ShuffleMap 2:   0%|                                                                                                             | 0/10 [00:00<?, ?it/s]
  *- ShuffleReduce 3:   0%|                                                                                                          | 0/10 [00:00<?, ?it/s]
output: 0 queued 4:   0%|                                                                                                            | 0/10 [00:00<?, ?it/s]
```
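
For reference, output like the above comes from a shuffling repartition; a call roughly along these lines (the CSV path and target block count are placeholders, not taken from this PR) exercises the same `ShuffleMap` / `ShuffleReduce` sub bars:

```py
import ray

# Hypothetical input path and block count; any CSV source works.
ds = ray.data.read_csv("s3://my-bucket/data.csv").repartition(10, shuffle=True)
for _ in ds.iter_batches():
    pass
```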

3. sort()

```py
Resource usage vs limits: 0.0/10.0 CPU, 0.0/0.0 GPU, 0.0 MiB/512.0 MiB object_store_memory 0:   0%|                                   | 0/1 [00:00<?, ?it/s]
Sort: 0 active, 0 queued, 0 output 1:   0%|                                                                                     | 0/10 [00:00<?, ?it/s]
  *- SortSample 2: 100%|████████████████████████████████████████████████████████████████████████████████████████████████████| 10/10 [00:01<00:00,  6.72it/s]
  *- ShuffleMap 3:  20%|████████████████████▏                                                                                | 2/10 [00:18<01:00,  7.58s/it]
  *- ShuffleReduce 4:   0%|                                                                                                          | 0/10 [00:00<?, ?it/s]
output: 0 queued 5:   0%|                                                                                                            | 0/10 [00:00<?, ?it/s]
```
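
A hypothetical way to reproduce this kind of output (the synthetic dataset and sort key are made up for illustration); sorting a tabular dataset drives the `SortSample`, `ShuffleMap`, and `ShuffleReduce` sub bars:

```py
import ray

# Synthetic tabular dataset sorted by a column.
ds = ray.data.from_items([{"value": i % 100} for i in range(10_000)]).sort("value")
for _ in ds.iter_batches():
    pass
```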

4. groupby().aggregate()

```py
Resource usage vs limits: 0.0/10.0 CPU, 0.0/0.0 GPU, 0.0 MiB/512.0 MiB object_store_memory 0:   0%|                                   | 0/1 [00:00<?, ?it/s]
Aggregate: 0 active, 0 queued, 0 output 1:   0%|                                                                                     | 0/10 [00:00<?, ?it/s]
  *- SortSample 2: 100%|████████████████████████████████████████████████████████████████████████████████████████████████████| 10/10 [00:01<00:00,  6.72it/s]
  *- ShuffleMap 3:  20%|████████████████████▏                                                                                | 2/10 [00:18<01:00,  7.58s/it]
  *- ShuffleReduce 4:   0%|                                                                                                          | 0/10 [00:00<?, ?it/s]
output: 0 queued 5:   0%|                                                                                                            | 0/10 [00:00<?, ?it/s]
```
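
Similarly, a grouped aggregation along these lines (the column names and data are illustrative only) drives the same sub bars under the `Aggregate` operator:

```py
import ray
from ray.data.aggregate import Sum

# Group a synthetic dataset by key and sum a column per group.
ds = ray.data.from_items([{"key": i % 5, "value": i} for i in range(10_000)])
print(ds.groupby("key").aggregate(Sum("value")).take(5))
```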

5. groupby().map_groups()

```py
Resource usage vs limits: 4.0/10.0 CPU, 0.0/0.0 GPU, 792.74 MiB/512.0 MiB object_store_memory 0:   0%|                                | 0/1 [00:03<?, ?it/s]
Sort: 0 active, 0 queued, 0 output 1: 0%|                                                                                            | 0/10 [00:00<?, ?it/s]
  *- SortSample 2: 100%|████████████████████████████████████████████████████████████████████████████████████████████████████| 10/10 [00:00<00:00, 25.20it/s]
  *- ShuffleMap 3:   0%|                                                                                                             | 0/10 [00:00<?, ?it/s]
  *- ShuffleReduce 4:   0%|                                                                                                          | 0/10 [00:00<?, ?it/s]
MapBatches(group_fn): 4 active, 6 queued 5:   0%|                                                                                    | 0/10 [00:03<?, ?it/s]
output: 0 queued 6:   0%|                                                                                                            | 0/10 [00:00<?, ?it/s]
```
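
And a grouped map, for example (the `group_fn` here is a placeholder that returns each group unchanged):

```py
import ray


def group_fn(batch):
    # Placeholder per-group transform; a real one would modify the group.
    return batch


ds = ray.data.from_items([{"key": i % 5, "value": i} for i in range(10_000)])
for _ in ds.groupby("key").map_groups(group_fn).iter_batches():
    pass
```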
c21 committed Mar 16, 2023
1 parent c93e8ae commit c0e1631
Showing 15 changed files with 305 additions and 47 deletions.
6 changes: 6 additions & 0 deletions python/ray/data/_internal/execution/interfaces.py
@@ -4,6 +4,7 @@
import ray
from ray.data._internal.logical.interfaces import Operator
from ray.data._internal.memory_tracing import trace_deallocation
from ray.data._internal.progress_bar import ProgressBar
from ray.data._internal.stats import DatasetStats, StatsDict
from ray.data.block import Block, BlockMetadata
from ray.data.context import DatasetContext
@@ -216,6 +217,11 @@ class TaskContext:
# operator.
task_idx: int

# The dictionary of sub progress bar to update. The key is name of sub progress
# bar. Note this is only used on driver side.
# TODO(chengsu): clean it up from TaskContext with new optimizer framework.
sub_progress_bar_dict: Optional[Dict[str, ProgressBar]] = None


# Block transform function applied by task and actor pools in MapOperator.
MapTransformFn = Callable[[Iterable[Block], TaskContext], Iterable[Block]]
1 change: 1 addition & 0 deletions python/ray/data/_internal/execution/legacy_compat.py
@@ -309,6 +309,7 @@ def bulk_fn(
input_op,
name=stage.name,
num_outputs=stage.num_blocks,
sub_progress_bar_names=stage.sub_stage_names,
)
else:
raise NotImplementedError
python/ray/data/_internal/execution/operators/all_to_all_operator.py
@@ -1,4 +1,5 @@
from typing import List, Optional
from ray.data._internal.progress_bar import ProgressBar

from ray.data._internal.stats import StatsDict
from ray.data._internal.execution.interfaces import (
@@ -20,6 +21,7 @@ def __init__(
bulk_fn: AllToAllTransformFn,
input_op: PhysicalOperator,
num_outputs: Optional[int] = None,
sub_progress_bar_names: Optional[List[str]] = None,
name: str = "AllToAll",
):
"""Create an AllToAllOperator.
@@ -30,11 +32,14 @@ def __init__(
and a stats dict.
input_op: Operator generating input data for this op.
num_outputs: The number of expected output bundles for progress bar.
sub_progress_bar_names: The names of internal sub progress bars.
name: The name of this operator.
"""
self._bulk_fn = bulk_fn
self._next_task_index = 0
self._num_outputs = num_outputs
self._sub_progress_bar_names = sub_progress_bar_names
self._sub_progress_bar_dict = None
self._input_buffer: List[RefBundle] = []
self._output_buffer: List[RefBundle] = []
self._stats: StatsDict = {}
@@ -53,7 +58,10 @@ def add_input(self, refs: RefBundle, input_index: int) -> None:
self._input_buffer.append(refs)

def inputs_done(self) -> None:
ctx = TaskContext(task_idx=self._next_task_index)
ctx = TaskContext(
task_idx=self._next_task_index,
sub_progress_bar_dict=self._sub_progress_bar_dict,
)
self._output_buffer, self._stats = self._bulk_fn(self._input_buffer, ctx)
self._next_task_index += 1
self._input_buffer.clear()
@@ -70,3 +78,27 @@ def get_stats(self) -> StatsDict:

def get_transformation_fn(self) -> AllToAllTransformFn:
return self._bulk_fn

def progress_str(self) -> str:
return f"{len(self._output_buffer)} output"

def initialize_sub_progress_bars(self, position: int) -> int:
"""Initialize all internal sub progress bars, and return the number of bars."""
if self._sub_progress_bar_names is not None:
self._sub_progress_bar_dict = {}
for name in self._sub_progress_bar_names:
bar = ProgressBar(name, self.num_outputs_total() or 1, position)
# NOTE: call `set_description` to trigger the initial print of progress
# bar on console.
bar.set_description(f" *- {name}")
self._sub_progress_bar_dict[name] = bar
position += 1
return len(self._sub_progress_bar_dict)
else:
return 0

def close_sub_progress_bars(self):
"""Close all internal sub progress bars."""
if self._sub_progress_bar_dict is not None:
for sub_bar in self._sub_progress_bar_dict.values():
sub_bar.close()
9 changes: 5 additions & 4 deletions python/ray/data/_internal/execution/streaming_executor.py
@@ -75,9 +75,11 @@ def execute(
self._global_info = ProgressBar("Resource usage vs limits", 1, 0)

# Setup the streaming DAG topology and start the runner thread.
self._topology = build_streaming_topology(dag, self._options)
self._topology, progress_bar_position = build_streaming_topology(
dag, self._options
)
self._output_info = ProgressBar(
"Output", dag.num_outputs_total() or 1, len(self._topology)
"Output", dag.num_outputs_total() or 1, progress_bar_position
)
_validate_topology(self._topology, self._get_or_refresh_resource_limits())

@@ -131,8 +133,7 @@ def shutdown(self):
self._global_info.close()
for op, state in self._topology.items():
op.shutdown()
if state.progress_bar:
state.progress_bar.close()
state.close_progress_bars()
if self._output_info:
self._output_info.close()

30 changes: 23 additions & 7 deletions python/ray/data/_internal/execution/streaming_executor_state.py
@@ -7,7 +7,7 @@
import time
from collections import deque
from dataclasses import dataclass
from typing import Dict, List, Optional, Deque, Union
from typing import Dict, List, Optional, Deque, Tuple, Union

import ray
from ray.data._internal.execution.interfaces import (
@@ -16,6 +16,7 @@
PhysicalOperator,
ExecutionOptions,
)
from ray.data._internal.execution.operators.all_to_all_operator import AllToAllOperator
from ray.data._internal.execution.operators.input_data_buffer import InputDataBuffer
from ray.data._internal.progress_bar import ProgressBar

@@ -101,11 +102,26 @@ def __init__(self, op: PhysicalOperator, inqueues: List[Deque[MaybeRefBundle]]):
self.num_completed_tasks = 0
self.inputs_done_called = False

def initialize_progress_bar(self, index: int) -> None:
"""Create a progress bar at the given index (line offset in console)."""
def initialize_progress_bars(self, index: int) -> int:
"""Create progress bars at the given index (line offset in console).
For AllToAllOperator, zero or more sub progress bar would be created.
Return the number of progress bars created for this operator.
"""
self.progress_bar = ProgressBar(
self.op.name, self.op.num_outputs_total() or 1, index
)
num_bars = 1
if isinstance(self.op, AllToAllOperator):
num_bars += self.op.initialize_sub_progress_bars(index + 1)
return num_bars

def close_progress_bars(self):
"""Close all progress bars for this operator."""
if self.progress_bar:
self.progress_bar.close()
if isinstance(self.op, AllToAllOperator):
self.op.close_sub_progress_bars()

def num_queued(self) -> int:
"""Return the number of queued bundles across all inqueues."""
@@ -194,7 +210,7 @@ def outqueue_memory_usage(self) -> int:

def build_streaming_topology(
dag: PhysicalOperator, options: ExecutionOptions
) -> Topology:
) -> Tuple[Topology, int]:
"""Instantiate the streaming operator state topology for the given DAG.
This involves creating the operator state for each operator in the DAG,
@@ -207,6 +223,7 @@ def build_streaming_topology(
Returns:
The topology dict holding the streaming execution state.
The number of progress bars initialized so far.
"""

topology: Topology = {}
@@ -236,10 +253,9 @@ def setup_state(op: PhysicalOperator) -> OpState:
i = 1
for op_state in list(topology.values()):
if not isinstance(op_state.op, InputDataBuffer):
op_state.initialize_progress_bar(i)
i += 1
i += op_state.initialize_progress_bars(i)

return topology
return (topology, i)


def process_completed_tasks(topology: Topology) -> None:
19 changes: 16 additions & 3 deletions python/ray/data/_internal/fast_repartition.py
@@ -1,4 +1,6 @@
from typing import Optional
import ray
from ray.data._internal.execution.interfaces import TaskContext

from ray.data.block import BlockAccessor
from ray.data._internal.block_list import BlockList
@@ -9,7 +11,7 @@
from ray.data._internal.stats import DatasetStats


def fast_repartition(blocks, num_blocks):
def fast_repartition(blocks, num_blocks, ctx: Optional[TaskContext] = None):
from ray.data.dataset import Dataset

wrapped_ds = Dataset(
@@ -39,7 +41,16 @@ def fast_repartition(blocks, num_blocks):

# Coalesce each split into a single block.
reduce_task = cached_remote_fn(_ShufflePartitionOp.reduce).options(num_returns=2)
reduce_bar = ProgressBar("Repartition", position=0, total=len(splits))

should_close_bar = True
if ctx is not None and ctx.sub_progress_bar_dict is not None:
bar_name = "Repartition"
assert bar_name in ctx.sub_progress_bar_dict, ctx.sub_progress_bar_dict
reduce_bar = ctx.sub_progress_bar_dict[bar_name]
should_close_bar = False
else:
reduce_bar = ProgressBar("Repartition", position=0, total=len(splits))

reduce_out = [
reduce_task.remote(False, None, *s.get_internal_block_refs())
for s in splits
@@ -54,7 +65,9 @@ def fast_repartition(blocks, num_blocks):
new_blocks, new_metadata = zip(*reduce_out)
new_blocks, new_metadata = list(new_blocks), list(new_metadata)
new_metadata = reduce_bar.fetch_until_complete(new_metadata)
reduce_bar.close()

if should_close_bar:
reduce_bar.close()

# Handle empty blocks.
if len(new_blocks) < num_blocks:
10 changes: 9 additions & 1 deletion python/ray/data/_internal/plan.py
@@ -1048,12 +1048,14 @@ def __init__(
supports_block_udf: bool = False,
block_udf: Optional[BlockTransform] = None,
remote_args: Optional[Dict[str, Any]] = None,
sub_stage_names: Optional[List[str]] = None,
):
super().__init__(name, num_blocks)
self.fn = fn
self.supports_block_udf = supports_block_udf
self.block_udf = block_udf
self.ray_remote_args = remote_args or {}
self.sub_stage_names = sub_stage_names

def can_fuse(self, prev: Stage):
context = DatasetContext.get_current()
@@ -1100,7 +1102,13 @@ def block_udf(blocks: Iterable[Block], ctx: TaskContext) -> Iterable[Block]:
yield from self_block_udf(blocks, ctx)

return AllToAllStage(
name, self.num_blocks, self.fn, True, block_udf, prev.ray_remote_args
name,
self.num_blocks,
self.fn,
True,
block_udf,
prev.ray_remote_args,
self.sub_stage_names,
)

def __call__(
32 changes: 28 additions & 4 deletions python/ray/data/_internal/push_based_shuffle.py
@@ -4,6 +4,7 @@

import ray
from ray.data._internal.block_list import BlockList
from ray.data._internal.execution.interfaces import TaskContext
from ray.data._internal.progress_bar import ProgressBar
from ray.data._internal.remote_fn import cached_remote_fn
from ray.data._internal.shuffle import ShuffleOp
@@ -369,6 +370,7 @@ def execute(
map_ray_remote_args: Optional[Dict[str, Any]] = None,
reduce_ray_remote_args: Optional[Dict[str, Any]] = None,
merge_factor: int = 2,
ctx: Optional[TaskContext] = None,
) -> Tuple[BlockList, Dict[str, List[BlockMetadata]]]:
logger.info("Using experimental push-based shuffle.")
# TODO(swang): For jobs whose reduce work is heavier than the map work,
@@ -426,7 +428,18 @@ def merge(*args, **kwargs):
shuffle_map,
[output_num_blocks, stage.merge_schedule, *self._map_args],
)
map_bar = ProgressBar("Shuffle Map", position=0, total=len(input_blocks_list))

should_close_bar = True
if ctx is not None and ctx.sub_progress_bar_dict is not None:
bar_name = "ShuffleMap"
assert bar_name in ctx.sub_progress_bar_dict, ctx.sub_progress_bar_dict
map_bar = ctx.sub_progress_bar_dict[bar_name]
should_close_bar = False
else:
map_bar = ProgressBar(
"Shuffle Map", position=0, total=len(input_blocks_list)
)

map_stage_executor = _PipelinedStageExecutor(
map_stage_iter, stage.num_map_tasks_per_round, progress_bar=map_bar
)
@@ -460,11 +473,20 @@ def merge(*args, **kwargs):
merge_done = True
break

map_bar.close()
if should_close_bar:
map_bar.close()
all_merge_results = merge_stage_iter.pop_merge_results()

# Execute and wait for the reduce stage.
reduce_bar = ProgressBar("Shuffle Reduce", total=output_num_blocks)
should_close_bar = True
if ctx is not None and ctx.sub_progress_bar_dict is not None:
bar_name = "ShuffleReduce"
assert bar_name in ctx.sub_progress_bar_dict, ctx.sub_progress_bar_dict
reduce_bar = ctx.sub_progress_bar_dict[bar_name]
should_close_bar = False
else:
reduce_bar = ProgressBar("Shuffle Reduce", total=output_num_blocks)

shuffle_reduce = cached_remote_fn(self.reduce)
reduce_stage_iter = _ReduceStageIterator(
stage,
@@ -508,7 +530,9 @@ def merge(*args, **kwargs):
assert (
len(new_blocks) == output_num_blocks
), f"Expected {output_num_blocks} outputs, produced {len(new_blocks)}"
reduce_bar.close()

if should_close_bar:
reduce_bar.close()

stats = {
"map": map_stage_metadata,
