diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/PlannerContext.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/PlannerContext.java index f9c939789d01c..8ff2429e93c9b 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/PlannerContext.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/PlannerContext.java @@ -102,6 +102,7 @@ public class PlannerContext { private final FrameworkConfig frameworkConfig; public PlannerContext( + boolean isBatchMode, TableConfig tableConfig, FunctionCatalog functionCatalog, CatalogManager catalogManager, @@ -111,7 +112,11 @@ public PlannerContext( this.context = new FlinkContextImpl( - tableConfig, functionCatalog, catalogManager, rexConverterFactory); + isBatchMode, + tableConfig, + functionCatalog, + catalogManager, + rexConverterFactory); this.rootSchema = rootSchema; this.traitDefs = traitDefs; diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/SourceAbilityContext.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/SourceAbilityContext.java index 4c53fc4616e0b..ccb772e73cf56 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/SourceAbilityContext.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/SourceAbilityContext.java @@ -51,6 +51,11 @@ public SourceAbilityContext(FlinkContext context, RowType sourceRowType) { this.sourceRowType = sourceRowType; } + @Override + public boolean isBatchMode() { + return context.isBatchMode(); + } + @Override public TableConfig getTableConfig() { return context.getTableConfig(); diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkContext.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkContext.scala index f6b19d409ce42..bb8ed527a6991 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkContext.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkContext.scala @@ -28,6 +28,11 @@ import org.apache.calcite.plan.Context */ trait FlinkContext extends Context { + /** + * Returns whether the planner runs in batch mode. + */ + def isBatchMode: Boolean + /** * Gets [[TableConfig]] instance defined in [[org.apache.flink.table.api.TableEnvironment]]. */ diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkContextImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkContextImpl.scala index 6c669e05ae0e0..0850099d75cd7 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkContextImpl.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkContextImpl.scala @@ -22,12 +22,15 @@ import org.apache.flink.table.api.TableConfig import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog} class FlinkContextImpl( + inBatchMode: Boolean, tableConfig: TableConfig, functionCatalog: FunctionCatalog, catalogManager: CatalogManager, toRexFactory: SqlExprToRexConverterFactory) extends FlinkContext { + override def isBatchMode: Boolean = inBatchMode + override def getTableConfig: TableConfig = tableConfig override def getFunctionCatalog: FunctionCatalog = functionCatalog diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala index 25248917d9418..dc7f8bbe2843d 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala @@ -101,6 +101,7 @@ abstract class PlannerBase( @VisibleForTesting private[flink] val plannerContext: PlannerContext = new PlannerContext( + !isStreamingMode, config, functionCatalog, catalogManager, diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/BatchCommonSubGraphBasedOptimizer.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/BatchCommonSubGraphBasedOptimizer.scala index 08f47cb8c9203..b838fa6f0230e 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/BatchCommonSubGraphBasedOptimizer.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/BatchCommonSubGraphBasedOptimizer.scala @@ -85,6 +85,9 @@ class BatchCommonSubGraphBasedOptimizer(planner: BatchPlanner) val context = relNode.getCluster.getPlanner.getContext.unwrap(classOf[FlinkContext]) programs.optimize(relNode, new BatchOptimizeContext { + + override def isBatchMode: Boolean = true + override def getTableConfig: TableConfig = config override def getFunctionCatalog: FunctionCatalog = planner.functionCatalog diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/StreamCommonSubGraphBasedOptimizer.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/StreamCommonSubGraphBasedOptimizer.scala index 32730b74f685a..7fc101f612c8b 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/StreamCommonSubGraphBasedOptimizer.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/StreamCommonSubGraphBasedOptimizer.scala @@ -162,6 +162,8 @@ class StreamCommonSubGraphBasedOptimizer(planner: StreamPlanner) programs.optimize(relNode, new StreamOptimizeContext() { + override def isBatchMode: Boolean = false + override def getTableConfig: TableConfig = config override def getFunctionCatalog: FunctionCatalog = planner.functionCatalog diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/delegation/ParserImplTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/delegation/ParserImplTest.java index f8ea28b6b77f3..619cf450cb6a8 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/delegation/ParserImplTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/delegation/ParserImplTest.java @@ -59,6 +59,7 @@ public class ParserImplTest { new FunctionCatalog(tableConfig, catalogManager, moduleManager); private final PlannerContext plannerContext = new PlannerContext( + !isStreamingMode, tableConfig, functionCatalog, catalogManager, diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/expressions/converter/ExpressionConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/expressions/converter/ExpressionConverterTest.java index 51b0b07df5a6d..7e3eba6b2bea1 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/expressions/converter/ExpressionConverterTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/expressions/converter/ExpressionConverterTest.java @@ -62,6 +62,7 @@ public class ExpressionConverterTest { private final CatalogManager catalogManager = CatalogManagerMocks.createEmptyCatalogManager(); private final PlannerContext plannerContext = new PlannerContext( + false, tableConfig, new FunctionCatalog(tableConfig, catalogManager, new ModuleManager()), catalogManager, diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java index 7f00ea3bcfbb1..e2e06f1cb74a9 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java @@ -139,6 +139,7 @@ public class SqlToOperationConverterTest { catalogManager.getCurrentDatabase()); private final PlannerContext plannerContext = new PlannerContext( + false, tableConfig, functionCatalog, catalogManager, diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSinkSpecSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSinkSpecSerdeTest.java index 6f983963877fc..df606babde9d6 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSinkSpecSerdeTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSinkSpecSerdeTest.java @@ -69,6 +69,7 @@ public void testDynamicTableSinkSpecSerde() throws IOException { SerdeContext serdeCtx = new SerdeContext( new FlinkContextImpl( + false, TableConfig.getDefault(), null, CatalogManagerMocks.createEmptyCatalogManager(), diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSourceSpecSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSourceSpecSerdeTest.java index 74eaadf99ebd0..4a0ed34e59cf5 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSourceSpecSerdeTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSourceSpecSerdeTest.java @@ -89,6 +89,7 @@ public void testDynamicTableSourceSpecSerde() throws IOException { SerdeContext serdeCtx = new SerdeContext( new FlinkContextImpl( + false, TableConfig.getDefault(), null, CatalogManagerMocks.createEmptyCatalogManager(), diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeSerdeTest.java index 4306b45eff964..7e2306adad1ff 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeSerdeTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeSerdeTest.java @@ -95,7 +95,7 @@ public class LogicalTypeSerdeTest { public void testLogicalTypeSerde() throws IOException { SerdeContext serdeCtx = new SerdeContext( - new FlinkContextImpl(TableConfig.getDefault(), null, null, null), + new FlinkContextImpl(false, TableConfig.getDefault(), null, null, null), Thread.currentThread().getContextClassLoader(), FlinkTypeFactory.INSTANCE(), FlinkSqlOperatorTable.instance()); diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalWindowSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalWindowSerdeTest.java index 93de983603229..864f037cf5adb 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalWindowSerdeTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalWindowSerdeTest.java @@ -115,7 +115,7 @@ public static List testData() { public void testLogicalWindowSerde() throws JsonProcessingException { SerdeContext serdeCtx = new SerdeContext( - new FlinkContextImpl(TableConfig.getDefault(), null, null, null), + new FlinkContextImpl(false, TableConfig.getDefault(), null, null, null), Thread.currentThread().getContextClassLoader(), FlinkTypeFactory.INSTANCE(), FlinkSqlOperatorTable.instance()); diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LookupKeySerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LookupKeySerdeTest.java index 915958c1d0989..6738b98d8b236 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LookupKeySerdeTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LookupKeySerdeTest.java @@ -56,6 +56,7 @@ public void testLookupKey() throws JsonProcessingException { .build(); FlinkContext flinkContext = new FlinkContextImpl( + false, tableConfig, new FunctionCatalog(tableConfig, catalogManager, new ModuleManager()), catalogManager, diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RelDataTypeSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RelDataTypeSerdeTest.java index b8c3d0acb2da4..d60ed37c2e658 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RelDataTypeSerdeTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RelDataTypeSerdeTest.java @@ -259,7 +259,7 @@ public static Collection parameters() { public void testTypeSerde() throws Exception { SerdeContext serdeCtx = new SerdeContext( - new FlinkContextImpl(TableConfig.getDefault(), null, null, null), + new FlinkContextImpl(false, TableConfig.getDefault(), null, null, null), Thread.currentThread().getContextClassLoader(), FlinkTypeFactory.INSTANCE(), FlinkSqlOperatorTable.instance()); diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeSerdeTest.java index 7c7d6672b3fdb..40623da962b0e 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeSerdeTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeSerdeTest.java @@ -86,6 +86,7 @@ public static Object[][] parameters() { .build(); FlinkContext flinkContext = new FlinkContextImpl( + false, tableConfig, new FunctionCatalog(tableConfig, catalogManager, new ModuleManager()), catalogManager, diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexWindowBoundSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexWindowBoundSerdeTest.java index 190f11c0c7f7f..70d6b673a38ae 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexWindowBoundSerdeTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexWindowBoundSerdeTest.java @@ -43,7 +43,7 @@ public class RexWindowBoundSerdeTest { public void testSerde() throws JsonProcessingException { SerdeContext serdeCtx = new SerdeContext( - new FlinkContextImpl(TableConfig.getDefault(), null, null, null), + new FlinkContextImpl(false, TableConfig.getDefault(), null, null, null), Thread.currentThread().getContextClassLoader(), FlinkTypeFactory.INSTANCE(), FlinkSqlOperatorTable.instance()); diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/TemporalTableSourceSpecSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/TemporalTableSourceSpecSerdeTest.java index 06c078ed07bf9..d880d1c7dcca4 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/TemporalTableSourceSpecSerdeTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/TemporalTableSourceSpecSerdeTest.java @@ -65,6 +65,7 @@ public void testTemporalTableSourceSpecSerde() throws IOException { SerdeContext serdeCtx = new SerdeContext( new FlinkContextImpl( + false, TableConfig.getDefault(), null, CatalogManagerMocks.createEmptyCatalogManager(), diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/PlannerMocks.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/PlannerMocks.java index becbed1f53556..83faafcc6c495 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/PlannerMocks.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/PlannerMocks.java @@ -37,7 +37,6 @@ /** A utility class for creating an instance of {@link FlinkPlannerImpl} for testing. */ public class PlannerMocks { public static FlinkPlannerImpl createDefaultPlanner() { - final boolean isStreamingMode = false; TableConfig tableConfig = new TableConfig(); CatalogManager catalogManager = CatalogManagerMocks.createEmptyCatalogManager(); ModuleManager moduleManager = new ModuleManager(); @@ -45,11 +44,11 @@ public static FlinkPlannerImpl createDefaultPlanner() { new FunctionCatalog(tableConfig, catalogManager, moduleManager); PlannerContext plannerContext = new PlannerContext( + false, tableConfig, functionCatalog, catalogManager, - asRootSchema( - new CatalogManagerCalciteSchema(catalogManager, isStreamingMode)), + asRootSchema(new CatalogManagerCalciteSchema(catalogManager, true)), new ArrayList<>()); FlinkPlannerImpl planner = plannerContext.createFlinkPlanner( @@ -61,7 +60,7 @@ public static FlinkPlannerImpl createDefaultPlanner() { planner::parser, plannerContext.getSqlExprToRexConverterFactory()); catalogManager.initSchemaResolver( - isStreamingMode, + true, ExpressionResolver.resolverFor( tableConfig, name -> { diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenTest.scala index 00350e00479cf..5ede86eda74bf 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenTest.scala @@ -61,6 +61,7 @@ class WatermarkGeneratorCodeGenTest(useDefinedConstructor: Boolean) { val catalogManager: CatalogManager = CatalogManagerMocks.createEmptyCatalogManager() val functionCatalog = new FunctionCatalog(config, catalogManager, new ModuleManager) val plannerContext = new PlannerContext( + false, config, functionCatalog, catalogManager, diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/AggCallSelectivityEstimatorTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/AggCallSelectivityEstimatorTest.scala index 1bfb2c939faa0..13c7566ef09cd 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/AggCallSelectivityEstimatorTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/AggCallSelectivityEstimatorTest.scala @@ -84,6 +84,7 @@ class AggCallSelectivityEstimatorTest { rootSchema.add("test", table) val plannerContext: PlannerContext = new PlannerContext( + false, tableConfig, new FunctionCatalog(tableConfig, catalogManager, new ModuleManager), catalogManager, diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala index 78cfd4b543e69..7f457e239766a 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala @@ -88,6 +88,7 @@ class FlinkRelMdHandlerTestBase { // and RelOptCluster due to they have different trait definitions. val plannerContext: PlannerContext = new PlannerContext( + false, tableConfig, new FunctionCatalog(tableConfig, catalogManager, moduleManager), catalogManager, diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/SelectivityEstimatorTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/SelectivityEstimatorTest.scala index 77976dfa6199f..5ee1481b20348 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/SelectivityEstimatorTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/SelectivityEstimatorTest.scala @@ -85,6 +85,7 @@ class SelectivityEstimatorTest { rootSchema.add("test", table) val plannerContext: PlannerContext = new PlannerContext( + false, tableConfig, new FunctionCatalog(tableConfig, catalogManager, new ModuleManager), catalogManager, diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/nodes/calcite/RelNodeTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/nodes/calcite/RelNodeTestBase.scala index d145627558c13..fb680a61ac129 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/nodes/calcite/RelNodeTestBase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/nodes/calcite/RelNodeTestBase.scala @@ -51,6 +51,7 @@ class RelNodeTestBase { val moduleManager = new ModuleManager val plannerContext: PlannerContext = new PlannerContext( + false, tableConfig, new FunctionCatalog(tableConfig, catalogManager, moduleManager), catalogManager,