diff --git a/doc/source/data/doc_code/loading_data.py b/doc/source/data/doc_code/loading_data.py deleted file mode 100644 index 5ddd111f0f6ae..0000000000000 --- a/doc/source/data/doc_code/loading_data.py +++ /dev/null @@ -1,459 +0,0 @@ -# flake8: noqa - -# fmt: off -# __creating_datasets_import_begin__ -import ray -# __creating_datasets_import_end__ -# fmt: on - -# For tfrecords -ray.init(runtime_env={"pip": ["tensorflow_metadata"]}) - -# fmt: off -# __gen_synth_tabular_range_begin__ -# Create a Dataset of integers. -ds = ray.data.range(10000) -# -> Dataset(num_blocks=200, num_rows=10000, schema={id: int64}) - -ds.take_batch(5) -# -> {'id': array([0, 1, 2, 3, 4])} -# __gen_synth_tabular_range_end__ -# fmt: on - -# fmt: off -# __from_items_begin__ -# Create a Dataset from python dicts. -ds = ray.data.from_items([{"col1": i, "col2": str(i)} for i in range(10000)]) -# -> MaterializedDataset(num_blocks=200, num_rows=10000, schema={col1: int64, col2: string}) - -ds.show(3) -# -> {'col1': 0, 'col2': '0'} -# -> {'col1': 1, 'col2': '1'} -# -> {'col1': 2, 'col2': '2'} -# __from_items_end__ -# fmt: on - -# fmt: off -# __from_pandas_begin__ -import pandas as pd - -# Create a Dataset from a Pandas DataFrame. -df = pd.DataFrame({"col1": list(range(10000)), "col2": list(map(str, range(10000)))}) -ds = ray.data.from_pandas(df) -# -> MaterializedDataset(num_blocks=1, num_rows=10000, schema={col1: int64, col2: object}) - -ds.show(3) -# -> {'col1': 0, 'col2': '0'} -# -> {'col1': 1, 'col2': '1'} -# -> {'col1': 2, 'col2': '2'} -# __from_pandas_end__ -# fmt: on - -# fmt: off -# __from_pandas_mult_begin__ -import pandas as pd - -data = list(range(10000)) -num_chunks = 10 -chunk_size = len(data) // num_chunks -chunks = [data[i : i + chunk_size] for i in range(0, len(data), chunk_size)] -dfs = [ - pd.DataFrame({"col1": list(chunk), "col2": list(map(str, chunk))}) - for chunk in chunks -] -# Create a Dataset from multiple Pandas DataFrames. -ds = ray.data.from_pandas(dfs) -# -> MaterializedDataset(num_blocks=10, num_rows=10000, schema={col1: int64, col2: object}) - -ds.show(3) -# -> {'col1': 0, 'col2': '0'} -# -> {'col1': 1, 'col2': '1'} -# -> {'col1': 2, 'col2': '2'} -# __from_pandas_mult_end__ -# fmt: on - -# fmt: off -# __from_numpy_begin__ -import numpy as np - -# Create a Dataset from a 3D NumPy ndarray. -arr = np.ones((3, 4, 4)) -# The outer dimension is treated as the row dimension. -ds = ray.data.from_numpy(arr) -# -> MaterializedDataset( -# num_blocks=1, -# num_rows=3, -# schema={data: numpy.ndarray(shape=(4, 4), dtype=double)} -# ) - -ds.show(2) -# -> {'data': array([[1., 1., 1., 1.], -# [1., 1., 1., 1.], -# [1., 1., 1., 1.], -# [1., 1., 1., 1.]])} -# -> {'data': array([[1., 1., 1., 1.], -# [1., 1., 1., 1.], -# [1., 1., 1., 1.], -# [1., 1., 1., 1.]])} -# __from_numpy_end__ -# fmt: on - -# fmt: off -# __read_images_begin__ -ds = ray.data.read_images("example://image-datasets/simple") -# -> Dataset(num_blocks=3, num_rows=3, -# schema={image: numpy.ndarray(shape=(32, 32, 3), dtype=uint8)}) - -ds.take(1) -# -> [{'image': array([[[ 88, 70, 68], ...]]), dtype=uint8)}] -# __read_images_end__ -# fmt: on - -# fmt: off -# __from_numpy_mult_begin__ -import numpy as np - -# Create a Dataset from multiple 3D NumPy ndarray. -arrs = [np.random.rand(2, 4, 4) for _ in range(4)] -# The outer dimension is treated as the row dimension. 
-ds = ray.data.from_numpy(arrs) -# -> MaterializedDataset( -# num_blocks=4, -# num_rows=8, -# schema={data: numpy.ndarray(shape=(4, 4), dtype=double)} -# ) - -ds.show(2) -# -> {'data': array([[0.06587483, 0.67808656, 0.76461924, 0.83428549], -# [0.04932103, 0.25112165, 0.26476714, 0.24599738], -# [0.67624391, 0.58689537, 0.12594709, 0.94663371], -# [0.32435665, 0.97719096, 0.03234169, 0.71563231]])} -# -> {'data': array([[0.98570318, 0.65956399, 0.82168898, 0.09798336], -# [0.22426704, 0.34209978, 0.02605247, 0.48200137], -# [0.17312096, 0.38789983, 0.42663678, 0.92652456], -# [0.80787394, 0.92437162, 0.11185822, 0.3319638 ]])} -# __from_numpy_mult_end__ -# fmt: on - -# fmt: off -# __from_arrow_begin__ -import pyarrow as pa - -# Create a Dataset from an Arrow Table. -t = pa.table({"col1": list(range(10000)), "col2": list(map(str, range(10000)))}) -ds = ray.data.from_arrow(t) -# -> MaterializedDataset(num_blocks=1, num_rows=10000, schema={col1: int64, col2: string}) - -ds.show(3) -# -> {'col1': 0, 'col2': '0'} -# -> {'col1': 1, 'col2': '1'} -# -> {'col1': 2, 'col2': '2'} -# __from_arrow_end__ -# fmt: on - -# fmt: off -# __from_arrow_mult_begin__ -import pyarrow as pa - -data = list(range(10000)) -num_chunks = 10 -chunk_size = len(data) // num_chunks -chunks = [data[i : i + chunk_size] for i in range(0, len(data), chunk_size)] -ts = [ - pa.table({"col1": list(chunk), "col2": list(map(str, chunk))}) - for chunk in chunks -] -# Create a Dataset from multiple Arrow Tables. -ds = ray.data.from_arrow(ts) -# -> MaterializedDataset(num_blocks=10, num_rows=10000, schema={col1: int64, col2: string}) - -ds.show(3) -# -> {'col1': 0, 'col2': '0'} -# -> {'col1': 1, 'col2': '1'} -# -> {'col1': 2, 'col2': '2'} -# __from_arrow_mult_end__ -# fmt: on - -# fmt: off -# __from_dask_begin__ -import pandas as pd -import dask.dataframe as dd - -df = pd.DataFrame({"col1": list(range(10000)), "col2": list(map(str, range(10000)))}) -ddf = dd.from_pandas(df, npartitions=4) -# Create a Dataset from a Dask DataFrame. -ds = ray.data.from_dask(ddf) -# -> MaterializedDataset(num_blocks=10, num_rows=10000, schema={col1: int64, col2: object}) - -ds.show(3) -# -> {'col1': 0, 'col2': '0'} -# -> {'col1': 1, 'col2': '1'} -# -> {'col1': 2, 'col2': '2'} -# __from_dask_end__ -# fmt: on - -# fmt: off -# __from_modin_begin__ -import modin.pandas as md - -df = pd.DataFrame({"col1": list(range(10000)), "col2": list(map(str, range(10000)))}) -mdf = md.DataFrame(df) -# Create a Dataset from a Modin DataFrame. -ds = ray.data.from_modin(mdf) -# -> MaterializedDataset(num_blocks=8, num_rows=10000, schema={col1: int64, col2: object}) - -ds.show(3) -# -> {'col1': 0, 'col2': '0'} -# -> {'col1': 1, 'col2': '1'} -# -> {'col1': 2, 'col2': '2'} -# __from_modin_end__ -# fmt: on - -# fmt: off -# __read_parquet_begin__ -# Create a Dataset by reading a Parquet file. 
-ds = ray.data.read_parquet("example://iris.parquet") -# -> Dataset( -# num_blocks=1, -# num_rows=150, -# schema={ -# sepal.length: double, -# sepal.width: double, -# petal.length: double, -# petal.width: double, -# variety: string, -# } -# ) - -ds.show(2) -# -> { -# 'sepal.length': 5.1, -# 'sepal.width': 3.5, -# 'petal.length': 1.4, -# 'petal.width': 0.2, -# 'variety': 'Setosa', -# } -# -> { -# 'sepal.length': 4.9, -# 'sepal.width': 3.0, -# 'petal.length': 1.4, -# 'petal.width': 0.2, -# 'variety': 'Setosa', -# } -# __read_parquet_end__ -# fmt: on - -# fmt: off -# __read_parquet_pushdown_begin__ -import pyarrow as pa - -# Create a Dataset by reading a Parquet file, pushing column selection and row -# filtering down to the file scan. -ds = ray.data.read_parquet( - "example://iris.parquet", - columns=["sepal.length", "variety"], - filter=pa.dataset.field("sepal.length") > 5.0, -).materialize() # Force a full read of the file. -# -> Dataset(num_blocks=1, num_rows=118, schema={sepal.length: double, variety: string}) - -ds.show(2) -# -> {'sepal.length': 5.1, 'variety': 'Setosa'} -# {'sepal.length': 5.4, 'variety': 'Setosa'} -# __read_parquet_pushdown_end__ -# fmt: on - -# fmt: off -# __read_csv_begin__ -# Create a Dataset by reading a CSV file. -ds = ray.data.read_csv("example://iris.csv") -# -> Dataset( -# num_blocks=1, -# num_rows=150, -# schema={ -# sepal.length: double, -# sepal.width: double, -# petal.length: double, -# petal.width: double, -# variety: string, -# } -# ) - -ds.show(2) -# -> { -# 'sepal.length': 5.1, -# 'sepal.width': 3.5, -# 'petal.length': 1.4, -# 'petal.width': 0.2, -# 'variety': 'Setosa', -# } -# -> { -# 'sepal.length': 4.9, -# 'sepal.width': 3.0, -# 'petal.length': 1.4, -# 'petal.width': 0.2, -# 'variety': 'Setosa', -# } -# __read_csv_end__ -# fmt: on - -# fmt: off -# __read_json_begin__ -# Create a Dataset by reading a JSON file. -ds = ray.data.read_json("example://iris.json") -# -> Dataset( -# num_blocks=1, -# num_rows=150, -# schema={ -# sepal.length: double, -# sepal.width: double, -# petal.length: double, -# petal.width: double, -# variety: string, -# } -# ) - -ds.show(2) -# -> { -# 'sepal.length': 5.1, -# 'sepal.width': 3.5, -# 'petal.length': 1.4, -# 'petal.width': 0.2, -# 'variety': 'Setosa', -# } -# -> { -# 'sepal.length': 4.9, -# 'sepal.width': 3.0, -# 'petal.length': 1.4, -# 'petal.width': 0.2, -# 'variety': 'Setosa', -# } -# __read_json_end__ -# fmt: on - -# fmt: off -# __read_numpy_begin__ -# Create a Dataset by reading a NumPy file. -ds = ray.data.read_numpy("example://mnist_subset.npy") -# -> Dataset( -# num_blocks=1, -# num_rows=3, -# schema={data: numpy.ndarray(shape=(28, 28), dtype=uint8)} -# ) - -ds.show(2) -# -> {'data': array([[0, ...]], dtype=uint8)} -# {'data': array([[0, ...]], dtype=uint8)} -# __read_numpy_end__ -# fmt: on - -# fmt: off -# __read_text_begin__ -# Create a Dataset by reading a text file. -ds = ray.data.read_text("example://sms_spam_collection_subset.txt") -# -> Dataset(num_blocks=1, num_rows=10, schema={text: string}) - -ds.show(2) -# -> {'text': 'ham\tGo until jurong point, crazy.. Available only in bugis n great world la e buffet... Cine there got amore wat...'} -# {'text': 'ham\tOk lar... Joking wif u oni...'} -# __read_text_end__ -# fmt: on - -# fmt: off -# __read_binary_begin__ -from io import BytesIO -import PIL.Image - -# Create a Dataset by reading a binary file. 
-ds = ray.data.read_binary_files("example://mnist_subset_partitioned/0/1.png") -# -> Dataset(num_blocks=1, num_rows=1, schema={bytes: string}) - -ds = ds.map(lambda row: {"image": np.asarray(PIL.Image.open(BytesIO(row["bytes"])).convert("L"))}) -# -> Dataset( -# num_blocks=1, -# num_rows=1, -# schema={image: numpy.ndarray(shape=(28, 28), dtype=uint8)} -# ) - -ds.take(1) -# -> [{'image': array([[[ 88, 70, 68], ...]]), dtype=uint8)}] -# __read_binary_end__ -# fmt: on - -# fmt: off -# __read_parquet_s3_begin__ -# Create a Dataset by reading a Parquet file from S3. -ds = ray.data.read_parquet("s3://anonymous@air-example-data/ursa-labs-taxi-data/by_year/2019/01/data.parquet") -# -> Dataset( -# num_blocks=1, -# num_rows=7667792, -# schema={ -# vendor_id: string, -# pickup_at: timestamp[us], -# dropoff_at: timestamp[us], -# passenger_count: int8, -# trip_distance: float, -# rate_code_id: string, -# store_and_fwd_flag: string, -# ..., -# }, -# ) - -ds.show(2) -# -> { -# 'vendor_id': '1', -# 'pickup_at': datetime.datetime(2019, 1, 1, 0, 46, 40), -# 'dropoff_at': datetime.datetime(2019, 1, 1, 0, 53, 20), -# 'passenger_count': 1, -# 'trip_distance': 1.5, -# 'rate_code_id': '1', -# 'store_and_fwd_flag': 'N', -# ..., -# } -# { -# 'vendor_id': '1', -# 'pickup_at': datetime.datetime(2019, 1, 1, 0, 59, 47) -# 'dropoff_at': datetime.datetime(2019, 1, 1, 1, 18, 59), -# 'passenger_count': 1, -# 'trip_distance': 2.5999999046325684, -# 'rate_code_id': '1', -# 'store_and_fwd_flag': 'N', -# ..., -# } -# __read_parquet_s3_end__ -# fmt: on - -# fmt: off -# __read_compressed_begin__ -# Read a gzip-compressed CSV file from S3. -ds = ray.data.read_csv( - "s3://anonymous@air-example-data/gzip_compressed.csv", - arrow_open_stream_args={"compression": "gzip"}, -) -# __read_compressed_end__ -# fmt: on - -# fmt: off -# __read_tfrecords_begin__ -# Create a Dataset by reading a TFRecord file. -ds = ray.data.read_tfrecords("example://iris.tfrecords") -# Dataset( -# num_blocks=1, -# num_rows=150, -# schema={ -# sepal.length: float64, -# sepal.width: float64, -# petal.length: float64, -# petal.width: float64, -# label: object, -# }, -# ) -ds.show(1) -# { -# 'sepal.length': 5.099999904632568, -# 'sepal.width': 3.5, -# 'petal.length': 1.399999976158142, -# 'petal.width': 0.20000000298023224, -# 'label': b'Setosa', -# } -# __read_tfrecords_end__ -# fmt: on diff --git a/doc/source/data/doc_code/loading_data_untested.py b/doc/source/data/doc_code/loading_data_untested.py deleted file mode 100644 index 83d9e9f6685f5..0000000000000 --- a/doc/source/data/doc_code/loading_data_untested.py +++ /dev/null @@ -1,135 +0,0 @@ -# Not tested in CI currently! -# flake8: noqa - -import ray - -# fmt: off -# __from_spark_begin__ -import raydp - -spark = raydp.init_spark(app_name="Spark -> Datasets Example", - num_executors=2, - executor_cores=2, - executor_memory="500MB") -df = spark.createDataFrame([(i, str(i)) for i in range(10000)], ["col1", "col2"]) -# Create a tabular Dataset from a Spark DataFrame. -ds = ray.data.from_spark(df) -# -> MaterializedDataset(num_blocks=10, num_rows=10000, schema={col1: int64, col2: string}) - -ds.show(3) -# -> {'col1': 0, 'col2': '0'} -# -> {'col1': 1, 'col2': '1'} -# -> {'col1': 2, 'col2': '2'} -# __from_spark_end__ -# fmt: on - -# fmt: off -# __read_parquet_s3_with_fs_begin__ -import pyarrow as pa - -# Create a tabular Dataset by reading a Parquet file from a private S3 bucket. -# NOTE: This example is not runnable as-is; add in a path to your private bucket and the -# required S3 credentials! 
-ds = ray.data.read_parquet( - "s3://some/private/bucket", - filesystem=pa.fs.S3FileSystem( - region="us-west-2", - access_key="XXXX", - secret_key="XXXX", - ), -) -# __read_parquet_s3_with_fs_end__ -# fmt: on - -# fmt: off -# __read_parquet_hdfs_begin__ -# Create a tabular Dataset by reading a Parquet file from HDFS using HDFS connection -# automatically constructed based on the URI. -# NOTE: This example is not runnable as-is; you'll need to point it at your HDFS -# cluster/data. -ds = ray.data.read_parquet("hdfs:///path/to/file.parquet") -# __read_parquet_hdfs_end__ -# fmt: on - -# TODO(Clark): Find clean way to start local HDFS cluster in the below example (that -# works in CI). - -# fmt: off -# __read_parquet_hdfs_with_fs_begin__ -import pyarrow as pa - -# Create a tabular Dataset by reading a Parquet file from HDFS, manually specifying a -# configured HDFS connection via a Pyarrow HDFSFileSystem instance. -# NOTE: This example is not runnable as-is; you'll need to point it at your HDFS -# cluster/data. -ds = ray.data.read_parquet( - "hdfs://path/to/file.parquet", - filesystem=pa.fs.HDFSFileSystem(host="localhost", port=9000, user="bob"), -) -# __read_parquet_hdfs_with_fs_end__ -# fmt: on - -# TODO(Clark): Find open data for below GCS example. - -# fmt: off -# __read_parquet_gcs_begin__ -import gcsfs - -# Create a tabular Dataset by reading a Parquet file from GCS, passing the configured -# GCSFileSystem. -# NOTE: This example is not runnable as-is; you need to point it at your GCS bucket -# and configure your GCP project and credentials. -path = "gs://path/to/file.parquet" -filesystem = gcsfs.GCSFileSystem(project="my-google-project") -ds = ray.data.read_parquet(path, filesystem=filesystem) -# __read_parquet_gcs_end__ -# fmt: on - - -# fmt: off -# __validate_parquet_gcs_begin__ -print(filesystem.ls(path)) -# ['path/to/file.parquet'] -print(filesystem.open(path)) -# -# __validate_parquet_gcs_end__ -# fmt: on - -# fmt: off -# __read_parquet_az_begin__ -import adlfs - -# Create a tabular Dataset by reading a Parquet file from Azure Blob Storage, passing -# the configured AzureBlobFileSystem. -path = ( - "az://nyctlc/yellow/puYear=2009/puMonth=1/" - "part-00019-tid-8898858832658823408-a1de80bd-eed3-4d11-b9d4-fa74bfbd47bc-426333-4" - ".c000.snappy.parquet" -) -ds = ray.data.read_parquet( - path, - filesystem=adlfs.AzureBlobFileSystem(account_name="azureopendatastorage") -) -# __read_parquet_az_end__ -# fmt: on - -# fmt: off -# __from_mars_begin__ -import mars -import mars.dataframe as md -import pandas as pd - -cluster = mars.new_cluster_in_ray(worker_num=2, worker_cpu=1) - -df = pd.DataFrame({"col1": list(range(10000)), "col2": list(map(str, range(10000)))}) -mdf = md.DataFrame(df, num_partitions=8) -# Create a tabular Dataset from a Mars DataFrame. -ds = ray.data.from_mars(mdf) -# -> MaterializedDataset(num_blocks=8, num_rows=10000, schema={col1: int64, col2: object}) - -ds.show(3) -# -> {'col1': 0, 'col2': '0'} -# -> {'col1': 1, 'col2': '1'} -# -> {'col1': 2, 'col2': '2'} -# __from_mars_end__ -# fmt: on diff --git a/doc/source/data/loading-data.rst b/doc/source/data/loading-data.rst index 42e2473185d44..4e8f1a5e6fa16 100644 --- a/doc/source/data/loading-data.rst +++ b/doc/source/data/loading-data.rst @@ -1,608 +1,640 @@ .. 
_loading_data: -==================== +============ Loading Data -==================== -:class:`Datasets ` can be created from: +============ -* generated synthetic data, -* local and distributed in-memory data, and -* local and external storage systems (local disk, cloud storage, HDFS, etc.).. +Ray Data loads data from various sources. This guide shows you how to: -For an exhaustive list of supported sources, read the :ref:`Input/Output API reference `. +* `Read files <#reading-files>`_ like images +* `Load in-memory data <#loading-data-from-other-libraries>`_ like pandas DataFrames +* `Read databases <#reading-databases>`_ like MySQL -.. _dataset_generate_data: +Reading files +============= -------------------------- -Generating Synthetic Data -------------------------- +Ray Data reads files from local disk or cloud storage in a variety of file formats. +To view the full list of supported file formats, see the +:ref:`Input/Output reference `. .. tab-set:: - .. tab-item:: Int Range + .. tab-item:: Parquet - Create a ``Dataset`` from a range of integers, with a single column containing this integer range. + To read Parquet files, call :func:`~ray.data.read_parquet`. - .. literalinclude:: ./doc_code/loading_data.py - :language: python - :start-after: __gen_synth_tabular_range_begin__ - :end-before: __gen_synth_tabular_range_end__ + .. testcode:: + :skipif: True - .. tab-item:: Tensor Range + import ray + + ds = ray.data.read_parquet("local:///tmp/iris.parquet") + + print(ds.schema()) - Create a dataset from a range of integers, packing this integer range into - ndarrays of the provided shape. + .. testoutput:: - .. - FIXME: The following code snippets is failing. See - https://buildkite.com/ray-project/oss-ci-build-pr/builds/24240#0188797d-4416-4a34-ada6-2917d1fa9b19 + Column Type + ------ ---- + sepal.length double + sepal.width double + petal.length double + petal.width double + variety string - .. code-block:: python + .. tab-item:: Images - >>> import ray - >>> ds = ray.data.range_tensor(1, shape=(64, 64)) - >>> ds.schema() - Column Type - ------ ---- - data numpy.ndarray(shape=(64, 64), dtype=int64) - >>> ds.show(1) - {'data': array([[0, 0, 0, ..., 0, 0, 0], - [0, 0, 0, ..., 0, 0, 0], - [0, 0, 0, ..., 0, 0, 0], - ..., - [0, 0, 0, ..., 0, 0, 0], - [0, 0, 0, ..., 0, 0, 0], - [0, 0, 0, ..., 0, 0, 0]])} + To read raw images, call :func:`~ray.data.read_images`. Ray Data represents + images as NumPy ndarrays. -.. _dataset_reading_from_storage: + .. testcode:: + :skipif: True --------------------------- -Reading Files From Storage --------------------------- + import ray -Using the ``ray.data.read_*()`` APIs, data can be loaded from files on local disk -or remote storage system such as S3, GCS, Azure Blob Storage, or HDFS. Any filesystem -`supported by pyarrow `__ -can be used to specify file locations, and many common file formats are supported: -Parquet, CSV, JSON, NPY, text, binary. + ds = ray.data.read_images("local:///tmp/batoidea/JPEGImages/") -Each of these APIs take a path or list of paths to files or directories. Any directories -provided will be walked in order to obtain concrete file paths, at which point all files -will be read in parallel. + print(ds.schema()) -.. _dataset_supported_file_formats: + .. testoutput:: + :skipif: True -Common File Formats -=================== + Column Type + ------ ---- + image numpy.ndarray(shape=(32, 32, 3), dtype=uint8) -.. tab-set:: + .. tab-item:: Text - .. tab-item:: Parquet + To read lines of text, call :func:`~ray.data.read_text`. 
- Read Parquet files and directories. Partitioned parquet read support is also available. + .. testcode:: + :skipif: True - .. literalinclude:: ./doc_code/loading_data.py - :language: python - :start-after: __read_parquet_begin__ - :end-before: __read_parquet_end__ + import ray - The Parquet reader also supports projection and filter pushdown, allowing column - selection and row filtering to be pushed down to the file scan. + ds = ray.data.read_text("local:///tmp/this.txt") - .. literalinclude:: ./doc_code/loading_data.py - :language: python - :start-after: __read_parquet_pushdown_begin__ - :end-before: __read_parquet_pushdown_end__ + print(ds.schema()) - See the API docs for :func:`read_parquet() `. + .. testoutput:: + + Column Type + ------ ---- + text string .. tab-item:: CSV - Read CSV files and directories. + To read CSV files, call :func:`~ray.data.read_csv`. - .. literalinclude:: ./doc_code/loading_data.py - :language: python - :start-after: __read_csv_begin__ - :end-before: __read_csv_end__ + .. testcode:: + :skipif: True - See the API docs for :func:`read_csv() `. + import ray - .. tab-item:: JSON + ds = ray.data.read_csv("local:///tmp/iris.csv") - Read JSON files and directories. + print(ds.schema()) - Currently, only newline-delimited JSON (NDJSON) is supported. + .. testoutput:: - .. literalinclude:: ./doc_code/loading_data.py - :language: python - :start-after: __read_json_begin__ - :end-before: __read_json_end__ + Column Type + ------ ---- + sepal length (cm) double + sepal width (cm) double + petal length (cm) double + petal width (cm) double + target int64 - See the API docs for :func:`read_json() `. + .. tab-item:: Binary - .. tab-item:: NumPy + To read raw binary files, call :func:`~ray.data.read_binary_files`. - Read NumPy files and directories. + .. testcode:: + :skipif: True - This function represents NumPy data as ndarrays. To learn more, read - :ref:`Working with tensor data `. + import ray - .. literalinclude:: ./doc_code/loading_data.py - :language: python - :start-after: __read_numpy_begin__ - :end-before: __read_numpy_end__ + ds = ray.data.read_binary_files("local:///tmp/file.dat") - See the API docs for :func:`read_numpy() `. + print(ds.schema()) - .. tab-item:: Text + .. testoutput:: - Read text files and directories. Each line in each text file will be treated as a row in the dataset. + Column Type + ------ ---- + bytes binary - .. literalinclude:: ./doc_code/loading_data.py - :language: python - :start-after: __read_text_begin__ - :end-before: __read_text_end__ + .. tab-item:: TFRecords - See the API docs for :func:`read_text() `. + To read TFRecords files, call :func:`~ray.data.read_tfrecords`. - .. tab-item:: Images + .. testcode:: + :skipif: True - Call :func:`~ray.data.read_images` to read images. + import ray - This function represents images as ndarrays. To learn more, read - :ref:`Working with tensor data `. + ds = ray.data.read_tfrecords("local:///tmp/iris.tfrecords") - .. literalinclude:: ./doc_code/loading_data.py - :language: python - :start-after: __read_images_begin__ - :end-before: __read_images_end__ + print(ds.schema()) - .. tab-item:: Binary + .. testoutput:: - Read binary files and directories. Each binary file will be converted to a record - containing opaque bytes. These bytes can be decoded into tensor, tabular, text, or any other - kind of data using :meth:`~ray.data.Dataset.map_batches` to apply a per-row decoding - :ref:`user-defined function `. 
+ Column Type + ------ ---- + sepal length (cm) double + sepal width (cm) double + petal length (cm) double + petal width (cm) double + target int64 - .. literalinclude:: ./doc_code/loading_data.py - :language: python - :start-after: __read_binary_begin__ - :end-before: __read_binary_end__ +Reading files from local disk +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - See the API docs for :func:`read_binary_files() `. +To read files from local disk, call a function like :func:`~ray.data.read_parquet` and +specify paths with the ``local://`` schema. Paths can point to files or directories. - .. tab-item:: TFRecords +To read formats other than Parquet, see the :ref:`Input/Output reference `. - Call :func:`~ray.data.read_tfrecords` to read TFRecord files into a - :class:`~ray.data.Dataset`. +.. tip:: - .. warning:: - Only `tf.train.Example `_ - records are supported. + If your files are accessible on every node, exclude ``local://`` to parallelize the + read tasks across the cluster. - .. literalinclude:: ./doc_code/loading_data.py - :language: python - :start-after: __read_tfrecords_begin__ - :end-before: __read_tfrecords_end__ +.. testcode:: + :skipif: True -.. _dataset_reading_remote_storage: + import ray + + ds = ray.data.read_parquet("local:///tmp/iris.parquet") + + print(ds.schema()) +.. testoutput:: -Reading from Remote Storage -=========================== + Column Type + ------ ---- + sepal.length double + sepal.width double + petal.length double + petal.width double + variety string -All of the file formats mentioned above can be read from remote storage, such as S3, -GCS, Azure Blob Storage, and HDFS. These storage systems are supported via Arrow's -filesystem APIs natively for S3 and HDFS, and as a wrapper around fsspec for GCS and -HDFS. All ``ray.data.read_*()`` APIs expose a ``filesystem`` argument that accepts both -`Arrow FileSystem `__ instances -and `fsspec FileSystem `__ instances, -allowing you to configure this connection to the remote storage system, such as -authn/authz and buffer/block size. +Reading files from cloud storage +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -For S3 and HDFS, the underlying `FileSystem -`__ -implementation will be inferred from the URL scheme (``"s3://"`` and ``"hdfs://"``); if -the default connection configuration suffices for your workload, you won't need to -specify a ``filesystem`` argument. +To read files in cloud storage, authenticate all nodes with your cloud service provider. +Then, call a method like :func:`~ray.data.read_parquet` and specify URIs with the +appropriate schema. URIs can point to buckets, folders, or objects. -We use Parquet files for the below examples, but all of the aforementioned file formats -are supported for each of these storage systems. +To read formats other than Parquet, see the :ref:`Input/Output reference `. .. tab-set:: .. tab-item:: S3 - The AWS S3 storage system is inferred from the URI scheme (``s3://``), with required connection - configuration such as S3 credentials being pulled from the machine's environment - (e.g. the ``AWS_ACCESS_KEY_ID`` and ``AWS_SECRET_ACCESS_KEY`` environment variables). + To read files from Amazon S3, specify URIs with the ``s3://`` scheme. + + .. testcode:: + + import ray - .. 
literalinclude:: ./doc_code/loading_data.py - :language: python - :start-after: __read_parquet_s3_begin__ - :end-before: __read_parquet_s3_end__ + ds = ray.data.read_parquet("s3://anonymous@ray-example-data/iris.parquet") - If needing to customize this S3 storage system connection (credentials, region, - endpoint override, etc.), you can pass in an - `S3FileSystem `__ instance - to :func:`read_parquet() `. + print(ds.schema()) - .. literalinclude:: ./doc_code/loading_data_untested.py - :language: python - :start-after: __read_parquet_s3_with_fs_begin__ - :end-before: __read_parquet_s3_with_fs_end__ + .. testoutput:: - .. tab-item:: HDFS + Column Type + ------ ---- + sepal.length double + sepal.width double + petal.length double + petal.width double + variety string - The HDFS storage system is inferred from the URI scheme (``hdfs://``), with required connection - configuration such as the host and the port being derived from the URI. + .. tab-item:: GCS - .. note:: + To read files from Google Cloud Storage, install the + `Filesystem interface to Google Cloud Storage `_ - This example is not runnable as-is; you'll need to point it at your HDFS - cluster/data. + .. code-block:: console - .. literalinclude:: ./doc_code/loading_data_untested.py - :language: python - :start-after: __read_parquet_hdfs_begin__ - :end-before: __read_parquet_hdfs_end__ + pip install gcsfs - If needing to customize this HDFS storage system connection (host, port, user, kerb - ticket, etc.), you can pass in an `HDFSFileSystem - `__ - instance to :func:`read_parquet() `. + Then, create a ``GCSFileSystem`` and specify URIs with the ``gcs://`` scheme. - .. literalinclude:: ./doc_code/loading_data_untested.py - :language: python - :start-after: __read_parquet_hdfs_with_fs_begin__ - :end-before: __read_parquet_hdfs_with_fs_end__ + .. testcode:: + :skipif: True - .. tab-item:: GCS + import ray - Data can be read from Google Cloud Storage by providing a configured - `gcsfs GCSFileSystem `__, where the - appropriate Google Cloud project and credentials can be specified. + ds = ray.data.read_parquet("s3://anonymous@ray-example-data/iris.parquet") - .. note:: - This example is not runnable as-is; you'll need to point it at your GCS bucket and - configure your GCP project and credentials. + print(ds.schema()) - .. literalinclude:: ./doc_code/loading_data_untested.py - :language: python - :start-after: __read_parquet_gcs_begin__ - :end-before: __read_parquet_gcs_end__ + .. testoutput:: - .. tip:: - To verify that your GCP project and credentials are set up, validate - that the GCS `filesystem` has permissions to read the input `path`. + Column Type + ------ ---- + sepal.length double + sepal.width double + petal.length double + petal.width double + variety string - .. literalinclude:: ./doc_code/loading_data_untested.py - :language: python - :start-after: __validate_parquet_gcs_begin__ - :end-before: __validate_parquet_gcs_end__ + .. tab-item:: ABL - For more examples, see the `GCSFS Documentation `__. + To read files from Azure Blob Storage, install the + `Filesystem interface to Azure-Datalake Gen1 and Gen2 Storage `_ - .. tab-item:: ADL/ABS (Azure) + .. code-block:: console - Data can be read from Azure Blob Storage by providing a configured - `adlfs AzureBlobFileSystem `__, where the appropriate - account name and account key can be specified. + pip install adlfs - .. 
literalinclude:: ./doc_code/loading_data_untested.py - :language: python - :start-after: __read_parquet_az_begin__ - :end-before: __read_parquet_az_end__ + Then, create a ``AzureBlobFileSystem`` and specify URIs with the `az://` scheme. -Reading from Local Storage -========================== + .. testcode:: + :skipif: True + + import adlfs + import ray + + ds = ray.data.read_parquet( + "az://ray-example-data/iris.parquet", + adlfs.AzureBlobFileSystem(account_name="azureopendatastorage") + ) + + print(ds.schema()) + + .. testoutput:: + + Column Type + ------ ---- + sepal.length double + sepal.width double + petal.length double + petal.width double + variety string + +Reading files from NFS +~~~~~~~~~~~~~~~~~~~~~~ + +To read files from NFS filesystems, call a function like :func:`~ray.data.read_parquet` +and specify files on the mounted filesystem. Paths can point to files or directories. + +To read formats other than Parquet, see the :ref:`Input/Output reference `. + +.. testcode:: + :skipif: True -In Ray Data, users often read from remote storage systems as described above. In -some use cases, users may want to read from local storage. There are three ways to read -from a local filesystem: + import ray + + ds = ray.data.read_parquet("/mnt/cluster_storage/iris.parquet") + + print(ds.schema()) -* **Providing a raw filesystem path**: For example, in ``ray.data.read_csv("my_file.csv")``, - the given path will be resolved as a local filesystem path. If the file exists only on the - local node and you run this read operation in distributed cluster, this will fail as it - cannot access the file from remote nodes. -* **Using ``local://`` custom URI scheme**: Similarly, this will be resolved to local - filesystem, e.g. ``ray.data.read_csv("local://my_file.csv")`` will read the - same file as the approach above. The difference is that this scheme will ensure - all read tasks happen on the local node, so it's safe to run in a distributed - cluster. -* **Using ``example://`` custom URI scheme**: The paths with this scheme will be resolved - to ``ray/data/examples/data`` directory in the Ray package. This scheme is used - only for testing or demoing examples. +.. testoutput:: -Reading Compressed Files -======================== + Column Type + ------ ---- + sepal.length double + sepal.width double + petal.length double + petal.width double + variety string -Ray Data supports reading compressed files using the ``arrow_open_stream_args`` arg. -`Codecs supported by Arrow `__ -(bz2, brotli, gzip, lz4 or zstd) are compatible with Ray Data. -For example: +Handling compressed files +~~~~~~~~~~~~~~~~~~~~~~~~~ -.. literalinclude:: ./doc_code/loading_data.py - :language: python - :start-after: __read_compressed_begin__ - :end-before: __read_compressed_end__ +To read a compressed file, specify ``compression`` in ``arrow_open_stream_args``. +You can use any `Codec supported by Arrow `__. -.. _dataset_from_in_memory_data: +.. testcode:: -------------------- -From In-Memory Data -------------------- + import ray -Datasets can be constructed from existing in-memory data. In addition to being able to -construct a ``Dataset`` from plain Python objects, Datasets also interoperates with popular -single-node libraries (`Pandas `__, -`NumPy `__, `Arrow `__) as well as -distributed frameworks (:ref:`Dask `, :ref:`Spark `, -:ref:`Modin `, :ref:`Mars `). + ds = ray.data.read_csv( + "s3://anonymous@ray-example-data/iris.csv.gz", + arrow_open_stream_args={"compression": "gzip"}, + ) -.. 
_dataset_from_in_memory_data_single_node: +Loading data from other libraries +================================= -From Single-Node Data Libraries -=============================== +Loading data from single-node data libraries +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -In this section, we demonstrate creating a ``Dataset`` from single-node in-memory data. +Ray Data interoperates with libraries like pandas, NumPy, and Arrow. .. tab-set:: - .. tab-item:: Pandas + .. tab-item:: Python objects + + To create a :class:`~ray.data.dataset.Dataset` from Python objects, call + :func:`~ray.data.from_items` and pass in a list of ``Dict``. Ray Data treats + each ``Dict`` as a row. - Create a ``Dataset`` from a Pandas DataFrame. This constructs a ``Dataset`` - backed by a single block. + .. testcode:: - .. literalinclude:: ./doc_code/loading_data.py - :language: python - :start-after: __from_pandas_begin__ - :end-before: __from_pandas_end__ + import ray + + ds = ray.data.from_items([ + {"food": "spam", "price": 9.34}, + {"food": "ham", "price": 5.37}, + {"food": "eggs", "price": 0.94} + ]) + + print(ds) + + .. testoutput:: + + MaterializedDataset( + num_blocks=3, + num_rows=3, + schema={food: string, price: double} + ) - We can also build a ``Dataset`` from more than one Pandas DataFrame, where each said - DataFrame will become a block in the ``Dataset``. + You can also create a :class:`~ray.data.dataset.Dataset` from a list of regular + Python objects. - .. literalinclude:: ./doc_code/loading_data.py - :language: python - :start-after: __from_pandas_mult_begin__ - :end-before: __from_pandas_mult_end__ + .. testcode:: + + import ray + + ds = ray.data.from_items([1, 2, 3, 4, 5]) + + print(ds) + + .. testoutput:: + + MaterializedDataset(num_blocks=5, num_rows=5, schema={item: int64}) .. tab-item:: NumPy - Create a ``Dataset`` from a NumPy ndarray. This constructs a ``Dataset`` - backed by a single block; the outer dimension of the ndarray - will be treated as the row dimension, and the column will have name ``"data"``. + To create a :class:`~ray.data.dataset.Dataset` from a NumPy array, call + :func:`~ray.data.from_numpy`. Ray Data treats the outer axis as the row + dimension. + + .. testcode:: + + import numpy as np + import ray + + array = np.ones((3, 2, 2)) + ds = ray.data.from_numpy(array) + + print(ds) - .. literalinclude:: ./doc_code/loading_data.py - :language: python - :start-after: __from_numpy_begin__ - :end-before: __from_numpy_end__ + .. testoutput:: - We can also build a ``Dataset`` from more than one NumPy ndarray, where each said - ndarray will become a block in the ``Dataset``. + MaterializedDataset( + num_blocks=1, + num_rows=3, + schema={data: numpy.ndarray(shape=(2, 2), dtype=double)} + ) + + .. tab-item:: pandas + + To create a :class:`~ray.data.dataset.Dataset` from a pandas DataFrame, call + :func:`~ray.data.from_pandas`. + + .. testcode:: + + import pandas as pd + import ray - .. literalinclude:: ./doc_code/loading_data.py - :language: python - :start-after: __from_numpy_mult_begin__ - :end-before: __from_numpy_mult_end__ + df = pd.DataFrame({ + "food": ["spam", "ham", "eggs"], + "price": [9.34, 5.37, 0.94] + }) + ds = ray.data.from_pandas(df) - .. tab-item:: Arrow + print(ds) - Create a ``Dataset`` from an - `Arrow Table `__. - This constructs a ``Dataset`` backed by a single block. + .. testoutput:: - .. 
literalinclude:: ./doc_code/loading_data.py - :language: python - :start-after: __from_arrow_begin__ - :end-before: __from_arrow_end__ + MaterializedDataset( + num_blocks=1, + num_rows=3, + schema={food: object, price: float64} + ) + + .. tab-item:: PyArrow - We can also build a ``Dataset`` from more than one Arrow Table, where each said - ``Table`` will become a block in the ``Dataset``. + To create a :class:`~ray.data.dataset.Dataset` from an Arrow table, call + :func:`~ray.data.from_arrow`. - .. literalinclude:: ./doc_code/loading_data.py - :language: python - :start-after: __from_arrow_mult_begin__ - :end-before: __from_arrow_mult_end__ + .. testcode:: - .. tab-item:: Python Objects + import pyarrow as pa - Create a ``Dataset`` from a list of Python objects; which are interpreted as dict records. - If the object is not a dict, it will be wrapped as ``{"item": item}``. + table = pa.table({ + "food": ["spam", "ham", "eggs"], + "price": [9.34, 5.37, 0.94] + }) + ds = ray.data.from_arrow(table) - .. literalinclude:: ./doc_code/loading_data.py - :language: python - :start-after: __from_items_begin__ - :end-before: __from_items_end__ + print(ds) -.. _dataset_from_in_memory_data_distributed: + .. testoutput:: -From Distributed Data Processing Frameworks -=========================================== + MaterializedDataset( + num_blocks=1, + num_rows=3, + schema={food: string, price: double} + ) -In addition to working with single-node in-memory data, Datasets can be constructed from -distributed (multi-node) in-memory data, interoperating with popular distributed -data processing frameworks such as :ref:`Dask `, :ref:`Spark `, -:ref:`Modin `, and :ref:`Mars `. +Loading data from distributed DataFrame libraries +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -Note that these data processing frameworks must be running on Ray in order for these -integrations to work. See how these frameworks can be run on Ray in our -:ref:`data processing integrations docs `. +Ray Data interoperates with distributed data processing frameworks like +:ref:`Dask `, :ref:`Spark `, :ref:`Modin `, and +:ref:`Mars `. .. tab-set:: .. tab-item:: Dask - Create a ``MaterializedDataset`` from a - `Dask DataFrame `__. This constructs a - ``Dataset`` backed by the distributed Pandas DataFrame partitions that underly the - Dask DataFrame. + To create a :class:`~ray.data.dataset.Dataset` from a + `Dask DataFrame `__, call + :func:`~ray.data.from_dask`. This function constructs a + ``Dataset`` backed by the distributed Pandas DataFrame partitions that underly + the Dask DataFrame. - .. literalinclude:: ./doc_code/loading_data.py - :language: python - :start-after: __from_dask_begin__ - :end-before: __from_dask_end__ + .. testcode:: + :skipif: True - .. tab-item:: Spark + import dask.dataframe as dd + import pandas as pd + import ray - Create a ``MaterializedDataset`` from a `Spark DataFrame - `__. - This constructs a ``Dataset`` backed by the distributed Spark DataFrame partitions - that underly the Spark DataFrame. When this conversion happens, Spark-on-Ray (RayDP) - will save the Spark DataFrame partitions to Ray's object store in the Arrow format, - which Datasets will then interpret as its blocks. + df = pd.DataFrame({"col1": list(range(10000)), "col2": list(map(str, range(10000)))}) + ddf = dd.from_pandas(df, npartitions=4) + # Create a Dataset from a Dask DataFrame. + ds = ray.data.from_dask(ddf) - .. 
literalinclude:: ./doc_code/loading_data_untested.py - :language: python - :start-after: __from_spark_begin__ - :end-before: __from_spark_end__ + ds.show(3) - .. tab-item:: Modin + .. testoutput:: - Create a ``MaterializedDataset`` from a Modin DataFrame. This constructs a ``Dataset`` - backed by the distributed Pandas DataFrame partitions that underly the Modin DataFrame. + {'string': 'spam', 'number': 0} + {'string': 'ham', 'number': 1} + {'string': 'eggs', 'number': 2} - .. literalinclude:: ./doc_code/loading_data.py - :language: python - :start-after: __from_modin_begin__ - :end-before: __from_modin_end__ + .. tab-item:: Spark - .. tab-item:: Mars + To create a :class:`~ray.data.dataset.Dataset` from a `Spark DataFrame + `__, + call :func:`~ray.data.from_spark`. This function creates a ``Dataset`` backed by + the distributed Spark DataFrame partitions that underly the Spark DataFrame. - Create a ``MaterializedDataset`` from a Mars DataFrame. This constructs a ``Dataset`` - backed by the distributed Pandas DataFrame partitions that underly the Mars DataFrame. + .. testcode:: + :skipif: True - .. literalinclude:: ./doc_code/loading_data_untested.py - :language: python - :start-after: __from_mars_begin__ - :end-before: __from_mars_end__ + import ray + import raydp -.. _dataset_from_torch_tf: + spark = raydp.init_spark(app_name="Spark -> Datasets Example", + num_executors=2, + executor_cores=2, + executor_memory="500MB") + df = spark.createDataFrame([(i, str(i)) for i in range(10000)], ["col1", "col2"]) + ds = ray.data.from_spark(df) -------------------------- -From Torch and TensorFlow -------------------------- + ds.show(3) -.. tab-set:: + .. testoutput:: - .. tab-item:: PyTorch + {'col1': 0, 'col2': '0'} + {'col1': 1, 'col2': '1'} + {'col1': 2, 'col2': '2'} - If you already have a Torch dataset available, you can create a Dataset using - :class:`~ray.data.from_torch`. + .. tab-item:: Modin - .. warning:: - :class:`~ray.data.from_torch` doesn't support parallel - reads. You should only use this datasource for small datasets like MNIST or - CIFAR. + To create a :class:`~ray.data.dataset.Dataset` from a Modin DataFrame, call + :func:`~ray.data.from_modin`. This function constructs a ``Dataset`` backed by + the distributed Pandas DataFrame partitions that underly the Modin DataFrame. - .. code-block:: python + .. testcode:: + :skipif: True + import modin.pandas as md + import pandas as pd import ray - import torchvision - torch_ds = torchvision.datasets.MNIST("data", download=True) - dataset = ray.data.from_torch(torch_ds) - dataset.take(1) - # {"item": (, 5)} + df = pd.DataFrame({"col1": list(range(10000)), "col2": list(map(str, range(10000)))}) + mdf = md.DataFrame(df) + # Create a Dataset from a Modin DataFrame. + ds = ray.data.from_modin(mdf) - .. tab-item:: TensorFlow + ds.show(3) - If you already have a TensorFlow dataset available, you can create a Dataset - using :class:`~ray.data.from_tf`. + .. testoutput:: - .. warning:: - :class:`~ray.data.from_tf` doesn't support parallel reads. You - should only use this function with small datasets like MNIST or CIFAR. + {'col1': 0, 'col2': '0'} + {'col1': 1, 'col2': '1'} + {'col1': 2, 'col2': '2'} + + .. tab-item:: Mars + + To create a :class:`~ray.data.dataset.Dataset` from a Mars DataFrame, call + :func:`~ray.data.from_mars`. This function constructs a ``Dataset`` + backed by the distributed Pandas DataFrame partitions that underly the Mars + DataFrame. - .. code-block:: python + .. 
testcode:: + :skipif: True + import mars + import mars.dataframe as md + import pandas as pd import ray - import tensorflow_datasets as tfds - tf_ds, _ = tfds.load("cifar10", split=["train", "test"]) - dataset = ray.data.from_tf(tf_ds) + cluster = mars.new_cluster_in_ray(worker_num=2, worker_cpu=1) - dataset - # -> MaterializedDataset(num_blocks=200, num_rows=50000, schema={id: binary, image: numpy.ndarray(shape=(32, 32, 3), dtype=uint8), label: int64}) + df = pd.DataFrame({"col1": list(range(10000)), "col2": list(map(str, range(10000)))}) + mdf = md.DataFrame(df, num_partitions=8) + # Create a tabular Dataset from a Mars DataFrame. + ds = ray.data.from_mars(mdf) -.. _dataset_from_huggingface: + ds.show(3) -------------------------------- -From 🤗 (Hugging Face) Datasets -------------------------------- + .. testoutput:: -You can convert 🤗 Datasets into Ray Data by using -:py:class:`~ray.data.from_huggingface`. This function accesses the underlying Arrow table and -converts it into a Dataset directly. + {'col1': 0, 'col2': '0'} + {'col1': 1, 'col2': '1'} + {'col1': 2, 'col2': '2'} -.. warning:: - :py:class:`~ray.data.from_huggingface` doesn't support parallel - reads. This will not usually be an issue with in-memory 🤗 Datasets, - but may fail with large memory-mapped 🤗 Datasets. 🤗 ``IterableDataset`` - objects are not supported. +Loading data from ML libraries +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -.. code-block:: python +Ray Data interoperates with HuggingFace and TensorFlow datasets. - import ray.data - from datasets import load_dataset +.. tab-set:: - hf_ds = load_dataset("wikitext", "wikitext-2-raw-v1") - ray_ds = ray.data.from_huggingface(hf_ds) - ray_ds["train"].take(2) - # [{'text': ''}, {'text': ' = Valkyria Chronicles III = \n'}] + .. tab-item:: HuggingFace -.. _dataset_mongo_db: + To convert a 🤗 Dataset to a Ray Datasets, call + :func:`~ray.data.from_huggingface`. This function accesses the underlying Arrow + table and converts it to a Dataset directly. ------------- -From MongoDB ------------- + .. warning:: + :class:`~ray.data.from_huggingface` doesn't support parallel + reads. This isn't an issue with in-memory 🤗 Datasets, but may fail with + large memory-mapped 🤗 Datasets. Also, 🤗 ``IterableDataset`` objects aren't + supported. -A Dataset can also be created from `MongoDB `__ with -:py:class:`~ray.data.read_mongo`. -This interacts with MongoDB similar to external filesystems, except here you will -need to specify the MongoDB source by its `uri `__, -`database and collection `__, -and specify a `pipeline `__ to run against -the collection. The execution results are then used to create a Dataset. + .. testcode:: -.. note:: + import ray.data + from datasets import load_dataset - This example is not runnable as-is; you'll need to point it at your MongoDB - instance. + hf_ds = load_dataset("wikitext", "wikitext-2-raw-v1") + ray_ds = ray.data.from_huggingface(hf_ds) + ray_ds["train"].take(2) -.. code-block:: python + .. testoutput:: + :options: +MOCK - import ray + [{'text': ''}, {'text': ' = Valkyria Chronicles III = \n'}] - # Read a local MongoDB. - ds = ray.data.read_mongo( - uri="mongodb://localhost:27017", - database="my_db", - collection="my_collection", - pipeline=[{"$match": {"col": {"$gte": 0, "$lt": 10}}}, {"$sort": "sort_col"}], - ) + .. tab-item:: TensorFlow - # Reading a remote MongoDB is the same. 
- ds = ray.data.read_mongo( - uri="mongodb://username:password@mongodb0.example.com:27017/?authSource=admin", - database="my_db", - collection="my_collection", - pipeline=[{"$match": {"col": {"$gte": 0, "$lt": 10}}}, {"$sort": "sort_col"}], - ) + To convert a TensorFlow dataset to a Ray Dataset, call :func:`~ray.data.from_tf`. - # Write back to MongoDB. - ds.write_mongo( - MongoDatasource(), - uri="mongodb://username:password@mongodb0.example.com:27017/?authSource=admin", - database="my_db", - collection="my_collection", - ) + .. warning:: + :class:`~ray.data.from_tf` doesn't support parallel reads. Only use this + function with small datasets like MNIST or CIFAR. + + .. testcode:: -.. _datasets_sql_databases: + import ray + import tensorflow_datasets as tfds + + tf_ds, _ = tfds.load("cifar10", split=["train", "test"]) + ds = ray.data.from_tf(tf_ds) + + print(ds) --------------------------- -Reading From SQL Databases --------------------------- + .. testoutput:: + + MaterializedDataset( + num_blocks=..., + num_rows=50000, + schema={ + id: binary, + image: numpy.ndarray(shape=(32, 32, 3), dtype=uint8), + label: int64 + } + ) + +Reading databases +================= + +Ray Data reads from databases like MySQL, Postgres, and MongoDB. + +Reading SQL databases +~~~~~~~~~~~~~~~~~~~~~ Call :func:`~ray.data.read_sql` to read data from a database that provides a `Python DB API2-compliant `_ connector. @@ -619,9 +651,10 @@ Call :func:`~ray.data.read_sql` to read data from a database that provides a pip install mysql-connector-python - Then, define your connection login and query the database. + Then, define your connection logic and query the database. - .. code-block:: python + .. testcode:: + :skipif: True import mysql.connector @@ -657,9 +690,10 @@ Call :func:`~ray.data.read_sql` to read data from a database that provides a pip install psycopg2-binary - Then, define your connection login and query the database. + Then, define your connection logic and query the database. - .. code-block:: python + .. testcode:: + :skipif: True import psycopg2 @@ -693,9 +727,10 @@ Call :func:`~ray.data.read_sql` to read data from a database that provides a pip install snowflake-connector-python - Then, define your connection login and query the database. + Then, define your connection logic and query the database. - .. code-block:: python + .. testcode:: + :skipif: True import snowflake.connector @@ -733,7 +768,8 @@ Call :func:`~ray.data.read_sql` to read data from a database that provides a Then, define your connection logic and read from the Databricks SQL warehouse. - .. code-block:: python + .. testcode:: + :skipif: True from databricks import sql @@ -767,9 +803,10 @@ Call :func:`~ray.data.read_sql` to read data from a database that provides a pip install google-cloud-bigquery - Then, define your connection login and query the dataset. + Then, define your connection logic and query the dataset. - .. code-block:: python + .. testcode:: + :skipif: True from google.cloud import bigquery from google.cloud.bigquery import dbapi @@ -791,17 +828,97 @@ Call :func:`~ray.data.read_sql` to read data from a database that provides a "SELECT year, COUNT(*) FROM movie GROUP BY year", create_connection ) +Reading MongoDB +~~~~~~~~~~~~~~~ + +To read data from MongoDB, call :func:`~ray.data.read_mongo` and specify the +the source URI, database, and collection. You also need to specify a pipeline to +run against the collection. + +.. testcode:: + :skipif: True + + import ray + + # Read a local MongoDB. 
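+    # The URI, database, collection, and pipeline values below are placeholders;
+    # point them at your own MongoDB deployment before running this example.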
+ ds = ray.data.read_mongo( + uri="mongodb://localhost:27017", + database="my_db", + collection="my_collection", + pipeline=[{"$match": {"col": {"$gte": 0, "$lt": 10}}}, {"$sort": "sort_col"}], + ) + + # Reading a remote MongoDB is the same. + ds = ray.data.read_mongo( + uri="mongodb://username:password@mongodb0.example.com:27017/?authSource=admin", + database="my_db", + collection="my_collection", + pipeline=[{"$match": {"col": {"$gte": 0, "$lt": 10}}}, {"$sort": "sort_col"}], + ) + + # Write back to MongoDB. + ds.write_mongo( + MongoDatasource(), + uri="mongodb://username:password@mongodb0.example.com:27017/?authSource=admin", + database="my_db", + collection="my_collection", + ) + +Creating synthetic data +======================= + +Synthetic datasets can be useful for testing and benchmarking. -.. _data_custom_datasource: +.. tab-set:: + + .. tab-item:: Int Range + + To create a synthetic :class:`~ray.data.Dataset` from a range of integers, call + :func:`~ray.data.range`. Ray Data stores the integer range in a single column. + + .. testcode:: + + import ray + + ds = ray.data.range(10000) + + print(ds.schema()) ------------------- -Custom Datasources ------------------- + .. testoutput:: + + Column Type + ------ ---- + id int64 + + .. tab-item:: Tensor Range -Datasets can read and write in parallel to :ref:`custom datasources ` defined in Python. -Once you have implemented `YourCustomDataSource`, you can use it like any other source in Ray Data: + To create a synthetic :class:`~ray.data.Dataset` containing arrays, call + :func:`~ray.data.range_tensor`. Ray Data packs an integer range into ndarrays of + the provided shape. -.. code-block:: python + .. testcode:: + + import ray + + ds = ray.data.range_tensor(10, shape=(64, 64)) + + print(ds.schema()) + + .. testoutput:: + + Column Type + ------ ---- + data numpy.ndarray(shape=(64, 64), dtype=int64) + +Loading other data sources +========================== + +If Ray Data can't load your data, subclass +:class:`~ray.data.datasource.Datasource`. Then, construct an instance of your custom +datasource and pass it to :func:`~ray.data.read_datasource`. + +.. testcode:: + :skipif: True # Read from a custom datasource. ds = ray.data.read_datasource(YourCustomDatasource(), **read_args) @@ -809,13 +926,16 @@ Once you have implemented `YourCustomDataSource`, you can use it like any other # Write to a custom datasource. ds.write_datasource(YourCustomDatasource(), **write_args) -For more details, read :ref:`Implementing a Custom Datasource `. +For an example, see :ref:`Implementing a Custom Datasource `. --------------------------- -Performance Considerations --------------------------- +Performance considerations +========================== -The dataset ``parallelism`` determines the number of blocks the base data will be split into for parallel reads. Ray Data will decide internally how many read tasks to run concurrently to best utilize the cluster, ranging from ``1...parallelism`` tasks. In other words, the higher the parallelism, the smaller the data blocks in the Dataset and hence the more opportunity for parallel execution. +The dataset ``parallelism`` determines the number of blocks the base data will be split +into for parallel reads. Ray Data will decide internally how many read tasks to run +concurrently to best utilize the cluster, ranging from ``1...parallelism`` tasks. In +other words, the higher the parallelism, the smaller the data blocks in the Dataset and +hence the more opportunity for parallel execution. .. 
image:: images/dataset-read.svg
   :width: 650px
diff --git a/python/ray/data/read_api.py b/python/ray/data/read_api.py
index 4165480d91536..c44427d460ea5 100644
--- a/python/ray/data/read_api.py
+++ b/python/ray/data/read_api.py
@@ -566,6 +566,28 @@ def read_parquet(
             }
         )
+    The Parquet reader also supports projection and filter pushdown, allowing column
+    selection and row filtering to be pushed down to the file scan.
+
+    .. testcode::
+
+        import pyarrow as pa
+
+        # Create a Dataset by reading a Parquet file, pushing column selection and
+        # row filtering down to the file scan.
+        ds = ray.data.read_parquet(
+            "example://iris.parquet",
+            columns=["sepal.length", "variety"],
+            filter=pa.dataset.field("sepal.length") > 5.0,
+        )
+
+        ds.show(2)
+
+    .. testoutput::
+
+        {'sepal.length': 5.1, 'variety': 'Setosa'}
+        {'sepal.length': 5.4, 'variety': 'Setosa'}
+
     For further arguments you can pass to pyarrow as a keyword argument, see
     https://arrow.apache.org/docs/python/generated/pyarrow.dataset.Scanner.html#pyarrow.dataset.Scanner.from_fragment