Skip to content

Commit

Permalink
[FLINK-17339][table-planner] Update test cases due to default planner…
Browse files Browse the repository at this point in the history
… change.
  • Loading branch information
KurtYoung committed Apr 26, 2020
1 parent f0a4b09 commit 04c52f8
Show file tree
Hide file tree
Showing 42 changed files with 612 additions and 1,132 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ public class StreamTableEnvironmentTest {
@Test
public void testPassingExecutionParameters() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
EnvironmentSettings settings = EnvironmentSettings.newInstance().useOldPlanner().build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

tEnv.getConfig().addConfiguration(
new Configuration()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.table.runtime.stream.sql;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.ValidationException;
Expand Down Expand Up @@ -456,7 +457,9 @@ public void testDataTypeBasedTypeInferenceNotSupported() throws Exception {
thrown.expectMessage("The new type inference for functions is only supported in the Blink planner.");

StreamExecutionEnvironment streamExecEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(streamExecEnvironment);
EnvironmentSettings settings = EnvironmentSettings.newInstance().useOldPlanner().build();
StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(
streamExecEnvironment, settings);

tableEnvironment.createTemporarySystemFunction("func", SimpleScalarFunction.class);
Table table = tableEnvironment
Expand All @@ -477,6 +480,7 @@ public long eval(Integer i) {

private TableEnvironment getTableEnvironment() {
StreamExecutionEnvironment streamExecEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
return StreamTableEnvironment.create(streamExecEnvironment);
EnvironmentSettings settings = EnvironmentSettings.newInstance().useOldPlanner().build();
return StreamTableEnvironment.create(streamExecEnvironment, settings);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.runtime.utils.JavaStreamTestData;
Expand All @@ -47,7 +48,8 @@ public class JavaSqlITCase extends AbstractTestBase {
@Test
public void testRowRegisterRowWithNames() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
EnvironmentSettings settings = EnvironmentSettings.newInstance().useOldPlanner().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
StreamITCase.clear();

List<Row> data = new ArrayList<>();
Expand Down Expand Up @@ -86,7 +88,8 @@ public void testRowRegisterRowWithNames() throws Exception {
@Test
public void testSelect() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
EnvironmentSettings settings = EnvironmentSettings.newInstance().useOldPlanner().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
StreamITCase.clear();

DataStream<Tuple3<Integer, Long, String>> ds = JavaStreamTestData.getSmall3TupleDataSet(env);
Expand All @@ -111,7 +114,8 @@ public void testSelect() throws Exception {
@Test
public void testFilter() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
EnvironmentSettings settings = EnvironmentSettings.newInstance().useOldPlanner().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
StreamITCase.clear();

DataStream<Tuple5<Integer, Long, Integer, String, Long>> ds = JavaStreamTestData.get5TupleDataStream(env);
Expand All @@ -136,7 +140,8 @@ public void testFilter() throws Exception {
@Test
public void testUnion() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
EnvironmentSettings settings = EnvironmentSettings.newInstance().useOldPlanner().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
StreamITCase.clear();

DataStream<Tuple3<Integer, Long, String>> ds1 = JavaStreamTestData.getSmall3TupleDataSet(env);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.java.StreamTableEnvironment;
Expand Down Expand Up @@ -50,7 +51,9 @@ public void testDataTypeBasedTypeInferenceNotSupported() throws Exception {
thrown.expectMessage("The new type inference for functions is only supported in the Blink planner.");

StreamExecutionEnvironment streamExecEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(streamExecEnvironment);
EnvironmentSettings settings = EnvironmentSettings.newInstance().useOldPlanner().build();
StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(
streamExecEnvironment, settings);

Table table = tableEnvironment
.sqlQuery("SELECT * FROM (VALUES (1)) AS TableName(f0)")
Expand All @@ -66,7 +69,9 @@ public void testDataTypeBasedTypeInferenceNotSupportedInLateralJoin() throws Exc
thrown.expectMessage("The new type inference for functions is only supported in the Blink planner.");

StreamExecutionEnvironment streamExecEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(streamExecEnvironment);
EnvironmentSettings settings = EnvironmentSettings.newInstance().useOldPlanner().build();
StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(
streamExecEnvironment, settings);

Table table = tableEnvironment
.sqlQuery("SELECT * FROM (VALUES ('A,B,C')) AS TableName(f0)")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.runtime.utils.StreamITCase;
Expand Down Expand Up @@ -64,7 +65,9 @@ public void testTypeConversions() throws Exception {
);

StreamExecutionEnvironment streamExecEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(streamExecEnvironment);
EnvironmentSettings settings = EnvironmentSettings.newInstance().useOldPlanner().build();
StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(
streamExecEnvironment, settings);

Table t = tableEnvironment.fromValues(
rowType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@ package org.apache.flink.table.api.stream

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.{EnvironmentSettings, Table}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.utils.TableTestUtil.streamTableNode
import org.apache.flink.test.util.AbstractTestBase

import org.junit.Assert.assertEquals
import org.junit._

Expand All @@ -34,7 +35,8 @@ class ExplainTest extends AbstractTestBase {
@Test
def testFilter(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)
val settings = EnvironmentSettings.newInstance().useOldPlanner().build()
val tEnv = StreamTableEnvironment.create(env, settings)

val scan = env.fromElements((1, "hello")).toTable(tEnv, 'a, 'b)
val table = scan.filter("a % 2 = 0")
Expand All @@ -50,7 +52,8 @@ class ExplainTest extends AbstractTestBase {
@Test
def testUnion(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)
val settings = EnvironmentSettings.newInstance().useOldPlanner().build()
val tEnv = StreamTableEnvironment.create(env, settings)

val table1 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import org.apache.flink.table.api.Expressions.$
import org.apache.flink.table.api.java.internal.{StreamTableEnvironmentImpl => JStreamTableEnvironmentImpl}
import org.apache.flink.table.api.java.{StreamTableEnvironment => JStreamTableEnv}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.{Expressions, TableConfig, Types, ValidationException}
import org.apache.flink.table.api.{EnvironmentSettings, Expressions, TableConfig, Types, ValidationException}
import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, GenericInMemoryCatalog}
import org.apache.flink.table.executor.StreamExecutor
import org.apache.flink.table.planner.StreamPlanner
Expand Down Expand Up @@ -83,7 +83,8 @@ class StreamTableEnvironmentTest extends TableTestBase {
" in order to handle add and retract messages.")

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)
val settings = EnvironmentSettings.newInstance().useOldPlanner().build()
val tEnv = StreamTableEnvironment.create(env, settings)

val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'id, 'num, 'text)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@
package org.apache.flink.table.api.stream

import java.math.BigDecimal

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.{TableException, ValidationException}
import org.apache.flink.table.api.{EnvironmentSettings, TableException, ValidationException}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.runtime.stream.TimeAttributesITCase.TimestampWithEqualWatermark
import org.apache.flink.table.utils.TableTestBase

import org.junit.Test

class StreamTableEnvironmentValidationTest extends TableTestBase {
Expand Down Expand Up @@ -128,7 +128,8 @@ class StreamTableEnvironmentValidationTest extends TableTestBase {
def testInvalidTimeCharacteristicByPosition(): Unit = {
val data = List((1L, 1, 1d, 1f, new BigDecimal("1"), "Hi"))
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)
val settings = EnvironmentSettings.newInstance().useOldPlanner().build()
val tEnv = StreamTableEnvironment.create(env, settings)
val stream = env
.fromCollection(data)
.assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,20 @@ package org.apache.flink.table.api.stream.sql.validation
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.{Types, ValidationException}
import org.apache.flink.table.api.{EnvironmentSettings, Types, ValidationException}
import org.apache.flink.table.runtime.utils.StreamTestData
import org.apache.flink.table.utils.MemoryTableSourceSinkUtil

import org.junit.Test

class InsertIntoValidationTest {

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val settings: EnvironmentSettings = EnvironmentSettings.newInstance().useOldPlanner().build()
val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, settings)

@Test(expected = classOf[ValidationException])
def testInconsistentLengthInsert(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)

val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
tEnv.registerTable("sourceTable", t)

Expand All @@ -51,9 +53,6 @@ class InsertIntoValidationTest {

@Test(expected = classOf[ValidationException])
def testUnmatchedTypesInsert(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)

val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
tEnv.registerTable("sourceTable", t)

Expand All @@ -72,9 +71,6 @@ class InsertIntoValidationTest {

@Test(expected = classOf[ValidationException])
def testUnsupportedPartialInsert(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)

val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
tEnv.registerTable("sourceTable", t)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,19 @@ package org.apache.flink.table.api.stream.table.validation
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.{Types, ValidationException}
import org.apache.flink.table.api.{EnvironmentSettings, Types, ValidationException}
import org.apache.flink.table.runtime.utils.StreamTestData
import org.apache.flink.table.utils.MemoryTableSourceSinkUtil

import org.junit.Test

class InsertIntoValidationTest {

@Test(expected = classOf[ValidationException])
def testInconsistentLengthInsert(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)
val settings = EnvironmentSettings.newInstance().useOldPlanner().build()
val tEnv = StreamTableEnvironment.create(env, settings)

val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
tEnv.registerTable("sourceTable", t)
Expand All @@ -53,7 +55,8 @@ class InsertIntoValidationTest {
@Test(expected = classOf[ValidationException])
def testUnmatchedTypesInsert(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)
val settings = EnvironmentSettings.newInstance().useOldPlanner().build()
val tEnv = StreamTableEnvironment.create(env, settings)

val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
tEnv.registerTable("sourceTable", t)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@ import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.stream.table.validation.JoinValidationTest.WithoutEqualsHashCode
import org.apache.flink.table.api.{TableException, ValidationException}
import org.apache.flink.table.api.{EnvironmentSettings, TableException, ValidationException}
import org.apache.flink.table.runtime.utils.JavaUserDefinedScalarFunctions.PythonScalarFunction
import org.apache.flink.table.runtime.utils.StreamTestData
import org.apache.flink.table.utils.TableTestBase
import org.apache.flink.types.Row

import org.hamcrest.Matchers
import org.junit.Test

Expand All @@ -38,7 +39,8 @@ class JoinValidationTest extends TableTestBase {
@Test(expected = classOf[ValidationException])
def testInvalidStateTypes(): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val tenv = StreamTableEnvironment.create(env)
val settings = EnvironmentSettings.newInstance().useOldPlanner().build()
val tenv = StreamTableEnvironment.create(env, settings)
val ds = env.fromElements(new WithoutEqualsHashCode) // no equals/hashCode
val t = tenv.fromDataStream(ds)

Expand Down Expand Up @@ -207,8 +209,9 @@ class JoinValidationTest extends TableTestBase {
@Test(expected = classOf[ValidationException])
def testJoinTablesFromDifferentEnvs(): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv1 = StreamTableEnvironment.create(env)
val tEnv2 = StreamTableEnvironment.create(env)
val settings = EnvironmentSettings.newInstance().useOldPlanner().build()
val tEnv1 = StreamTableEnvironment.create(env, settings)
val tEnv2 = StreamTableEnvironment.create(env, settings)
val ds1 = StreamTestData.get3TupleDataStream(env)
val ds2 = StreamTestData.get5TupleDataStream(env)
val in1 = tEnv1.fromDataStream(ds1, 'a, 'b, 'c)
Expand All @@ -221,8 +224,9 @@ class JoinValidationTest extends TableTestBase {
@Test(expected = classOf[ValidationException])
def testJoinTablesFromDifferentEnvsJava() {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv1 = StreamTableEnvironment.create(env)
val tEnv2 = StreamTableEnvironment.create(env)
val settings = EnvironmentSettings.newInstance().useOldPlanner().build()
val tEnv1 = StreamTableEnvironment.create(env, settings)
val tEnv2 = StreamTableEnvironment.create(env, settings)
val ds1 = StreamTestData.get3TupleDataStream(env)
val ds2 = StreamTestData.get5TupleDataStream(env)
val in1 = tEnv1.fromDataStream(ds1, 'a, 'b, 'c)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@ package org.apache.flink.table.api.stream.table.validation
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.ValidationException
import org.apache.flink.table.api.{EnvironmentSettings, ValidationException}
import org.apache.flink.table.runtime.utils.{StreamITCase, StreamTestData}
import org.apache.flink.table.utils.TableTestBase
import org.apache.flink.types.Row

import org.junit.Assert.assertEquals
import org.junit.Test

Expand All @@ -35,7 +36,8 @@ class SetOperatorsValidationTest extends TableTestBase {
@Test(expected = classOf[ValidationException])
def testUnionFieldsNameNotOverlap1(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)
val settings = EnvironmentSettings.newInstance().useOldPlanner().build()
val tEnv = StreamTableEnvironment.create(env, settings)

StreamITCase.testResults = mutable.MutableList()
val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
Expand All @@ -53,7 +55,8 @@ class SetOperatorsValidationTest extends TableTestBase {
@Test(expected = classOf[ValidationException])
def testUnionFieldsNameNotOverlap2(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)
val settings = EnvironmentSettings.newInstance().useOldPlanner().build()
val tEnv = StreamTableEnvironment.create(env, settings)

StreamITCase.testResults = mutable.MutableList()
val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
Expand All @@ -72,8 +75,9 @@ class SetOperatorsValidationTest extends TableTestBase {
@Test(expected = classOf[ValidationException])
def testUnionTablesFromDifferentEnv(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv1 = StreamTableEnvironment.create(env)
val tEnv2 = StreamTableEnvironment.create(env)
val settings = EnvironmentSettings.newInstance().useOldPlanner().build()
val tEnv1 = StreamTableEnvironment.create(env, settings)
val tEnv2 = StreamTableEnvironment.create(env, settings)

val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv1, 'a, 'b, 'c)
val ds2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv2, 'a, 'b, 'c)
Expand Down
Loading

0 comments on commit 04c52f8

Please sign in to comment.