diff --git a/python/ray/data/datasource/file_based_datasource.py b/python/ray/data/datasource/file_based_datasource.py
index c8335f0e06517..b54e9e9c738a3 100644
--- a/python/ray/data/datasource/file_based_datasource.py
+++ b/python/ray/data/datasource/file_based_datasource.py
@@ -370,8 +370,6 @@ def __init__(
         meta_provider: BaseFileMetadataProvider = DefaultFileMetadataProvider(),
         partition_filter: PathPartitionFilter = None,
         partitioning: Partitioning = None,
-        # TODO(ekl) deprecate this once read fusion is available.
-        _block_udf: Optional[Callable[[Block], Block]] = None,
         ignore_missing_paths: bool = False,
         **reader_args,
     ):
@@ -382,7 +380,6 @@ def __init__(
         self._meta_provider = meta_provider
         self._partition_filter = partition_filter
         self._partitioning = partitioning
-        self._block_udf = _block_udf
         self._ignore_missing_paths = ignore_missing_paths
         self._reader_args = reader_args
         paths, self._filesystem = _resolve_paths_and_filesystem(paths, filesystem)
@@ -429,7 +426,6 @@ def get_read_tasks(self, parallelism: int) -> List[ReadTask]:
         open_stream_args = self._open_stream_args
         reader_args = self._reader_args
         partitioning = self._partitioning
-        _block_udf = self._block_udf

         paths, file_sizes = self._paths, self._file_sizes
         read_stream = self._delegate._read_stream
@@ -448,7 +444,7 @@ def read_files(
             logger.get_logger().debug(f"Reading {len(read_paths)} files.")
             fs = _unwrap_s3_serialization_workaround(filesystem)
             output_buffer = BlockOutputBuffer(
-                block_udf=_block_udf, target_max_block_size=ctx.target_max_block_size
+                block_udf=None, target_max_block_size=ctx.target_max_block_size
             )
             for read_path in read_paths:
                 compression = open_stream_args.pop("compression", None)
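
For context, a minimal sketch (not part of this diff) of what replaces the removed internal `_block_udf` hook: a per-block transform can be expressed as an ordinary map stage chained onto the read, which Ray Data can then fuse with the read operator. The dataset path and the transform below are illustrative assumptions, not code from this PR.

```python
# Illustrative sketch only: the path and transform are assumptions, not PR code.
import ray
import pyarrow as pa
import pyarrow.compute as pc


def double_first_column(batch: pa.Table) -> pa.Table:
    # Per-block transform of the kind that was previously injected via _block_udf.
    name = batch.schema.field(0).name
    return batch.set_column(0, name, pc.multiply(batch.column(0), 2))


# The transform now runs as a map stage after the read instead of inside it.
ds = ray.data.read_parquet("s3://my-bucket/data/").map_batches(
    double_first_column, batch_format="pyarrow"
)
```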