Skip to content

Commit

Permalink
[FLINK-20897][table-planner] Add FlinkContext.isBatchMode flag
Browse files Browse the repository at this point in the history
  • Loading branch information
twalthr committed Aug 13, 2021
1 parent 21df58a commit e23498d
Show file tree
Hide file tree
Showing 25 changed files with 45 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ public class PlannerContext {
private final FrameworkConfig frameworkConfig;

public PlannerContext(
boolean isBatchMode,
TableConfig tableConfig,
FunctionCatalog functionCatalog,
CatalogManager catalogManager,
Expand All @@ -111,7 +112,11 @@ public PlannerContext(

this.context =
new FlinkContextImpl(
tableConfig, functionCatalog, catalogManager, rexConverterFactory);
isBatchMode,
tableConfig,
functionCatalog,
catalogManager,
rexConverterFactory);

this.rootSchema = rootSchema;
this.traitDefs = traitDefs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ public SourceAbilityContext(FlinkContext context, RowType sourceRowType) {
this.sourceRowType = sourceRowType;
}

@Override
public boolean isBatchMode() {
return context.isBatchMode();
}

@Override
public TableConfig getTableConfig() {
return context.getTableConfig();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ import org.apache.calcite.plan.Context
*/
trait FlinkContext extends Context {

/**
* Returns whether the planner runs in batch mode.
*/
def isBatchMode: Boolean

/**
* Gets [[TableConfig]] instance defined in [[org.apache.flink.table.api.TableEnvironment]].
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,15 @@ import org.apache.flink.table.api.TableConfig
import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog}

class FlinkContextImpl(
inBatchMode: Boolean,
tableConfig: TableConfig,
functionCatalog: FunctionCatalog,
catalogManager: CatalogManager,
toRexFactory: SqlExprToRexConverterFactory)
extends FlinkContext {

override def isBatchMode: Boolean = inBatchMode

override def getTableConfig: TableConfig = tableConfig

override def getFunctionCatalog: FunctionCatalog = functionCatalog
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ abstract class PlannerBase(
@VisibleForTesting
private[flink] val plannerContext: PlannerContext =
new PlannerContext(
!isStreamingMode,
config,
functionCatalog,
catalogManager,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ class BatchCommonSubGraphBasedOptimizer(planner: BatchPlanner)
val context = relNode.getCluster.getPlanner.getContext.unwrap(classOf[FlinkContext])

programs.optimize(relNode, new BatchOptimizeContext {

override def isBatchMode: Boolean = true

override def getTableConfig: TableConfig = config

override def getFunctionCatalog: FunctionCatalog = planner.functionCatalog
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,8 @@ class StreamCommonSubGraphBasedOptimizer(planner: StreamPlanner)

programs.optimize(relNode, new StreamOptimizeContext() {

override def isBatchMode: Boolean = false

override def getTableConfig: TableConfig = config

override def getFunctionCatalog: FunctionCatalog = planner.functionCatalog
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public class ParserImplTest {
new FunctionCatalog(tableConfig, catalogManager, moduleManager);
private final PlannerContext plannerContext =
new PlannerContext(
!isStreamingMode,
tableConfig,
functionCatalog,
catalogManager,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public class ExpressionConverterTest {
private final CatalogManager catalogManager = CatalogManagerMocks.createEmptyCatalogManager();
private final PlannerContext plannerContext =
new PlannerContext(
false,
tableConfig,
new FunctionCatalog(tableConfig, catalogManager, new ModuleManager()),
catalogManager,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ public class SqlToOperationConverterTest {
catalogManager.getCurrentDatabase());
private final PlannerContext plannerContext =
new PlannerContext(
false,
tableConfig,
functionCatalog,
catalogManager,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public void testDynamicTableSinkSpecSerde() throws IOException {
SerdeContext serdeCtx =
new SerdeContext(
new FlinkContextImpl(
false,
TableConfig.getDefault(),
null,
CatalogManagerMocks.createEmptyCatalogManager(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public void testDynamicTableSourceSpecSerde() throws IOException {
SerdeContext serdeCtx =
new SerdeContext(
new FlinkContextImpl(
false,
TableConfig.getDefault(),
null,
CatalogManagerMocks.createEmptyCatalogManager(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public class LogicalTypeSerdeTest {
public void testLogicalTypeSerde() throws IOException {
SerdeContext serdeCtx =
new SerdeContext(
new FlinkContextImpl(TableConfig.getDefault(), null, null, null),
new FlinkContextImpl(false, TableConfig.getDefault(), null, null, null),
Thread.currentThread().getContextClassLoader(),
FlinkTypeFactory.INSTANCE(),
FlinkSqlOperatorTable.instance());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public static List<LogicalWindow> testData() {
public void testLogicalWindowSerde() throws JsonProcessingException {
SerdeContext serdeCtx =
new SerdeContext(
new FlinkContextImpl(TableConfig.getDefault(), null, null, null),
new FlinkContextImpl(false, TableConfig.getDefault(), null, null, null),
Thread.currentThread().getContextClassLoader(),
FlinkTypeFactory.INSTANCE(),
FlinkSqlOperatorTable.instance());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public void testLookupKey() throws JsonProcessingException {
.build();
FlinkContext flinkContext =
new FlinkContextImpl(
false,
tableConfig,
new FunctionCatalog(tableConfig, catalogManager, new ModuleManager()),
catalogManager,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ public static Collection<RelDataType> parameters() {
public void testTypeSerde() throws Exception {
SerdeContext serdeCtx =
new SerdeContext(
new FlinkContextImpl(TableConfig.getDefault(), null, null, null),
new FlinkContextImpl(false, TableConfig.getDefault(), null, null, null),
Thread.currentThread().getContextClassLoader(),
FlinkTypeFactory.INSTANCE(),
FlinkSqlOperatorTable.instance());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ public static Object[][] parameters() {
.build();
FlinkContext flinkContext =
new FlinkContextImpl(
false,
tableConfig,
new FunctionCatalog(tableConfig, catalogManager, new ModuleManager()),
catalogManager,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public class RexWindowBoundSerdeTest {
public void testSerde() throws JsonProcessingException {
SerdeContext serdeCtx =
new SerdeContext(
new FlinkContextImpl(TableConfig.getDefault(), null, null, null),
new FlinkContextImpl(false, TableConfig.getDefault(), null, null, null),
Thread.currentThread().getContextClassLoader(),
FlinkTypeFactory.INSTANCE(),
FlinkSqlOperatorTable.instance());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public void testTemporalTableSourceSpecSerde() throws IOException {
SerdeContext serdeCtx =
new SerdeContext(
new FlinkContextImpl(
false,
TableConfig.getDefault(),
null,
CatalogManagerMocks.createEmptyCatalogManager(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,19 +37,18 @@
/** A utility class for creating an instance of {@link FlinkPlannerImpl} for testing. */
public class PlannerMocks {
public static FlinkPlannerImpl createDefaultPlanner() {
final boolean isStreamingMode = false;
TableConfig tableConfig = new TableConfig();
CatalogManager catalogManager = CatalogManagerMocks.createEmptyCatalogManager();
ModuleManager moduleManager = new ModuleManager();
FunctionCatalog functionCatalog =
new FunctionCatalog(tableConfig, catalogManager, moduleManager);
PlannerContext plannerContext =
new PlannerContext(
false,
tableConfig,
functionCatalog,
catalogManager,
asRootSchema(
new CatalogManagerCalciteSchema(catalogManager, isStreamingMode)),
asRootSchema(new CatalogManagerCalciteSchema(catalogManager, true)),
new ArrayList<>());
FlinkPlannerImpl planner =
plannerContext.createFlinkPlanner(
Expand All @@ -61,7 +60,7 @@ public static FlinkPlannerImpl createDefaultPlanner() {
planner::parser,
plannerContext.getSqlExprToRexConverterFactory());
catalogManager.initSchemaResolver(
isStreamingMode,
true,
ExpressionResolver.resolverFor(
tableConfig,
name -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ class WatermarkGeneratorCodeGenTest(useDefinedConstructor: Boolean) {
val catalogManager: CatalogManager = CatalogManagerMocks.createEmptyCatalogManager()
val functionCatalog = new FunctionCatalog(config, catalogManager, new ModuleManager)
val plannerContext = new PlannerContext(
false,
config,
functionCatalog,
catalogManager,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ class AggCallSelectivityEstimatorTest {
rootSchema.add("test", table)
val plannerContext: PlannerContext =
new PlannerContext(
false,
tableConfig,
new FunctionCatalog(tableConfig, catalogManager, new ModuleManager),
catalogManager,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ class FlinkRelMdHandlerTestBase {
// and RelOptCluster due to they have different trait definitions.
val plannerContext: PlannerContext =
new PlannerContext(
false,
tableConfig,
new FunctionCatalog(tableConfig, catalogManager, moduleManager),
catalogManager,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ class SelectivityEstimatorTest {
rootSchema.add("test", table)
val plannerContext: PlannerContext =
new PlannerContext(
false,
tableConfig,
new FunctionCatalog(tableConfig, catalogManager, new ModuleManager),
catalogManager,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ class RelNodeTestBase {
val moduleManager = new ModuleManager

val plannerContext: PlannerContext = new PlannerContext(
false,
tableConfig,
new FunctionCatalog(tableConfig, catalogManager, moduleManager),
catalogManager,
Expand Down

0 comments on commit e23498d

Please sign in to comment.