Skip to content

Commit

Permalink
[Data] Remove _block_udf from FileBasedDatasource reads (ray-project#38111)
Browse files Browse the repository at this point in the history

`FileBasedDatasource` temporarily exposed a private `_block_udf` parameter as a stopgap until read fusion was available (see the removed TODO in `__init__`). Now that read fusion is properly supported, we can remove the parameter.

Signed-off-by: Balaji Veeramani <[email protected]>
Signed-off-by: Victor <[email protected]>
  • Loading branch information
bveeramani authored and Victor committed Oct 11, 2023
1 parent 1dfd758 commit a5ff86a
Showing 1 changed file with 1 addition and 5 deletions.
6 changes: 1 addition & 5 deletions python/ray/data/datasource/file_based_datasource.py
Original file line number Diff line number Diff line change
Expand Up @@ -370,8 +370,6 @@ def __init__(
meta_provider: BaseFileMetadataProvider = DefaultFileMetadataProvider(),
partition_filter: PathPartitionFilter = None,
partitioning: Partitioning = None,
# TODO(ekl) deprecate this once read fusion is available.
_block_udf: Optional[Callable[[Block], Block]] = None,
ignore_missing_paths: bool = False,
**reader_args,
):
Expand All @@ -382,7 +380,6 @@ def __init__(
self._meta_provider = meta_provider
self._partition_filter = partition_filter
self._partitioning = partitioning
self._block_udf = _block_udf
self._ignore_missing_paths = ignore_missing_paths
self._reader_args = reader_args
paths, self._filesystem = _resolve_paths_and_filesystem(paths, filesystem)
Expand Down Expand Up @@ -429,7 +426,6 @@ def get_read_tasks(self, parallelism: int) -> List[ReadTask]:
open_stream_args = self._open_stream_args
reader_args = self._reader_args
partitioning = self._partitioning
_block_udf = self._block_udf

paths, file_sizes = self._paths, self._file_sizes
read_stream = self._delegate._read_stream
Expand All @@ -448,7 +444,7 @@ def read_files(
logger.get_logger().debug(f"Reading {len(read_paths)} files.")
fs = _unwrap_s3_serialization_workaround(filesystem)
output_buffer = BlockOutputBuffer(
block_udf=_block_udf, target_max_block_size=ctx.target_max_block_size
block_udf=None, target_max_block_size=ctx.target_max_block_size
)
for read_path in read_paths:
compression = open_stream_args.pop("compression", None)
Expand Down

0 comments on commit a5ff86a

Please sign in to comment.