
[Datasets] Autodetect dataset parallelism based on available resources and data size #25883

Merged · 61 commits into ray-project:master · Jul 13, 2022

Conversation

@ericl (Contributor) commented Jun 17, 2022

Why are these changes needed?

This PR defaults the parallelism of Dataset reads to -1. In that case, the parallelism is determined by the following rules:

  • The number of available CPUs is estimated. If running in a placement group, the cluster's CPU count is scaled by the placement group's share of the cluster; if not, the total number of CPUs in the cluster is used. If the estimate is less than 8, it is raised to 8.
  • The parallelism is set to twice the estimated number of CPUs.
  • The in-memory data size is estimated. If this parallelism would create in-memory blocks larger than the target block size (512MiB), the parallelism is increased until each block is under 512MiB (see the sketch after this list).
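
For reference, here is a minimal sketch of this heuristic in Python. The function and constant names are illustrative, not the exact identifiers used in the Ray codebase:

```python
import math
from typing import Optional

# Illustrative constants matching the rules above (assumed names).
MIN_ESTIMATED_CPUS = 8
TARGET_MAX_BLOCK_SIZE = 512 * 1024 * 1024  # 512MiB

def autodetect_parallelism(estimated_cpus: int,
                           in_memory_size: Optional[int]) -> int:
    # Rule 1: clamp the CPU estimate to at least 8.
    cpus = max(estimated_cpus, MIN_ESTIMATED_CPUS)
    # Rule 2: parallelism is twice the estimated CPU count.
    parallelism = cpus * 2
    # Rule 3: raise parallelism until blocks fall under the 512MiB target.
    if in_memory_size is not None:
        parallelism = max(parallelism,
                          math.ceil(in_memory_size / TARGET_MAX_BLOCK_SIZE))
    return parallelism
```

With this default, a plain read such as `ray.data.read_parquet(path)` picks its parallelism this way rather than via the previous fixed default of 200.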

These rules fix two common user problems:

  1. Insufficient parallelism in a large cluster, or too much parallelism on a small cluster.
  2. Overly large block sizes leading to OOMs when processing a single block.

TODO:

  • [x] Unit tests
  • [x] Docs update

Supersedes part of: #25708

@ericl changed the title from "[WIP] Autodetect dataset parallelism based on available resources" to "[WIP] Autodetect dataset parallelism based on available resources and data size" on Jun 17, 2022
@ericl (Contributor, Author) commented Jul 13, 2022

test_tensors_shuffle failing

@ericl merged commit 9de1add into ray-project:master on Jul 13, 2022
```python
self._columns = columns
self._schema = schema

def estimate_inmemory_data_size(self) -> Optional[int]:
```
A Contributor commented on this diff:
Sorry for the late comment, but I think it's probably a bug here to rely on serialized_size, which seems to be the size of the file footer, not the size of the actual data. I crafted a quick fix in #26516; please let me know if it makes sense, thanks.
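
To illustrate the distinction the comment draws, here is a quick pyarrow sketch (illustrative only; the actual fix lives in #26516):

```python
import pyarrow.parquet as pq

md = pq.ParquetFile("example.parquet").metadata  # hypothetical file

# serialized_size is the size of the Thrift-encoded file footer,
# not the size of the data itself -- hence the suspected bug.
footer_bytes = md.serialized_size

# Summing total_byte_size across row groups estimates the uncompressed
# data size instead.
data_bytes = sum(
    md.row_group(i).total_byte_size for i in range(md.num_row_groups)
)
```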

ericl added a commit that referenced this pull request Jul 14, 2022
… for parallelism detection (#26543)

In the previous PR #25883, a subtle regression was introduced for cases where data sizes blow up significantly after reading.

For example, suppose you're reading JPEG image files into a Dataset, which grow substantially in size on decompression. On a small-core cluster (e.g., 4 cores), reading a 1GiB dataset yields 4-8 blocks of ~200MiB each. These can blow up and OOM the node when decompressed (e.g., a 25x size increase).

Previously, the fixed parallelism=200 heuristic avoided this small-node problem. This PR avoids the issue by raising the minimum parallelism back to 200. As an optimization, it also introduces a minimum block size threshold, which allows using fewer blocks when the data size is very small (<100KiB per block); see the sketch below.
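
A minimal sketch of the revised heuristic, assuming illustrative constant names and the 100KiB minimum block size described above:

```python
import math

MIN_PARALLELISM = 200                      # restored floor
TARGET_MAX_BLOCK_SIZE = 512 * 1024 * 1024  # 512MiB
TARGET_MIN_BLOCK_SIZE = 100 * 1024         # 100KiB

def autodetect_parallelism(num_cpus: int, data_size: int) -> int:
    # Floor at 200, scale with CPUs, and split further if blocks would
    # exceed the 512MiB target.
    parallelism = max(MIN_PARALLELISM, num_cpus * 2,
                      math.ceil(data_size / TARGET_MAX_BLOCK_SIZE))
    # Optimization: if that would yield blocks under ~100KiB, use fewer.
    max_blocks = max(1, data_size // TARGET_MIN_BLOCK_SIZE)
    return min(parallelism, max_blocks)
```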
xwjiang2010 pushed a commit to xwjiang2010/ray that referenced this pull request Jul 19, 2022 (ray-project#26543)
Stefan-1313 pushed a commit to Stefan-1313/ray_mod that referenced this pull request Aug 18, 2022 (ray-project#25883)
Stefan-1313 pushed a commit to Stefan-1313/ray_mod that referenced this pull request Aug 18, 2022 (ray-project#26543)