[Data] Add read_lance API to read Lance Dataset (#45106)
This PR adds the capability to load a Lance dataset into a Ray Dataset.

Signed-off-by: Brent Bain <[email protected]>
Signed-off-by: Cheng Su <[email protected]>
Co-authored-by: Brent Bain <[email protected]>
c21 and brent-anyscale committed May 2, 2024
1 parent a6569d4 commit 24bf8a1
Showing 8 changed files with 256 additions and 0 deletions.
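For reference, a minimal end-to-end sketch of the new API (not part of the diff; it assumes pylance is installed and uses a hypothetical local path, mirroring the docstring example and the test below):

import lance
import pyarrow as pa
import ray

# Write a small Lance dataset to a hypothetical local path.
table = pa.table({"one": [1, 2, 3], "two": ["a", "b", "c"]})
lance.write_dataset(table, "/tmp/example.lance")

# Read it back as a Ray Dataset, optionally projecting columns and filtering rows.
ds = ray.data.read_lance(
    "/tmp/example.lance",
    columns=["one", "two"],
    filter="one > 1",
)
print(ds.take_all())  # rows where "one" > 1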
9 changes: 9 additions & 0 deletions doc/source/data/api/input_output.rst
@@ -177,6 +177,15 @@ Databricks

read_databricks_tables

Lance
-----

.. autosummary::
:nosignatures:
:toctree: doc/

read_lance

Dask
----

8 changes: 8 additions & 0 deletions python/ray/data/BUILD
@@ -578,3 +578,11 @@ py_test(
tags = ["team:data", "exclusive"],
deps = ["//:ray_lib", ":conftest"],
)

py_test(
name = "test_lance",
size = "small",
srcs = ["tests/test_lance.py"],
tags = ["team:data", "exclusive"],
deps = ["//:ray_lib", ":conftest"],
)
2 changes: 2 additions & 0 deletions python/ray/data/__init__.py
@@ -48,6 +48,7 @@
read_datasource,
read_images,
read_json,
read_lance,
read_mongo,
read_numpy,
read_parquet,
@@ -128,6 +129,7 @@
"read_datasource",
"read_images",
"read_json",
"read_lance",
"read_numpy",
"read_mongo",
"read_parquet",
2 changes: 2 additions & 0 deletions python/ray/data/datasource/__init__.py
@@ -37,6 +37,7 @@
from ray.data.datasource.image_datasource import ImageDatasource
from ray.data.datasource.json_datasink import _JSONDatasink
from ray.data.datasource.json_datasource import JSONDatasource
from ray.data.datasource.lance_datasource import LanceDatasource
from ray.data.datasource.mongo_datasink import _MongoDatasink
from ray.data.datasource.mongo_datasource import MongoDatasource
from ray.data.datasource.numpy_datasink import _NumpyDatasink
@@ -91,6 +92,7 @@
"ImageDatasource",
"_JSONDatasink",
"JSONDatasource",
"LanceDatasource",
"_NumpyDatasink",
"NumpyDatasource",
"ParquetBaseDatasource",
83 changes: 83 additions & 0 deletions python/ray/data/datasource/lance_datasource.py
@@ -0,0 +1,83 @@
import logging
from typing import TYPE_CHECKING, Dict, Iterator, List, Optional

import numpy as np

from ray.data._internal.util import _check_import
from ray.data.block import BlockMetadata
from ray.data.datasource.datasource import Datasource, ReadTask
from ray.util.annotations import DeveloperAPI

if TYPE_CHECKING:
import pyarrow


logger = logging.getLogger(__name__)


@DeveloperAPI
class LanceDatasource(Datasource):
"""Lance datasource, for reading Lance dataset."""

def __init__(
self,
uri: str,
columns: Optional[List[str]] = None,
filter: Optional[str] = None,
storage_options: Optional[Dict[str, str]] = None,
):
_check_import(self, module="lance", package="pylance")

import lance

self.uri = uri
self.columns = columns
self.filter = filter
self.storage_options = storage_options

self.lance_ds = lance.dataset(uri=uri, storage_options=storage_options)
self.fragments = self.lance_ds.get_fragments()

def get_read_tasks(self, parallelism: int) -> List[ReadTask]:
read_tasks = []
for fragments in np.array_split(self.fragments, parallelism):
if len(fragments) <= 0:
continue

num_rows = sum([f.count_rows() for f in fragments])
input_files = [
data_file.path() for f in fragments for data_file in f.data_files()
]

# TODO(chengsu): Take column projection into consideration for schema.
metadata = BlockMetadata(
num_rows=num_rows,
schema=fragments[0].schema,
input_files=input_files,
size_bytes=None,
exec_stats=None,
)
columns = self.columns
row_filter = self.filter

read_task = ReadTask(
lambda f=fragments: _read_fragments(f, columns, row_filter),
metadata,
)
read_tasks.append(read_task)

return read_tasks

def estimate_inmemory_data_size(self) -> Optional[int]:
# TODO(chengsu): Add memory size estimation to improve auto-tune of parallelism.
return None


def _read_fragments(fragments, columns, row_filter) -> Iterator["pyarrow.Table"]:
"""Read Lance fragments in batches."""
import pyarrow

for fragment in fragments:
batches = fragment.to_batches(columns=columns, filter=row_filter)
for batch in batches:
yield pyarrow.Table.from_batches([batch])
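A standalone sketch (not part of the diff) of the splitting behavior that get_read_tasks relies on: np.array_split partitions the fragment list into parallelism groups, and empty groups are skipped, so requesting more parallelism than there are fragments simply yields fewer read tasks.

import numpy as np

fragments = ["frag-0", "frag-1", "frag-2"]  # stand-ins for Lance fragments
parallelism = 8
groups = [g for g in np.array_split(fragments, parallelism) if len(g) > 0]
print(len(groups))  # 3 read tasks, not 8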
62 changes: 62 additions & 0 deletions python/ray/data/read_api.py
@@ -54,6 +54,7 @@
Datasource,
ImageDatasource,
JSONDatasource,
LanceDatasource,
MongoDatasource,
NumpyDatasource,
ParquetBaseDatasource,
@@ -2900,6 +2901,67 @@ def from_torch(
)


@PublicAPI
def read_lance(
uri: str,
*,
columns: Optional[List[str]] = None,
filter: Optional[str] = None,
storage_options: Optional[Dict[str, str]] = None,
ray_remote_args: Optional[Dict[str, Any]] = None,
concurrency: Optional[int] = None,
override_num_blocks: Optional[int] = None,
) -> Dataset:
"""
Create a :class:`~ray.data.Dataset` from a
`Lance Dataset <https://lancedb.github.io/lance/api/python/lance.html#lance.LanceDataset>`_.

Examples:
>>> import ray
>>> ds = ray.data.read_lance( # doctest: +SKIP
... uri="./db_name.lance",
... columns=["image", "label"],
... filter="label = 2 AND text IS NOT NULL",
... )

Args:
uri: The URI of the Lance dataset to read from. Local file paths, S3, and GCS
are supported.
columns: The columns to read. By default, all columns are read.
filter: Read returns only the rows matching the filter. By default, no
filter is applied.
storage_options: Extra options that make sense for a particular storage
connection. This is used to store connection parameters like credentials,
endpoint, etc. For more information, see `Object Store Configuration
<https://lancedb.github.io/lance/read_and_write.html#object-store-configuration>`_.
ray_remote_args: kwargs passed to :meth:`~ray.remote` in the read tasks.
concurrency: The maximum number of Ray tasks to run concurrently. Set this
to control number of tasks to run concurrently. This doesn't change the
total number of tasks run or the total number of output blocks. By default,
concurrency is dynamically decided based on the available resources.
override_num_blocks: Override the number of output blocks from all read tasks.
By default, the number of output blocks is dynamically decided based on
input data size and available resources. You shouldn't manually set this
value in most cases.

Returns:
A :class:`~ray.data.Dataset` producing records read from the Lance dataset.
""" # noqa: E501
datasource = LanceDatasource(
uri=uri,
columns=columns,
filter=filter,
storage_options=storage_options,
)

return read_datasource(
datasource=datasource,
ray_remote_args=ray_remote_args,
concurrency=concurrency,
override_num_blocks=override_num_blocks,
)


def _get_datasource_or_legacy_reader(
ds: Datasource,
ctx: DataContext,
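For remote storage, storage_options is forwarded to lance.dataset. A hedged sketch with a hypothetical S3 URI and illustrative credential keys (the key names follow Lance's object-store configuration docs and are assumptions here, not verified values):

import ray

ds = ray.data.read_lance(
    "s3://my-bucket/my_table.lance",  # hypothetical URI
    columns=["id", "text"],
    storage_options={
        # Illustrative credential keys; consult Lance's object-store docs.
        "aws_access_key_id": "<key>",
        "aws_secret_access_key": "<secret>",
    },
    override_num_blocks=16,  # optionally force 16 output blocks
)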
89 changes: 89 additions & 0 deletions python/ray/data/tests/test_lance.py
@@ -0,0 +1,89 @@
import os

import lance
import pyarrow as pa
import pytest
from pkg_resources import parse_version
from pytest_lazyfixture import lazy_fixture

import ray
from ray._private.utils import _get_pyarrow_version
from ray.data.datasource.path_util import _unwrap_protocol


@pytest.mark.parametrize(
"fs,data_path",
[
(None, lazy_fixture("local_path")),
(lazy_fixture("local_fs"), lazy_fixture("local_path")),
(lazy_fixture("s3_fs"), lazy_fixture("s3_path")),
(
lazy_fixture("s3_fs_with_space"),
lazy_fixture("s3_path_with_space"),
), # Path contains space.
(
lazy_fixture("s3_fs_with_anonymous_crendential"),
lazy_fixture("s3_path_with_anonymous_crendential"),
),
],
)
def test_lance_read_basic(fs, data_path):
# NOTE: Lance only works with PyArrow 12 or above.
pyarrow_version = _get_pyarrow_version()
if pyarrow_version is not None:
pyarrow_version = parse_version(pyarrow_version)
if pyarrow_version is not None and pyarrow_version < parse_version("12.0.0"):
return

df1 = pa.table({"one": [2, 1, 3, 4, 6, 5], "two": ["b", "a", "c", "e", "g", "f"]})
setup_data_path = _unwrap_protocol(data_path)
path = os.path.join(setup_data_path, "test.lance")
lance.write_dataset(df1, path)

ds_lance = lance.dataset(path)
df2 = pa.table(
{
"one": [1, 2, 3, 4, 5, 6],
"three": [4, 5, 8, 9, 12, 13],
"four": ["u", "v", "w", "x", "y", "z"],
}
)
ds_lance.merge(df2, "one")

ds = ray.data.read_lance(path)

# Test metadata-only ops.
assert ds.count() == 6
assert ds.schema() is not None

assert (
" ".join(str(ds).split())
== "Dataset( num_rows=6, schema={one: int64, two: string, three: int64, four: string} )" # noqa: E501
), ds
assert (
" ".join(repr(ds).split())
== "Dataset( num_rows=6, schema={one: int64, two: string, three: int64, four: string} )" # noqa: E501
), ds

# Test read.
values = [[s["one"], s["two"]] for s in ds.take_all()]
assert sorted(values) == [
[1, "a"],
[2, "b"],
[3, "c"],
[4, "e"],
[5, "f"],
[6, "g"],
]

# Test column projection.
ds = ray.data.read_lance(path, columns=["one"])
values = [s["one"] for s in ds.take()]
assert sorted(values) == [1, 2, 3, 4, 5, 6]
assert ds.schema().names == ["one", "two", "three", "four"]


if __name__ == "__main__":
import sys

sys.exit(pytest.main(["-v", __file__]))
1 change: 1 addition & 0 deletions python/requirements/ml/data-test-requirements.txt
@@ -13,3 +13,4 @@ google-cloud-bigquery-storage==2.24.0
google-api-core==1.34.0
webdataset
raydp==1.7.0b20231020.dev0
pylance
