Commit

[Data] Remove dead code from ray.data._internal.compute (ray-project#45564)

Signed-off-by: Balaji Veeramani <[email protected]>
bveeramani committed May 28, 2024
1 parent c3e6eca commit 73f6fb2
Showing 1 changed file with 2 additions and 127 deletions.
129 changes: 2 additions & 127 deletions python/ray/data/_internal/compute.py
@@ -1,19 +1,8 @@
 import logging
-import math
-from typing import Any, Callable, Iterable, List, Optional, Tuple, TypeVar, Union
+from typing import Any, Callable, Iterable, Optional, TypeVar, Union
 
-from ray.data._internal.delegating_block_builder import DelegatingBlockBuilder
 from ray.data._internal.execution.interfaces import TaskContext
-from ray.data.block import (
-    Block,
-    BlockAccessor,
-    BlockExecStats,
-    BlockMetadata,
-    BlockPartition,
-    UserDefinedFunction,
-)
-from ray.data.context import DataContext
-from ray.types import ObjectRef
+from ray.data.block import Block, UserDefinedFunction
 from ray.util.annotations import DeveloperAPI, PublicAPI
 
 logger = logging.getLogger(__name__)
@@ -161,117 +150,3 @@ def is_task_compute(compute_spec: Union[str, ComputeStrategy]) -> bool:
        or compute_spec == "tasks"
        or isinstance(compute_spec, TaskPoolStrategy)
    )


def _map_block_split(
    block_fn: BlockTransform,
    input_files: List[str],
    fn: Optional[UserDefinedFunction],
    num_blocks: int,
    *blocks_and_fn_args: Union[Block, Any],
    **fn_kwargs,
) -> BlockPartition:
    stats = BlockExecStats.builder()
    blocks, fn_args = blocks_and_fn_args[:num_blocks], blocks_and_fn_args[num_blocks:]
    if fn is not None:
        fn_args = (fn,) + fn_args
    new_metas = []
    for new_block in block_fn(blocks, *fn_args, **fn_kwargs):
        accessor = BlockAccessor.for_block(new_block)
        new_meta = BlockMetadata(
            num_rows=accessor.num_rows(),
            size_bytes=accessor.size_bytes(),
            schema=accessor.schema(),
            input_files=input_files,
            exec_stats=stats.build(),
        )
        yield new_block
        new_metas.append(new_meta)
        stats = BlockExecStats.builder()
    yield new_metas
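
The removed _map_block_split helper followed a split-output protocol: each transformed block is yielded as soon as it is produced, and the list of corresponding BlockMetadata is yielded as the final item. Below is a minimal, self-contained sketch of that protocol (illustration only, not part of this commit); the function name split_style_map is hypothetical, plain Python lists stand in for Ray Data blocks, and a dict stands in for BlockMetadata.

from typing import Any, Callable, Iterable, List


def split_style_map(
    blocks: List[List[Any]], fn: Callable[[Any], Any]
) -> Iterable[Any]:
    metadata = []
    for block in blocks:
        new_block = [fn(row) for row in block]
        metadata.append({"num_rows": len(new_block)})  # stand-in for BlockMetadata
        yield new_block  # each output block is yielded as it is produced
    yield metadata  # the metadata list is always the final item


*new_blocks, metas = list(split_style_map([[1, 2], [3]], lambda x: x * 10))
assert new_blocks == [[10, 20], [30]]
assert metas == [{"num_rows": 2}, {"num_rows": 1}]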


def _map_block_nosplit(
    block_fn: BlockTransform,
    input_files: List[str],
    fn: Optional[UserDefinedFunction],
    num_blocks: int,
    *blocks_and_fn_args: Union[Block, Any],
    **fn_kwargs,
) -> Tuple[Block, BlockMetadata]:
    stats = BlockExecStats.builder()
    builder = DelegatingBlockBuilder()
    blocks, fn_args = blocks_and_fn_args[:num_blocks], blocks_and_fn_args[num_blocks:]
    if fn is not None:
        fn_args = (fn,) + fn_args
    for new_block in block_fn(blocks, *fn_args, **fn_kwargs):
        builder.add_block(new_block)
    new_block = builder.build()
    accessor = BlockAccessor.for_block(new_block)
    return new_block, accessor.get_metadata(
        input_files=input_files, exec_stats=stats.build()
    )
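
By contrast, the removed _map_block_nosplit helper funneled every transformed block into a single builder and returned one combined block with one metadata record. A toy sketch of that behavior follows (illustration only; nosplit_style_map is a hypothetical name and the plain list merely stands in for DelegatingBlockBuilder).

from typing import Any, Callable, List, Tuple


def nosplit_style_map(
    blocks: List[List[Any]], fn: Callable[[Any], Any]
) -> Tuple[List[Any], dict]:
    combined: List[Any] = []  # plays the role of DelegatingBlockBuilder
    for block in blocks:
        combined.extend(fn(row) for row in block)  # akin to builder.add_block(...)
    return combined, {"num_rows": len(combined)}  # one block, one metadata record


assert nosplit_style_map([[1, 2], [3]], lambda x: x * 10) == (
    [10, 20, 30],
    {"num_rows": 3},
)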


def _bundle_blocks_up_to_size(
    blocks: List[Tuple[ObjectRef[Block], BlockMetadata]],
    target_size: int,
) -> List[Tuple[Tuple[ObjectRef[Block]], Tuple[BlockMetadata]]]:
    """Group blocks into bundles that are up to (but not exceeding) the provided target
    size.
    """
    block_bundles: List[List[Tuple[ObjectRef[Block], BlockMetadata]]] = []
    curr_bundle: List[Tuple[ObjectRef[Block], BlockMetadata]] = []
    curr_bundle_size = 0
    for b, m in blocks:
        num_rows = m.num_rows
        if num_rows is None:
            num_rows = float("inf")
        if curr_bundle_size > 0 and curr_bundle_size + num_rows > target_size:
            block_bundles.append(curr_bundle)
            curr_bundle = []
            curr_bundle_size = 0
        curr_bundle.append((b, m))
        curr_bundle_size += num_rows
    if curr_bundle:
        block_bundles.append(curr_bundle)
    if len(blocks) / len(block_bundles) >= 10:
        logger.warning(
            f"`batch_size` is set to {target_size}, which reduces parallelism from "
            f"{len(blocks)} to {len(block_bundles)}. If the performance is worse than "
            "expected, this may indicate that the batch size is too large or the "
            "input block size is too small. To reduce batch size, consider decreasing "
            "`batch_size` or use the default in `map_batches`. To increase input "
            "block size, consider decreasing `parallelism` in read."
        )
    return [tuple(zip(*block_bundle)) for block_bundle in block_bundles]
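
A small, self-contained illustration of the bundling rule implemented above (not from the commit), using bare per-block row counts in place of (ObjectRef[Block], BlockMetadata) pairs; bundle_up_to_size is a hypothetical name. The current bundle is closed as soon as the next block would push it past the target, so a single block larger than the target still forms a bundle of its own.

from typing import List


def bundle_up_to_size(row_counts: List[int], target_size: int) -> List[List[int]]:
    bundles: List[List[int]] = []
    curr: List[int] = []
    curr_size = 0
    for num_rows in row_counts:
        # Close the current bundle if adding this block would exceed the target.
        if curr_size > 0 and curr_size + num_rows > target_size:
            bundles.append(curr)
            curr, curr_size = [], 0
        curr.append(num_rows)
        curr_size += num_rows
    if curr:
        bundles.append(curr)
    return bundles


# With a target of 4 rows: 3 + 2 would exceed the target, so a new bundle starts,
# and the oversized block of 5 rows still becomes its own bundle.
assert bundle_up_to_size([3, 2, 1, 5], target_size=4) == [[3], [2, 1], [5]]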


def _check_batch_size(
    blocks_and_meta: List[Tuple[ObjectRef[Block], BlockMetadata]],
    batch_size: int,
    name: str,
):
    """Log a warning if the provided batch size exceeds the configured target max block
    size.
    """
    batch_size_bytes = None
    for _, meta in blocks_and_meta:
        if meta.num_rows and meta.size_bytes:
            batch_size_bytes = math.ceil(batch_size * (meta.size_bytes / meta.num_rows))
            break
    context = DataContext.get_current()
    if (
        batch_size_bytes is not None
        and batch_size_bytes > context.target_max_block_size
    ):
        logger.warning(
            f"Requested batch size {batch_size} results in batches of "
            f"{batch_size_bytes} bytes for {name} tasks, which is larger than the "
            f"configured target max block size {context.target_max_block_size}. This "
            "may result in out-of-memory errors for certain workloads, and you may "
            "want to decrease your batch size or increase the configured target max "
            "block size, e.g.: "
            "from ray.data.context import DataContext; "
            "DataContext.get_current().target_max_block_size = 4_000_000_000"
        )
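
A worked example of the byte estimate used above, with illustrative numbers that are not from the commit: the bytes-per-row ratio of the first block that reports both counts is scaled by the requested batch size.

import math

# Hypothetical first block: 1,000 rows occupying 8,000,000 bytes (8,000 bytes per row).
num_rows, size_bytes = 1_000, 8_000_000
batch_size = 100_000

batch_size_bytes = math.ceil(batch_size * (size_bytes / num_rows))
assert batch_size_bytes == 800_000_000  # 800 MB per batch

# Against, say, a 128 MiB target max block size, this estimate would exceed the
# limit and the warning above would fire.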
