[Datasets] Force all operator/stage name to be in PascalCase (#33332)
This PR forces all operator/stage names to be in PascalCase. Before this PR, we had a mix of PascalCase (e.g. `MapBatches`, `ReadRange`) and snake_case (e.g. `random_shuffle`, `flat_map`), which was confusing for users looking at the progress bar and `Dataset.stats()` output.

This is split out from #33302 for easier review.
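For context, here is a rough sketch of how the rename shows up in `Dataset.stats()` output, based on the updated test assertions in this diff (exact block counts and timings will vary):

```python
import ray

ds = ray.data.range(5).random_shuffle()
ds.take_all()  # force execution so that per-stage stats are recorded
print(ds.stats())
# Before this PR: "Stage 1 ReadRange->random_shuffle: ..."
# After this PR:  "Stage 1 ReadRange->RandomShuffle: ..."
```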
c21 committed Mar 16, 2023
1 parent 69c3390 commit f5a49ac
Showing 16 changed files with 151 additions and 148 deletions.
14 changes: 7 additions & 7 deletions python/ray/air/tests/test_dataset_config.py
@@ -285,7 +285,7 @@ def checker(shard, results):
assert results[0] != results[1], results
stats = shard.stats()
assert (
"Stage 1 ReadRange->randomize_block_order->"
"Stage 1 ReadRange->RandomizeBlockOrder->"
"BatchMapper: 1/1 blocks executed " in stats
), stats

@@ -325,7 +325,7 @@ def checker(shard, results):
assert len(results[0]) == 5, results
assert results[0] != results[1], results
stats = shard.stats()
assert "randomize_block_order->random_shuffle" in stats, stats
assert "RandomizeBlockOrder->RandomShuffle" in stats, stats

ds = ray.data.range_table(5)
test = TestStream(
@@ -339,7 +339,7 @@ def checker(shard, results):
assert len(results[0]) == 5, results
assert results[0] != results[1], results
stats = shard.stats()
assert "Stage 1 ReadRange->random_shuffle" in stats, stats
assert "Stage 1 ReadRange->RandomShuffle" in stats, stats

ds = ray.data.range_table(5)
test = TestBatch(
Expand All @@ -355,7 +355,7 @@ def checker(shard, results):
assert len(results[0]) == 5, results
assert results[0] != results[1], results
stats = shard.stats()
assert "randomize_block_order: 5/5 blocks executed in 0s" in stats, stats
assert "RandomizeBlockOrder: 5/5 blocks executed in 0s" in stats, stats

ds = ray.data.range_table(5)
test = TestStream(
Expand All @@ -366,7 +366,7 @@ def checker(shard, results):

def checker(shard, results):
stats = shard.stats()
assert "randomize_block_order" not in stats, stats
assert "RandomizeBlockOrder" not in stats, stats

ds = ray.data.range_table(5)
test = TestStream(
Expand All @@ -382,7 +382,7 @@ def checker(shard, results):
# beginning, not once per epoch.
assert results[0] == results[1], results
stats = shard.stats()
assert "randomize_block_order: 5/5 blocks executed" in stats, stats
assert "RandomizeBlockOrder: 5/5 blocks executed" in stats, stats

ds = ray.data.range_table(5)
test = TestBatch(
Expand All @@ -397,7 +397,7 @@ def checker(shard, results):
assert len(results[0]) == 5, results
assert results[0] != results[1], results
stats = shard.stats()
assert "randomize_block_order: 5/5 blocks executed in 0s" in stats, stats
assert "RandomizeBlockOrder: 5/5 blocks executed in 0s" in stats, stats

ds = ray.data.range_table(5)
test = TestStream(
2 changes: 1 addition & 1 deletion python/ray/data/_internal/lazy_block_list.py
@@ -123,7 +123,7 @@ def stats(self) -> DatasetStats:
"""Create DatasetStats for this LazyBlockList."""
return DatasetStats(
# Make a copy of metadata, as the DatasetStats may mutate it in-place.
stages={"read": self.get_metadata(fetch_if_missing=False).copy()},
stages={"Read": self.get_metadata(fetch_if_missing=False).copy()},
parent=None,
needs_stats_actor=True,
stats_uuid=self._stats_uuid,
31 changes: 4 additions & 27 deletions python/ray/data/_internal/plan.py
@@ -16,6 +16,7 @@
)

import ray
+from ray.data._internal.util import capitalize
from ray.types import ObjectRef
from ray.data._internal.arrow_ops.transform_pyarrow import unify_schemas
from ray.data._internal.block_list import BlockList
@@ -48,30 +49,6 @@
logger = DatasetLogger(__name__)


-def capfirst(s: str):
-    """Capitalize the first letter of a string
-    Args:
-        s: String to capitalize
-    Returns:
-        Capitalized string
-    """
-    return s[0].upper() + s[1:]


-def capitalize(s: str):
-    """Capitalize a string, removing '_' and keeping camelcase.
-    Args:
-        s: String to capitalize
-    Returns:
-        Capitalized string with no underscores.
-    """
-    return "".join(capfirst(x) for x in s.split("_"))


class Stage:
"""Represents a Dataset transform stage (e.g., map or shuffle)."""

@@ -182,7 +159,7 @@ def get_plan_as_string(self) -> str:
if self._stages_after_snapshot:
# Get string representation of each stage in reverse order.
for stage in self._stages_after_snapshot[::-1]:
-# Get name of each stage in camel case.
+# Get name of each stage in pascal case.
# The stage representation should be in "<stage-name>(...)" format,
# e.g. "MapBatches(my_udf)".
#
@@ -1213,7 +1190,7 @@ def block_fn(
if stages and isinstance(stages[0], RandomizeBlocksStage):
block_list, _ = stages[0].do_randomize(block_list)
stages = stages[1:]
-name += "->randomize_block_order"
+name += "->RandomizeBlockOrder"

stage = OneToOneStage(
name,
@@ -1248,7 +1225,7 @@ def _reorder_stages(stages: List[Stage]) -> List[Stage]:
reorder_buf.append(s)
else:
# Barrier: flush the reorder buffer.
-if isinstance(s, AllToAllStage) or s.name == "write":
+if isinstance(s, AllToAllStage) or s.name == "Write":
output.extend(reorder_buf)
reorder_buf = []
output.append(s)
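As the `name += "->RandomizeBlockOrder"` line above suggests, fused stages join their PascalCase names with `->`, which is why the test assertions earlier in this diff expect strings like `ReadRange->RandomizeBlockOrder->BatchMapper`. A trivial illustration (stage names taken from the tests, not from planner internals):

```python
# Fused stage names concatenate their PascalCase parts with "->".
parts = ["ReadRange", "RandomizeBlockOrder", "BatchMapper"]
print("->".join(parts))  # ReadRange->RandomizeBlockOrder->BatchMapper
```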
12 changes: 6 additions & 6 deletions python/ray/data/_internal/stage_impl.py
@@ -62,7 +62,7 @@ def do_shuffle(
)

super().__init__(
"repartition", num_blocks, do_shuffle, supports_block_udf=True
"Repartition", num_blocks, do_shuffle, supports_block_udf=True
)

else:
@@ -75,7 +75,7 @@ def do_fast_repartition(block_list, clear_input_blocks: bool, *_):
blocks = block_list
return fast_repartition(blocks, num_blocks)

super().__init__("repartition", num_blocks, do_fast_repartition)
super().__init__("Repartition", num_blocks, do_fast_repartition)


class RandomizeBlocksStage(AllToAllStage):
@@ -84,7 +84,7 @@ class RandomizeBlocksStage(AllToAllStage):
def __init__(self, seed: Optional[int]):
self._seed = seed

super().__init__("randomize_block_order", None, self.do_randomize)
super().__init__("RandomizeBlockOrder", None, self.do_randomize)

def do_randomize(self, block_list, *_):
num_blocks = block_list.initial_num_blocks()
@@ -139,7 +139,7 @@ def do_shuffle(
)

super().__init__(
"random_shuffle",
"RandomShuffle",
output_num_blocks,
do_shuffle,
supports_block_udf=True,
@@ -241,7 +241,7 @@ def do_zip_all(block_list: BlockList, clear_input_blocks: bool, *_):
)
return blocks, {}

super().__init__("zip", None, do_zip_all)
super().__init__("Zip", None, do_zip_all)


def _calculate_blocks_rows_and_bytes(
@@ -313,4 +313,4 @@ def do_sort(block_list, clear_input_blocks: bool, *_):
_validate_key_fn(ds, key)
return sort_impl(blocks, clear_input_blocks, key, descending)

super().__init__("sort", None, do_sort)
super().__init__("Sort", None, do_sort)
14 changes: 8 additions & 6 deletions python/ray/data/_internal/stats.py
@@ -8,6 +8,7 @@

import ray
from ray.data._internal.block_list import BlockList
+from ray.data._internal.util import capfirst
from ray.data.block import BlockMetadata
from ray.data.context import DatasetContext
from ray.util.annotations import DeveloperAPI
@@ -69,11 +70,12 @@ def __init__(
def build_multistage(self, stages: StatsDict) -> "DatasetStats":
stage_infos = {}
for i, (k, v) in enumerate(stages.items()):
+capped_k = capfirst(k)
if len(stages) > 1:
if i == 0:
-stage_infos[self.stage_name + "_" + k] = v
+stage_infos[self.stage_name + capped_k] = v
else:
-stage_infos[self.stage_name.split("->")[-1] + "_" + k] = v
+stage_infos[self.stage_name.split("->")[-1] + capped_k] = v
else:
stage_infos[self.stage_name] = v
stats = DatasetStats(
@@ -262,13 +264,13 @@ def to_summary(self) -> "DatasetStatsSummary":
if DatasetContext.get_current().block_splitting_enabled:
# Only populate stats when stats from all read tasks are ready at
# stats actor.
-if len(stats_map.items()) == len(self.stages["read"]):
-    self.stages["read"] = []
+if len(stats_map.items()) == len(self.stages["Read"]):
+    self.stages["Read"] = []
for _, blocks_metadata in sorted(stats_map.items()):
self.stages["read"] += blocks_metadata
self.stages["Read"] += blocks_metadata
else:
for i, metadata in stats_map.items():
self.stages["read"][i] = metadata[0]
self.stages["Read"][i] = metadata[0]

stages_stats = []
is_substage = len(self.stages) > 1
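To illustrate the `build_multistage` change above: with `capfirst`, substage keys are now PascalCase concatenations rather than underscore-joined. A minimal sketch of the key construction, assuming a stage named `RandomShuffle` with `map` and `reduce` substages (substage names chosen for illustration):

```python
def capfirst(s: str) -> str:
    return s[0].upper() + s[1:]

def substage_keys(stage_name: str, stages: dict) -> list:
    # Mirrors the key construction in DatasetStats.build_multistage after this PR.
    keys = []
    for i, k in enumerate(stages):
        capped_k = capfirst(k)
        if len(stages) > 1:
            # The first substage keeps the full (possibly fused) stage name;
            # later substages keep only the last fused component.
            prefix = stage_name if i == 0 else stage_name.split("->")[-1]
            keys.append(prefix + capped_k)
        else:
            keys.append(stage_name)
    return keys

print(substage_keys("RandomShuffle", {"map": [], "reduce": []}))
# Before this PR: ['RandomShuffle_map', 'RandomShuffle_reduce']
# After this PR:  ['RandomShuffleMap', 'RandomShuffleReduce']
```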
24 changes: 24 additions & 0 deletions python/ray/data/_internal/util.py
@@ -429,3 +429,27 @@ def _default_batch_format(
else "default"
)
return batch_format


+def capfirst(s: str):
+    """Capitalize the first letter of a string
+    Args:
+        s: String to capitalize
+    Returns:
+        Capitalized string
+    """
+    return s[0].upper() + s[1:]


+def capitalize(s: str):
+    """Capitalize a string, removing '_' and keeping camelcase.
+    Args:
+        s: String to capitalize
+    Returns:
+        Capitalized string with no underscores.
+    """
+    return "".join(capfirst(x) for x in s.split("_"))
14 changes: 7 additions & 7 deletions python/ray/data/dataset.py
@@ -354,7 +354,7 @@ def map(

plan = self._plan.with_stage(
OneToOneStage(
"map",
"Map",
transform_fn,
compute,
ray_remote_args,
@@ -892,7 +892,7 @@ def flat_map(
transform_fn = generate_flat_map_fn()

plan = self._plan.with_stage(
OneToOneStage("flat_map", transform_fn, compute, ray_remote_args, fn=fn)
OneToOneStage("FlatMap", transform_fn, compute, ray_remote_args, fn=fn)
)

logical_plan = self._logical_plan
@@ -958,7 +958,7 @@ def filter(
transform_fn = generate_filter_fn()

plan = self._plan.with_stage(
OneToOneStage("filter", transform_fn, compute, ray_remote_args, fn=fn)
OneToOneStage("Filter", transform_fn, compute, ray_remote_args, fn=fn)
)

logical_plan = self._logical_plan
@@ -1436,7 +1436,7 @@ def split_at_indices(self, indices: List[int]) -> List["Dataset[T]"]:
parent_stats = self._plan.stats()
splits = []
for bs, ms in zip(blocks, metadata):
stats = DatasetStats(stages={"split": ms}, parent=parent_stats)
stats = DatasetStats(stages={"Split": ms}, parent=parent_stats)
stats.time_total_s = split_duration
splits.append(
Dataset(
@@ -1658,7 +1658,7 @@ def union(self, *other: List["Dataset[T]"]) -> "Dataset[T]":
"be shown again.".format(set(epochs), max_epoch)
)
dataset_stats = DatasetStats(
stages={"union": []},
stages={"Union": []},
parent=[d._plan.stats() for d in datasets],
)
dataset_stats.time_total_s = time.perf_counter() - start_time
@@ -2191,7 +2191,7 @@ def limit(self, limit: int) -> "Dataset[T]":
for m in metadata
]
dataset_stats = DatasetStats(
stages={"limit": meta_for_stats},
stages={"Limit": meta_for_stats},
parent=self._plan.stats(),
)
dataset_stats.time_total_s = split_duration
@@ -2793,7 +2793,7 @@ def write_fn_wrapper(blocks: Iterator[Block], ctx, fn) -> Iterator[Block]:

plan = self._plan.with_stage(
OneToOneStage(
"write",
"Write",
write_fn_wrapper,
"tasks",
ray_remote_args,
2 changes: 1 addition & 1 deletion python/ray/data/grouped_dataset.py
@@ -218,7 +218,7 @@ def do_agg(blocks, clear_input_blocks: bool, *_):
clear_input_blocks,
)

-plan = self._dataset._plan.with_stage(AllToAllStage("aggregate", None, do_agg))
+plan = self._dataset._plan.with_stage(AllToAllStage("Aggregate", None, do_agg))

logical_plan = self._dataset._logical_plan
if logical_plan is not None:
10 changes: 5 additions & 5 deletions python/ray/data/read_api.py
@@ -133,7 +133,7 @@ def from_items(items: List[Any], *, parallelism: int = -1) -> Dataset[Any]:
return Dataset(
ExecutionPlan(
BlockList(blocks, metadata, owned_by_consumer=False),
DatasetStats(stages={"from_items": metadata}, parent=None),
DatasetStats(stages={"FromItems": metadata}, parent=None),
run_by_consumer=False,
),
0,
@@ -1334,7 +1334,7 @@ def from_pandas_refs(
return Dataset(
ExecutionPlan(
BlockList(dfs, metadata, owned_by_consumer=False),
DatasetStats(stages={"from_pandas_refs": metadata}, parent=None),
DatasetStats(stages={"FromPandasRefs": metadata}, parent=None),
run_by_consumer=False,
),
0,
@@ -1349,7 +1349,7 @@ def from_pandas_refs(
return Dataset(
ExecutionPlan(
BlockList(blocks, metadata, owned_by_consumer=False),
DatasetStats(stages={"from_pandas_refs": metadata}, parent=None),
DatasetStats(stages={"FromPandasRefs": metadata}, parent=None),
run_by_consumer=False,
),
0,
@@ -1408,7 +1408,7 @@ def from_numpy_refs(
return Dataset(
ExecutionPlan(
BlockList(blocks, metadata, owned_by_consumer=False),
DatasetStats(stages={"from_numpy_refs": metadata}, parent=None),
DatasetStats(stages={"FromNumpyRefs": metadata}, parent=None),
run_by_consumer=False,
),
0,
@@ -1460,7 +1460,7 @@ def from_arrow_refs(
return Dataset(
ExecutionPlan(
BlockList(tables, metadata, owned_by_consumer=False),
DatasetStats(stages={"from_arrow_refs": metadata}, parent=None),
DatasetStats(stages={"FromArrowRefs": metadata}, parent=None),
run_by_consumer=False,
),
0,
2 changes: 1 addition & 1 deletion python/ray/data/tests/test_bulk_executor.py
@@ -89,7 +89,7 @@ def test_basic_stats(ray_start_10_cpus_shared):
expected = [[x * 4] for x in range(20)]
assert output == expected, (output, expected)
stats_str = executor.get_stats().to_summary().to_string()
assert "Stage 0 read:" in stats_str, stats_str
assert "Stage 0 Read:" in stats_str, stats_str
assert "Stage 1 Foo:" in stats_str, stats_str
assert "Stage 2 Bar:" in stats_str, stats_str
assert "Extra metrics:" in stats_str, stats_str
