Skip to content

Commit

Permalink
[FLINK-17339][examples] Update examples due to default planner changing.
Browse files Browse the repository at this point in the history
  • Loading branch information
KurtYoung committed Apr 26, 2020
1 parent cb523d3 commit aa4e1d6
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,19 +50,23 @@ public class StreamSQLExample {
public static void main(String[] args) throws Exception {

final ParameterTool params = ParameterTool.fromArgs(args);
String planner = params.has("planner") ? params.get("planner") : "flink";
String planner = params.has("planner") ? params.get("planner") : "blink";

// set up execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv;
if (Objects.equals(planner, "blink")) { // use blink planner in streaming mode
EnvironmentSettings settings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
.inStreamingMode()
.useBlinkPlanner()
.build();
tEnv = StreamTableEnvironment.create(env, settings);
} else if (Objects.equals(planner, "flink")) { // use flink planner in streaming mode
tEnv = StreamTableEnvironment.create(env);
EnvironmentSettings settings = EnvironmentSettings.newInstance()
.inStreamingMode()
.useOldPlanner()
.build();
tEnv = StreamTableEnvironment.create(env, settings);
} else {
System.err.println("The planner is incorrect. Please run 'StreamSQLExample --planner <planner>', " +
"where planner (it is either flink or blink, and the default is flink) indicates whether the " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.flink.table.examples.java;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
Expand All @@ -41,16 +40,9 @@
public class StreamWindowSQLExample {

public static void main(String[] args) throws Exception {

// set up execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// use blink planner in streaming mode,
// because watermark statement is only available in blink planner.
EnvironmentSettings settings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

// write source data into temporary file and get the absolute path
String contents = "1,beer,3,2019-12-12 00:00:01\n" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,18 +43,22 @@ object StreamSQLExample {
def main(args: Array[String]): Unit = {

val params = ParameterTool.fromArgs(args)
val planner = if (params.has("planner")) params.get("planner") else "flink"
val planner = if (params.has("planner")) params.get("planner") else "blink"

// set up execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = if (planner == "blink") { // use blink planner in streaming mode
val settings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build()
.useBlinkPlanner()
.inStreamingMode()
.build()
StreamTableEnvironment.create(env, settings)
} else if (planner == "flink") { // use flink planner in streaming mode
StreamTableEnvironment.create(env)
val settings = EnvironmentSettings.newInstance()
.useOldPlanner()
.inStreamingMode()
.build()
StreamTableEnvironment.create(env, settings)
} else {
System.err.println("The planner is incorrect. Please run 'StreamSQLExample --planner <planner>', " +
"where planner (it is either flink or blink, and the default is flink) indicates whether the " +
Expand Down

0 comments on commit aa4e1d6

Please sign in to comment.