Skip to content

Commit

Permalink
Revert "[Data] Find partitions refactor (#38460)" (#38548)
Browse files Browse the repository at this point in the history
This reverts commit fb31492.

This is causing unit test failure:

#38460 (comment)
  • Loading branch information
c21 committed Aug 17, 2023
1 parent a29e528 commit 3575cf9
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 68 deletions.
39 changes: 37 additions & 2 deletions python/ray/data/_internal/arrow_block.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import bisect
import collections
import heapq
import random
Expand All @@ -24,7 +25,7 @@
is_valid_udf_return,
)
from ray.data._internal.table_block import TableBlockAccessor, TableBlockBuilder
from ray.data._internal.util import _truncated_repr, find_partitions
from ray.data._internal.util import _truncated_repr
from ray.data.block import (
Block,
BlockAccessor,
Expand Down Expand Up @@ -426,7 +427,41 @@ def sort_and_partition(
if len(boundaries) == 0:
return [table]

return find_partitions(table, boundaries, sort_key)
partitions = []
# For each boundary value, count the number of items that are less
# than it. Since the block is sorted, these counts partition the items
# such that boundaries[i] <= x < boundaries[i + 1] for each x in
# partition[i]. If `descending` is true, `boundaries` would also be
# in descending order and we only need to count the number of items
# *greater than* the boundary value instead.

def find_partition_index(records, boundary, sort_key):
if sort_key.get_descending():
return len(records) - bisect.bisect_left(records[::-1], boundary)
else:
return bisect.bisect_left(records, boundary)

def searchsorted(table, boundaries, sort_key):
records = [
tuple(d.values())
for d in transform_pyarrow.to_pylist(
table.select(sort_key.get_columns())
)
]
return [
find_partition_index(records, boundary, sort_key)
for boundary in boundaries
]

bounds = searchsorted(table, boundaries, sort_key)

partitions = []
last_idx = 0
for idx in bounds:
partitions.append(table.slice(last_idx, idx - last_idx))
last_idx = idx
partitions.append(table.slice(last_idx))
return partitions

def combine(self, key: str, aggs: Tuple["AggregateFn"]) -> Block:
"""Combine rows with the same key into an accumulator.
Expand Down
14 changes: 14 additions & 0 deletions python/ray/data/_internal/arrow_ops/transform_pyarrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,3 +286,17 @@ def combine_chunks(table: "pyarrow.Table") -> "pyarrow.Table":
arr = col.combine_chunks()
new_cols.append(arr)
return pyarrow.Table.from_arrays(new_cols, schema=table.schema)


def to_pylist(table: "pyarrow.Table") -> "pyarrow.Table":
"""Convert the Table to a list of rows / dictionaries.
Required for compatibility with Arrow 6.
"""
pydict = table.to_pydict()
names = table.schema.names
pylist = [
{column: pydict[column][row] for column in names}
for row in range(table.num_rows)
]
return pylist
35 changes: 33 additions & 2 deletions python/ray/data/_internal/pandas_block.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import bisect
import collections
import heapq
from typing import (
Expand All @@ -17,7 +18,6 @@

from ray.air.constants import TENSOR_COLUMN_NAME
from ray.data._internal.table_block import TableBlockAccessor, TableBlockBuilder
from ray.data._internal.util import find_partitions
from ray.data.block import (
Block,
BlockAccessor,
Expand Down Expand Up @@ -357,7 +357,38 @@ def sort_and_partition(
if len(boundaries) == 0:
return [table]

return find_partitions(table, boundaries, sort_key)
partitions = []
# For each boundary value, count the number of items that are less
# than it. Since the block is sorted, these counts partition the items
# such that boundaries[i] <= x < boundaries[i + 1] for each x in
# partition[i]. If `descending` is true, `boundaries` would also be
# in descending order and we only need to count the number of items
# *greater than* the boundary value instead.

def find_partition_index(records, boundary, sort_key):
if sort_key.get_descending():
return len(records) - bisect.bisect_left(records[::-1], boundary)
else:
return bisect.bisect_left(records, boundary)

def searchsorted(table, boundaries, sort_key):
records = list(
table[sort_key.get_columns()].itertuples(index=False, name=None)
)

return [
find_partition_index(records, boundary, sort_key)
for boundary in boundaries
]

bounds = searchsorted(table, boundaries, sort_key)

last_idx = 0
for idx in bounds:
partitions.append(table[last_idx:idx])
last_idx = idx
partitions.append(table[last_idx:])
return partitions

def combine(self, key: str, aggs: Tuple["AggregateFn"]) -> "pandas.DataFrame":
"""Combine rows with the same key into an accumulator.
Expand Down
64 changes: 0 additions & 64 deletions python/ray/data/_internal/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import pyarrow

from ray.data._internal.compute import ComputeStrategy
from ray.data._internal.sort import SortKey
from ray.data.block import Block, BlockMetadata, UserDefinedFunction
from ray.data.datasource import Reader
from ray.util.placement_group import PlacementGroup
Expand Down Expand Up @@ -516,69 +515,6 @@ def unify_block_metadata_schema(
return None


def find_partition_index(
table: Union["pyarrow.Table", "pandas.DataFrame"],
desired: List[Any],
sort_key: "SortKey",
) -> int:
columns = sort_key.get_columns()
descending = sort_key.get_descending()

left, right = 0, len(table)
for i in range(len(desired)):
if left == right:
return right
col_name = columns[i]
col_vals = table[col_name].to_numpy()[left:right]
desired_val = desired[i]

prevleft = left
if descending is True:
left = prevleft + (
len(col_vals)
- np.searchsorted(
col_vals,
desired_val,
side="right",
sorter=np.arange(len(col_vals) - 1, -1, -1),
)
)
right = prevleft + (
len(col_vals)
- np.searchsorted(
col_vals,
desired_val,
side="left",
sorter=np.arange(len(col_vals) - 1, -1, -1),
)
)
else:
left = prevleft + np.searchsorted(col_vals, desired_val, side="left")
right = prevleft + np.searchsorted(col_vals, desired_val, side="right")
return right


def find_partitions(table, boundaries, sort_key):
partitions = []

# For each boundary value, count the number of items that are less
# than it. Since the block is sorted, these counts partition the items
# such that boundaries[i] <= x < boundaries[i + 1] for each x in
# partition[i]. If `descending` is true, `boundaries` would also be
# in descending order and we only need to count the number of items
# *greater than* the boundary value instead.
bounds = [
find_partition_index(table, boundary, sort_key) for boundary in boundaries
]

last_idx = 0
for idx in bounds:
partitions.append(table[last_idx:idx])
last_idx = idx
partitions.append(table[last_idx:])
return partitions


def get_attribute_from_class_name(class_name: str) -> Any:
"""Get Python attribute from the provided class name.
Expand Down

0 comments on commit 3575cf9

Please sign in to comment.