Skip to content

Commit

Permalink
[FLINK-17339][misc] Update tests due to default planner change.
Browse files Browse the repository at this point in the history
  • Loading branch information
KurtYoung committed Apr 26, 2020
1 parent aa4e1d6 commit a9f8a0b
Show file tree
Hide file tree
Showing 8 changed files with 23 additions and 13 deletions.
2 changes: 1 addition & 1 deletion flink-connectors/flink-connector-cassandra/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ under the License.
<!-- A planner dependency won't be necessary once FLIP-32 has been completed. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
<optional>true</optional>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public static void main(String[] args) throws Exception {

ParameterTool params = ParameterTool.fromArgs(args);
String outputPath = params.getRequired("outputPath");
String planner = params.get("planner", "old");
String planner = params.get("planner", "blink");

final EnvironmentSettings.Builder builder = EnvironmentSettings.newInstance();
builder.inStreamingMode();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;

Expand Down Expand Up @@ -150,7 +151,9 @@ public BatchTableEnvironment getBatchTableEnvironment() {
*/
public StreamTableEnvironment getStreamTableEnvironment() {
if (null == streamTableEnv) {
streamTableEnv = StreamTableEnvironment.create(getStreamExecutionEnvironment());
streamTableEnv = StreamTableEnvironment.create(
getStreamExecutionEnvironment(),
EnvironmentSettings.newInstance().useOldPlanner().build());
}
return streamTableEnv;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;

Expand Down Expand Up @@ -56,7 +57,8 @@ public void testConstructWithBatchEnv() {
@Test
public void testConstructWithStreamEnv() {
StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment streamTableEnvironment = StreamTableEnvironment.create(streamExecutionEnvironment);
StreamTableEnvironment streamTableEnvironment = StreamTableEnvironment.create(
streamExecutionEnvironment, EnvironmentSettings.newInstance().useOldPlanner().build());

MLEnvironment mlEnvironment = new MLEnvironment(streamExecutionEnvironment, streamTableEnvironment);

Expand Down
4 changes: 2 additions & 2 deletions flink-python/pyflink/table/tests/test_environment_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,11 @@ def test_planner_selection(self):

self.assertEqual(
envrionment_settings._j_environment_settings.toPlannerProperties()[CLASS_NAME],
OLD_PLANNER_FACTORY)
BLINK_PLANNER_FACTORY)

self.assertEqual(
envrionment_settings._j_environment_settings.toExecutorProperties()[CLASS_NAME],
OLD_EXECUTOR_FACTORY)
BLINK_EXECUTOR_FACTORY)

# test use_old_planner
envrionment_settings = builder.use_old_planner().build()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,16 +275,16 @@ def test_create_table_environment_with_blink_planner(self):

self.assertEqual(
planner.getClass().getName(),
"org.apache.flink.table.planner.StreamPlanner")
"org.apache.flink.table.planner.delegation.StreamPlanner")

t_env = StreamTableEnvironment.create(
environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build())
environment_settings=EnvironmentSettings.new_instance().use_old_planner().build())

planner = t_env._j_tenv.getPlanner()

self.assertEqual(
planner.getClass().getName(),
"org.apache.flink.table.planner.delegation.StreamPlanner")
"org.apache.flink.table.planner.StreamPlanner")

def test_table_environment_with_blink_planner(self):
self.env.set_parallelism(1)
Expand Down
8 changes: 6 additions & 2 deletions flink-python/pyflink/testing/test_case_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from py4j.java_gateway import JavaObject
from py4j.protocol import Py4JJavaError

from pyflink.table import TableConfig
from pyflink.table.sources import CsvTableSource
from pyflink.dataset.execution_environment import ExecutionEnvironment
from pyflink.datastream.stream_execution_environment import StreamExecutionEnvironment
Expand Down Expand Up @@ -123,7 +124,10 @@ def setUp(self):
super(PyFlinkStreamTableTestCase, self).setUp()
self.env = StreamExecutionEnvironment.get_execution_environment()
self.env.set_parallelism(2)
self.t_env = StreamTableEnvironment.create(self.env)
self.t_env = StreamTableEnvironment.create(
self.env,
environment_settings=EnvironmentSettings.new_instance()
.in_streaming_mode().use_old_planner().build())


class PyFlinkBatchTableTestCase(PyFlinkTestCase):
Expand All @@ -135,7 +139,7 @@ def setUp(self):
super(PyFlinkBatchTableTestCase, self).setUp()
self.env = ExecutionEnvironment.get_execution_environment()
self.env.set_parallelism(2)
self.t_env = BatchTableEnvironment.create(self.env)
self.t_env = BatchTableEnvironment.create(self.env, TableConfig())

def collect(self, table):
j_table = table._j_table
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@
package org.apache.flink.api.scala

import java.io.{BufferedReader, File, FileOutputStream}

import org.apache.flink.api.java.{JarHelper, ScalaShellEnvironment, ScalaShellStreamEnvironment}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.configuration.Configuration
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala.{BatchTableEnvironment, StreamTableEnvironment}
import org.apache.flink.util.AbstractID

Expand Down Expand Up @@ -90,7 +90,8 @@ class FlinkILoop(
val scalaBenv = new ExecutionEnvironment(remoteBenv)
val scalaSenv = new StreamExecutionEnvironment(remoteSenv)
val scalaBTEnv = BatchTableEnvironment.create(scalaBenv)
val scalaSTEnv = StreamTableEnvironment.create(scalaSenv)
val scalaSTEnv = StreamTableEnvironment.create(
scalaSenv, EnvironmentSettings.newInstance().useOldPlanner().build())
(scalaBenv,scalaSenv,scalaBTEnv,scalaSTEnv)
}

Expand Down

0 comments on commit a9f8a0b

Please sign in to comment.