From d8fcda47ffa278a5c4372e63456a9f90f3d7a44c Mon Sep 17 00:00:00 2001 From: amogkam Date: Tue, 9 May 2023 21:17:38 -0700 Subject: [PATCH 1/6] improve huggingface Signed-off-by: amogkam --- doc/requirements-doc.txt | 1 + python/ray/data/read_api.py | 49 ++++++++++++++++++++++++++++++++----- 2 files changed, 44 insertions(+), 6 deletions(-) diff --git a/doc/requirements-doc.txt b/doc/requirements-doc.txt index 05156ea26c2a5..8ea8d767da4e9 100644 --- a/doc/requirements-doc.txt +++ b/doc/requirements-doc.txt @@ -5,6 +5,7 @@ accelerate>=0.17.0 click colorama colorful +datasets # Newer versions of fairscale do not support Python 3.6 even though they still have wheels for it. # Have to manually pin it: https://github.com/facebookresearch/fairscale/issues/962 fairscale; python_version >= '3.7' diff --git a/python/ray/data/read_api.py b/python/ray/data/read_api.py index 384c8998d60e3..df663137fea52 100644 --- a/python/ray/data/read_api.py +++ b/python/ray/data/read_api.py @@ -1772,20 +1772,47 @@ def from_spark( @PublicAPI def from_huggingface( dataset: Union["datasets.Dataset", "datasets.DatasetDict"], -) -> Union[MaterializedDataset]: +) -> Union[MaterializedDataset, Dict[str, MaterializedDataset]]: """Create a dataset from a Hugging Face Datasets Dataset. This function is not parallelized, and is intended to be used with Hugging Face Datasets that are loaded into memory (as opposed to memory-mapped). + Example: + >>> import ray + >>> import datasets + >>> hf_dataset = datasets.load_dataset("tweet_eval", "emotion") + >>> ray_ds = ray.data.from_huggingface(hf_dataset) + >>> ray_ds + {'train': MaterializedDataset( + num_blocks=1, + num_rows=3257, + schema={text: string, label: int64} + ), 'test': MaterializedDataset( + num_blocks=1, + num_rows=1421, + schema={text: string, label: int64} + ), 'validation': MaterializedDataset( + num_blocks=1, + num_rows=374, + schema={text: string, label: int64} + )} + >>> ray_ds = ray.data.from_huggingface(hf_dataset["train"]) + >>> ray_ds + MaterializedDataset( + num_blocks=1, + num_rows=3257, + schema={text: string, label: int64} + ) + Args: - dataset: A Hugging Face ``Dataset``, or ``DatasetDict``. - ``IterableDataset`` is not supported. + dataset: A Hugging Face ``Dataset`` or ``DatasetDict``. + ``IterableDataset`` is not supported. Returns: - MaterializedDataset holding Arrow records from the Hugging Face Dataset, or a - dict of MaterializedDataset in case ``dataset`` is a ``DatasetDict``. + Dataset holding Arrow records from the Hugging Face Dataset, or a dict of + datasets in case ``dataset`` is a ``DatasetDict``. """ import datasets @@ -1797,12 +1824,22 @@ def convert(ds: "datasets.Dataset") -> Dataset: return ray_ds if isinstance(dataset, datasets.DatasetDict): + available_keys = list(dataset.keys()) + logger.warning( + "You provided a Hugging Face DatasetDict which contains multiple " + "datasets. The output of `from_huggingface` is a dictionary of Ray " + "Datasets. To convert just a single Hugging Face Dataset to a " + "Ray Dataset, specify a split. For example, " + "`ray.data.from_huggingface(my_dataset_dictionary" + f"['{available_keys[0]}'])`. " + f"Available splits are {available_keys}." + ) return {k: convert(ds) for k, ds in dataset.items()} elif isinstance(dataset, datasets.Dataset): return convert(dataset) else: raise TypeError( - "`dataset` must be a `datasets.Dataset` or `datasets.DatasetDict`, " + "`dataset` must be a `datasets.Dataset` or `datasets.DatasetDict`, but " 
f"got {type(dataset)}" ) From abeca47c5b881fea236d2b90d2ea5c96fe75dc43 Mon Sep 17 00:00:00 2001 From: amogkam Date: Wed, 10 May 2023 13:17:35 -0700 Subject: [PATCH 2/6] fix Signed-off-by: amogkam --- python/ray/data/read_api.py | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/python/ray/data/read_api.py b/python/ray/data/read_api.py index df663137fea52..202d1058c295b 100644 --- a/python/ray/data/read_api.py +++ b/python/ray/data/read_api.py @@ -1786,24 +1786,24 @@ def from_huggingface( >>> ray_ds = ray.data.from_huggingface(hf_dataset) >>> ray_ds {'train': MaterializedDataset( - num_blocks=1, - num_rows=3257, - schema={text: string, label: int64} + num_blocks=1, + num_rows=3257, + schema={text: string, label: int64} ), 'test': MaterializedDataset( - num_blocks=1, - num_rows=1421, - schema={text: string, label: int64} + num_blocks=1, + num_rows=1421, + schema={text: string, label: int64} ), 'validation': MaterializedDataset( - num_blocks=1, - num_rows=374, - schema={text: string, label: int64} + num_blocks=1, + num_rows=374, + schema={text: string, label: int64} )} >>> ray_ds = ray.data.from_huggingface(hf_dataset["train"]) >>> ray_ds MaterializedDataset( - num_blocks=1, - num_rows=3257, - schema={text: string, label: int64} + num_blocks=1, + num_rows=3257, + schema={text: string, label: int64} ) Args: From 3b9d2fe6945d82d227974da25ccce77f0d1af2c4 Mon Sep 17 00:00:00 2001 From: amogkam Date: Thu, 11 May 2023 17:01:25 -0700 Subject: [PATCH 3/6] fix Signed-off-by: amogkam --- python/ray/data/read_api.py | 34 +++++++++++++++++----------------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/python/ray/data/read_api.py b/python/ray/data/read_api.py index 202d1058c295b..73db448909abd 100644 --- a/python/ray/data/read_api.py +++ b/python/ray/data/read_api.py @@ -267,7 +267,7 @@ def range_tensor(n: int, *, shape: Tuple = (1,), parallelism: int = -1) -> Datas Examples: >>> import ray >>> ds = ray.data.range_tensor(1000, shape=(2, 2)) - >>> ds # doctest: +ellipsis + >>> ds # doctest: +ELLIPSIS Dataset( num_blocks=..., num_rows=1000, @@ -855,8 +855,8 @@ def read_json( from file paths. If your data adheres to a different partitioning scheme, set the ``partitioning`` parameter. - >>> ds = ray.data.read_json("example://year=2022/month=09/sales.json") # doctest: + SKIP - >>> ds.take(1) # doctest: + SKIP + >>> ds = ray.data.read_json("example://year=2022/month=09/sales.json") # doctest: +SKIP + >>> ds.take(1) # doctest: +SKIP [{'order_number': 10107, 'quantity': 30, 'year': '2022', 'month': '09'} Args: @@ -950,8 +950,8 @@ def read_csv( from file paths. If your data adheres to a different partitioning scheme, set the ``partitioning`` parameter. - >>> ds = ray.data.read_csv("example://year=2022/month=09/sales.csv") # doctest: + SKIP - >>> ds.take(1) # doctest: + SKIP + >>> ds = ray.data.read_csv("example://year=2022/month=09/sales.csv") # doctest: +SKIP + >>> ds.take(1) # doctest: +SKIP [{'order_number': 10107, 'quantity': 30, 'year': '2022', 'month': '09'}] By default, ``read_csv`` reads all files from file paths. 
If you want to filter @@ -1786,24 +1786,24 @@ def from_huggingface( >>> ray_ds = ray.data.from_huggingface(hf_dataset) >>> ray_ds {'train': MaterializedDataset( - num_blocks=1, - num_rows=3257, - schema={text: string, label: int64} + num_blocks=1, + num_rows=3257, + schema={text: string, label: int64} ), 'test': MaterializedDataset( - num_blocks=1, - num_rows=1421, - schema={text: string, label: int64} + num_blocks=1, + num_rows=1421, + schema={text: string, label: int64} ), 'validation': MaterializedDataset( - num_blocks=1, - num_rows=374, - schema={text: string, label: int64} + num_blocks=1, + num_rows=374, + schema={text: string, label: int64} )} >>> ray_ds = ray.data.from_huggingface(hf_dataset["train"]) >>> ray_ds MaterializedDataset( - num_blocks=1, - num_rows=3257, - schema={text: string, label: int64} + num_blocks=1, + num_rows=3257, + schema={text: string, label: int64} ) Args: From 3630fd6d1f4c50363f13f4274c7310e18179523d Mon Sep 17 00:00:00 2001 From: amogkam Date: Thu, 11 May 2023 18:04:09 -0700 Subject: [PATCH 4/6] fix doctest Signed-off-by: amogkam --- python/ray/data/read_api.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/python/ray/data/read_api.py b/python/ray/data/read_api.py index 73db448909abd..613640d333bee 100644 --- a/python/ray/data/read_api.py +++ b/python/ray/data/read_api.py @@ -269,9 +269,9 @@ def range_tensor(n: int, *, shape: Tuple = (1,), parallelism: int = -1) -> Datas >>> ds = ray.data.range_tensor(1000, shape=(2, 2)) >>> ds # doctest: +ELLIPSIS Dataset( - num_blocks=..., - num_rows=1000, - schema={data: numpy.ndarray(shape=(2, 2), dtype=int64)}) + num_blocks=..., + num_rows=1000, + schema={data: numpy.ndarray(shape=(2, 2), dtype=int64)}) >>> ds.map_batches(lambda arr: arr * 2).take(2) # doctest: +SKIP [array([[0, 0], [0, 0]]), @@ -1783,7 +1783,8 @@ def from_huggingface( >>> import ray >>> import datasets >>> hf_dataset = datasets.load_dataset("tweet_eval", "emotion") - >>> ray_ds = ray.data.from_huggingface(hf_dataset) + >>> ray_ds = ray.data.from_huggingface(hf_dataset) # doctest: +ELLIPSIS + Downloading ... >>> ray_ds {'train': MaterializedDataset( num_blocks=1, From 8c90f26080be70f6d6947a9d9aade8a339cf3954 Mon Sep 17 00:00:00 2001 From: amogkam Date: Fri, 12 May 2023 15:10:18 -0700 Subject: [PATCH 5/6] fix Signed-off-by: amogkam --- python/ray/data/preprocessors/encoder.py | 62 +++++++++++++++++++----- python/ray/data/read_api.py | 9 +++- 2 files changed, 58 insertions(+), 13 deletions(-) diff --git a/python/ray/data/preprocessors/encoder.py b/python/ray/data/preprocessors/encoder.py index 0df8885aabf52..885ea02daf164 100644 --- a/python/ray/data/preprocessors/encoder.py +++ b/python/ray/data/preprocessors/encoder.py @@ -72,6 +72,8 @@ class OrdinalEncoder(Preprocessor): Args: columns: The columns to separately encode. + output_columns: The new columns to store the encoded columns. If None, + the columns are encoded in-place. Defaults to None. encode_lists: If ``True``, encode list elements. If ``False``, encode whole lists (i.e., replace each list with an integer). ``True`` by default. @@ -82,9 +84,23 @@ class OrdinalEncoder(Preprocessor): Another preprocessor that encodes categorical data. """ - def __init__(self, columns: List[str], *, encode_lists: bool = True): + def __init__( + self, + columns: List[str], + *, + output_columns: Optional[List[str]] = None, + encode_lists: bool = True, + ): # TODO: allow user to specify order of values within each column. 
+ if output_columns is not None and len(output_columns) != len(columns): + raise ValueError( + f"{len(columns)} were requested to be encoded, but only " + f"{len(output_columns)} were specified. The number " + "of columns to be encoded and the number of " + "output columns should match." + ) self.columns = columns + self.output_columns = output_columns self.encode_lists = encode_lists def _fit(self, dataset: Dataset) -> Preprocessor: @@ -115,12 +131,16 @@ def list_as_category(element): s_values = self.stats_[f"unique_values({s.name})"] return s.map(s_values) - df[self.columns] = df[self.columns].apply(column_ordinal_encoder) + output_columns = ( + self.columns if self.output_columns is not None else self.columns + ) + df[output_columns] = df[self.columns].apply(column_ordinal_encoder) return df def __repr__(self): return ( f"{self.__class__.__name__}(columns={self.columns!r}, " + f"output_columns={self.output_columns!r}, " f"encode_lists={self.encode_lists!r})" ) @@ -187,6 +207,8 @@ class OneHotEncoder(Preprocessor): max_categories: The maximum number of features to create for each column. If a value isn't specified for a column, then a feature is created for every category in that column. + drop_original_columns: Whether to drop the original unencoded columns. Defaults + to False. .. seealso:: @@ -200,11 +222,16 @@ class OneHotEncoder(Preprocessor): """ # noqa: E501 def __init__( - self, columns: List[str], *, max_categories: Optional[Dict[str, int]] = None + self, + columns: List[str], + *, + max_categories: Optional[Dict[str, int]] = None, + drop_original_columns: bool = False, ): - # TODO: add `drop` parameter. + self.columns = columns self.max_categories = max_categories + self.drop_original_columns = drop_original_columns def _fit(self, dataset: Dataset) -> Preprocessor: self.stats_ = _get_unique_value_indices( @@ -224,19 +251,28 @@ def _transform_pandas(self, df: pd.DataFrame): for column in self.columns: column_values = self.stats_[f"unique_values({column})"] if _is_series_composed_of_lists(df[column]): - df[column] = df[column].map(lambda x: tuple(x)) + df[f"tmp_{column}"] = df[column].map(lambda x: tuple(x)) + else: + df[f"tmp_{column}"] = df[column] + for column_value in column_values: - df[f"{column}_{column_value}"] = (df[column] == column_value).astype( - int - ) + df[f"{column}_{column_value}"] = ( + df[f"tmp_{column}"] == column_value + ).astype(int) + + # Drop the temp columns. + df.drop([f"tmp_{column}" for column in columns_to_drop]) + # Drop original unencoded columns. - df = df.drop(columns=list(columns_to_drop)) + if self.drop_original_columns: + df = df.drop(columns=list(columns_to_drop)) return df def __repr__(self): return ( f"{self.__class__.__name__}(columns={self.columns!r}, " - f"max_categories={self.max_categories!r})" + f"max_categories={self.max_categories!r}, " + f"drop_original_columns={self.drop_original_columns})" ) @@ -309,7 +345,11 @@ class MultiHotEncoder(Preprocessor): """ def __init__( - self, columns: List[str], *, max_categories: Optional[Dict[str, int]] = None + self, + columns: List[str], + *, + output_columns: Optional[List[str]] = None, + max_categories: Optional[Dict[str, int]] = None, ): # TODO: add `drop` parameter. 
self.columns = columns diff --git a/python/ray/data/read_api.py b/python/ray/data/read_api.py index 613640d333bee..3cf4649c61fea 100644 --- a/python/ray/data/read_api.py +++ b/python/ray/data/read_api.py @@ -271,7 +271,8 @@ def range_tensor(n: int, *, shape: Tuple = (1,), parallelism: int = -1) -> Datas Dataset( num_blocks=..., num_rows=1000, - schema={data: numpy.ndarray(shape=(2, 2), dtype=int64)}) + schema={data: numpy.ndarray(shape=(2, 2), dtype=int64)} + ) >>> ds.map_batches(lambda arr: arr * 2).take(2) # doctest: +SKIP [array([[0, 0], [0, 0]]), @@ -1780,11 +1781,15 @@ def from_huggingface( to memory-mapped). Example: + + .. doctest:: + :options: +ELLIPSIS + >>> import ray >>> import datasets >>> hf_dataset = datasets.load_dataset("tweet_eval", "emotion") - >>> ray_ds = ray.data.from_huggingface(hf_dataset) # doctest: +ELLIPSIS Downloading ... + >>> ray_ds = ray.data.from_huggingface(hf_dataset) >>> ray_ds {'train': MaterializedDataset( num_blocks=1, From 9006b62868072bb8efc06ea62c8a9ee59a7c5aea Mon Sep 17 00:00:00 2001 From: amogkam Date: Mon, 15 May 2023 10:59:58 -0700 Subject: [PATCH 6/6] revert Signed-off-by: amogkam --- python/ray/data/preprocessors/encoder.py | 62 +++++------------------- 1 file changed, 11 insertions(+), 51 deletions(-) diff --git a/python/ray/data/preprocessors/encoder.py b/python/ray/data/preprocessors/encoder.py index 885ea02daf164..0df8885aabf52 100644 --- a/python/ray/data/preprocessors/encoder.py +++ b/python/ray/data/preprocessors/encoder.py @@ -72,8 +72,6 @@ class OrdinalEncoder(Preprocessor): Args: columns: The columns to separately encode. - output_columns: The new columns to store the encoded columns. If None, - the columns are encoded in-place. Defaults to None. encode_lists: If ``True``, encode list elements. If ``False``, encode whole lists (i.e., replace each list with an integer). ``True`` by default. @@ -84,23 +82,9 @@ class OrdinalEncoder(Preprocessor): Another preprocessor that encodes categorical data. """ - def __init__( - self, - columns: List[str], - *, - output_columns: Optional[List[str]] = None, - encode_lists: bool = True, - ): + def __init__(self, columns: List[str], *, encode_lists: bool = True): # TODO: allow user to specify order of values within each column. - if output_columns is not None and len(output_columns) != len(columns): - raise ValueError( - f"{len(columns)} were requested to be encoded, but only " - f"{len(output_columns)} were specified. The number " - "of columns to be encoded and the number of " - "output columns should match." - ) self.columns = columns - self.output_columns = output_columns self.encode_lists = encode_lists def _fit(self, dataset: Dataset) -> Preprocessor: @@ -131,16 +115,12 @@ def list_as_category(element): s_values = self.stats_[f"unique_values({s.name})"] return s.map(s_values) - output_columns = ( - self.columns if self.output_columns is not None else self.columns - ) - df[output_columns] = df[self.columns].apply(column_ordinal_encoder) + df[self.columns] = df[self.columns].apply(column_ordinal_encoder) return df def __repr__(self): return ( f"{self.__class__.__name__}(columns={self.columns!r}, " - f"output_columns={self.output_columns!r}, " f"encode_lists={self.encode_lists!r})" ) @@ -207,8 +187,6 @@ class OneHotEncoder(Preprocessor): max_categories: The maximum number of features to create for each column. If a value isn't specified for a column, then a feature is created for every category in that column. - drop_original_columns: Whether to drop the original unencoded columns. 
Defaults - to False. .. seealso:: @@ -222,16 +200,11 @@ class OneHotEncoder(Preprocessor): """ # noqa: E501 def __init__( - self, - columns: List[str], - *, - max_categories: Optional[Dict[str, int]] = None, - drop_original_columns: bool = False, + self, columns: List[str], *, max_categories: Optional[Dict[str, int]] = None ): - + # TODO: add `drop` parameter. self.columns = columns self.max_categories = max_categories - self.drop_original_columns = drop_original_columns def _fit(self, dataset: Dataset) -> Preprocessor: self.stats_ = _get_unique_value_indices( @@ -251,28 +224,19 @@ def _transform_pandas(self, df: pd.DataFrame): for column in self.columns: column_values = self.stats_[f"unique_values({column})"] if _is_series_composed_of_lists(df[column]): - df[f"tmp_{column}"] = df[column].map(lambda x: tuple(x)) - else: - df[f"tmp_{column}"] = df[column] - + df[column] = df[column].map(lambda x: tuple(x)) for column_value in column_values: - df[f"{column}_{column_value}"] = ( - df[f"tmp_{column}"] == column_value - ).astype(int) - - # Drop the temp columns. - df.drop([f"tmp_{column}" for column in columns_to_drop]) - + df[f"{column}_{column_value}"] = (df[column] == column_value).astype( + int + ) # Drop original unencoded columns. - if self.drop_original_columns: - df = df.drop(columns=list(columns_to_drop)) + df = df.drop(columns=list(columns_to_drop)) return df def __repr__(self): return ( f"{self.__class__.__name__}(columns={self.columns!r}, " - f"max_categories={self.max_categories!r}, " - f"drop_original_columns={self.drop_original_columns})" + f"max_categories={self.max_categories!r})" ) @@ -345,11 +309,7 @@ class MultiHotEncoder(Preprocessor): """ def __init__( - self, - columns: List[str], - *, - output_columns: Optional[List[str]] = None, - max_categories: Optional[Dict[str, int]] = None, + self, columns: List[str], *, max_categories: Optional[Dict[str, int]] = None ): # TODO: add `drop` parameter. self.columns = columns
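A minimal usage sketch of the `from_huggingface` behavior this series documents, assuming the patches are applied and the `datasets` package added to doc/requirements-doc.txt is installed. It mirrors the docstring example in PATCH 1/6: the `tweet_eval`/`emotion` dataset and its split names come from that example, and the exact row counts and schema may differ depending on the installed `datasets` version.

import ray
import datasets

# Load a Hugging Face DatasetDict containing train/test/validation splits.
hf_dataset = datasets.load_dataset("tweet_eval", "emotion")

# Passing the whole DatasetDict returns a dict of Ray MaterializedDatasets,
# one per split, and logs a warning suggesting that a single split be passed.
ray_datasets = ray.data.from_huggingface(hf_dataset)
print(sorted(ray_datasets.keys()))  # ['test', 'train', 'validation']

# Passing a single split returns a single MaterializedDataset.
train_ds = ray.data.from_huggingface(hf_dataset["train"])
print(train_ds.take(1))  # e.g. [{'text': ..., 'label': ...}]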