From cea281c495ef664ddf73bbe05c473122c0255362 Mon Sep 17 00:00:00 2001 From: Cheng Su Date: Thu, 24 Aug 2023 19:41:17 -0700 Subject: [PATCH] [Data] Restructure the user-provided optimizer rules file (#38829) * Disable EliminateBuildOutputBlocks rule temporarily Signed-off-by: Cheng Su * Change to use default and user-provided rules Signed-off-by: Cheng Su * Rename to _user_provided_optimizer_rules.py Signed-off-by: Cheng Su * Disable EliminateBuildOutputBlocks temporarily Signed-off-by: Cheng Su --------- Signed-off-by: Cheng Su --- .../ray/data/_internal/logical/optimizers.py | 22 ++++++++++++++----- .../logical/rules/_default_optimizer_rules.py | 17 -------------- .../rules/_user_provided_optimizer_rules.py | 7 ++++++ .../data/tests/test_dynamic_block_split.py | 1 + .../data/tests/test_execution_optimizer.py | 1 + 5 files changed, 26 insertions(+), 22 deletions(-) delete mode 100644 python/ray/data/_internal/logical/rules/_default_optimizer_rules.py create mode 100644 python/ray/data/_internal/logical/rules/_user_provided_optimizer_rules.py diff --git a/python/ray/data/_internal/logical/optimizers.py b/python/ray/data/_internal/logical/optimizers.py index 9821ee406c15f..72dfd70359767 100644 --- a/python/ray/data/_internal/logical/optimizers.py +++ b/python/ray/data/_internal/logical/optimizers.py @@ -6,19 +6,30 @@ PhysicalPlan, Rule, ) -from ray.data._internal.logical.rules._default_optimizer_rules import ( - get_logical_optimizer_rules, - get_physical_optimizer_rules, +from ray.data._internal.logical.rules._user_provided_optimizer_rules import ( + USER_PROVIDED_LOGICAL_RULES, + USER_PROVIDED_PHYSICAL_RULES, ) +from ray.data._internal.logical.rules.operator_fusion import OperatorFusionRule +from ray.data._internal.logical.rules.randomize_blocks import ReorderRandomizeBlocksRule from ray.data._internal.planner.planner import Planner +DEFAULT_LOGICAL_RULES = [ + ReorderRandomizeBlocksRule, +] + +DEFAULT_PHYSICAL_RULES = [ + OperatorFusionRule, +] + class LogicalOptimizer(Optimizer): """The optimizer for logical operators.""" @property def rules(self) -> List[Rule]: - return [rule_cls() for rule_cls in get_logical_optimizer_rules()] + rules = DEFAULT_LOGICAL_RULES + USER_PROVIDED_LOGICAL_RULES + return [rule_cls() for rule_cls in rules] class PhysicalOptimizer(Optimizer): @@ -26,7 +37,8 @@ class PhysicalOptimizer(Optimizer): @property def rules(self) -> List["Rule"]: - return [rule_cls() for rule_cls in get_physical_optimizer_rules()] + rules = DEFAULT_PHYSICAL_RULES + USER_PROVIDED_PHYSICAL_RULES + return [rule_cls() for rule_cls in rules] def get_execution_plan(logical_plan: LogicalPlan) -> PhysicalPlan: diff --git a/python/ray/data/_internal/logical/rules/_default_optimizer_rules.py b/python/ray/data/_internal/logical/rules/_default_optimizer_rules.py deleted file mode 100644 index 410a1c13a4ee9..0000000000000 --- a/python/ray/data/_internal/logical/rules/_default_optimizer_rules.py +++ /dev/null @@ -1,17 +0,0 @@ -from ray.data._internal.logical.rules.operator_fusion import OperatorFusionRule -from ray.data._internal.logical.rules.randomize_blocks import ReorderRandomizeBlocksRule -from ray.data._internal.logical.rules.zero_copy_map_fusion import ( - EliminateBuildOutputBlocks, -) - - -def get_logical_optimizer_rules(): - rules = [ReorderRandomizeBlocksRule] - return rules - - -def get_physical_optimizer_rules(): - # Subclasses of ZeroCopyMapFusionRule (e.g., EliminateBuildOutputBlocks) should - # be run after OperatorFusionRule. - rules = [OperatorFusionRule, EliminateBuildOutputBlocks] - return rules diff --git a/python/ray/data/_internal/logical/rules/_user_provided_optimizer_rules.py b/python/ray/data/_internal/logical/rules/_user_provided_optimizer_rules.py new file mode 100644 index 0000000000000..0034b13c669c4 --- /dev/null +++ b/python/ray/data/_internal/logical/rules/_user_provided_optimizer_rules.py @@ -0,0 +1,7 @@ +# Users can provide extra logical optimization rules here +# to be used in `LogicalOptimizer`. +USER_PROVIDED_LOGICAL_RULES = [] + +# Users can provide extra physical optimization rules here +# to be used in `PhysicalOptimizer`. +USER_PROVIDED_PHYSICAL_RULES = [] diff --git a/python/ray/data/tests/test_dynamic_block_split.py b/python/ray/data/tests/test_dynamic_block_split.py index 0d5de4acbf6e3..8faa975243bad 100644 --- a/python/ray/data/tests/test_dynamic_block_split.py +++ b/python/ray/data/tests/test_dynamic_block_split.py @@ -325,6 +325,7 @@ def test_lazy_block_list(shutdown_only, target_max_block_size): assert block_metadata.schema is not None +@pytest.mark.skip("Needs zero-copy optimization for read->map_batches.") def test_read_large_data(ray_start_cluster): # Test 20G input with single task num_blocks_per_task = 20 diff --git a/python/ray/data/tests/test_execution_optimizer.py b/python/ray/data/tests/test_execution_optimizer.py index 10fd6bb991740..78ca90330410b 100644 --- a/python/ray/data/tests/test_execution_optimizer.py +++ b/python/ray/data/tests/test_execution_optimizer.py @@ -1484,6 +1484,7 @@ def check_transform_fns(op, expected_types): assert isinstance(transform_fn, expected_types[i]), transform_fn +@pytest.mark.skip("Needs zero-copy optimization for read->map_batches.") def test_zero_copy_fusion_eliminate_build_output_blocks( ray_start_regular_shared, enable_optimizer ):