Skip to content

Commit

Permalink
[Data] Improve error messages (ray-project#37752)
Browse files Browse the repository at this point in the history
This PR improves the clarity of two error messages.
---------

Signed-off-by: Balaji Veeramani <[email protected]>
Signed-off-by: Balaji Veeramani <[email protected]>
Co-authored-by: Amog Kamsetty <[email protected]>
  • Loading branch information
bveeramani and amogkam committed Jul 25, 2023
1 parent 13f8bdd commit 0a383da
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -265,10 +265,12 @@ def shutdown(self):
):
# The user specified a batch size, but it was probably too large.
logger.get_logger().warning(
"To ensure full parallelization across an actor pool of size "
f"{min_workers}, the specified batch size "
f"should be at most {max_desired_batch_size}. Your configured batch "
f"size for this operator was {self._min_rows_per_bundle}."
f"Your batch size is too large. Currently, your batch size is "
f"{self._min_rows_per_bundle}. Your dataset contains {total_rows}, and "
f"Ray Data tried to parallelize it across {min_workers} actors. To "
f"parallelize this fully across all {min_workers} actors, set batch "
f"size to not exceed `{total_rows} / {min_workers} = "
f"{max_desired_batch_size}`."
)
elif len(self._output_metadata) < min_workers:
# The user created a stream that has too few blocks to begin with.
Expand Down
35 changes: 32 additions & 3 deletions python/ray/data/_internal/execution/streaming_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -338,10 +338,39 @@ def walk(op):
base_usage = base_usage.add(op.base_resource_usage())

if not base_usage.satisfies_limit(limits):
raise ValueError(
f"The base resource usage of this topology {base_usage} "
f"exceeds the execution limits {limits}!"
error_message = (
"The current cluster doesn't have the required resources to execute your "
"Dataset pipeline:\n"
)
if (
base_usage.cpu is not None
and limits.cpu is not None
and base_usage.cpu > limits.cpu
):
error_message += (
f"- Your application needs {base_usage.cpu} CPU(s), but your cluster "
f"only has {limits.cpu}.\n"
)
if (
base_usage.gpu is not None
and limits.gpu is not None
and base_usage.gpu > limits.gpu
):
error_message += (
f"- Your application needs {base_usage.gpu} GPU(s), but your cluster "
f"only has {limits.gpu}.\n"
)
if (
base_usage.object_store_memory is not None
and base_usage.object_store_memory is not None
and base_usage.object_store_memory > limits.object_store_memory
):
error_message += (
f"- Your application needs {base_usage.object_store_memory}B object "
f"store memory, but your cluster only has "
f"{limits.object_store_memory}B.\n"
)
raise ValueError(error_message.strip())


def _debug_dump_topology(topology: Topology) -> None:
Expand Down

0 comments on commit 0a383da

Please sign in to comment.